Cassandra Study Notes: Gossip
Cassandra is a distributed storage system that’s designed to run on top of hundreds of nodes. It is leaderless and optimized for availability. In the absence of a leader, it’s up to the nodes to exchange information with each other about the state of the cluster.
Cassandra uses a gossip protocol that lets each node keep track of state information about the other nodes in the cluster.
Note: Gossip is not used for reads and writes, CQL query parsing or execution, compaction, hints, streaming, etc. It is used strictly to exchange node health and metadata.
The Gossiper class manages the gossip between nodes in the cluster. Messages are exchanged by a GossipTask that’s configured to run every second. The Gossiper class also maintains a list of the nodes that are alive and dead.
Note: The Gossiper doesn’t share alive/dead statuses with its peers. It only updates them locally.
Gossip Round
The Gossiper thread runs every second.
Each round of gossip consists of three messages: SYN, ACK, and ACK2.
In a single round, the Gossiper thread:
1. Gossips with a random live endpoint (if any).
2. Gossips with a random unreachable endpoint, with a probability that depends on the number of unreachable and live nodes.
3. If the node gossiped to in (1) was not a seed, or the number of live nodes is less than the number of seeds, gossips with a random seed, with a probability that depends on the number of unreachable, seed, and live nodes.
Assuming a healthy network, these rules ensure that all nodes will eventually know about all other nodes.
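The three rules above can be sketched as a peer-selection function. This is a minimal sketch, not Cassandra's implementation: the name pick_gossip_targets is hypothetical, and the two probability formulas are assumptions chosen for illustration, since these notes only say that the probabilities depend on the counts of live, unreachable, and seed nodes.

```python
import random


def pick_gossip_targets(live, unreachable, seeds, rng=random):
    """Return the endpoints to gossip with in one round (hypothetical helper)."""
    targets = []

    # Rule 1: gossip with a random live endpoint, if there is one.
    live_target = rng.choice(sorted(live)) if live else None
    if live_target is not None:
        targets.append(live_target)

    # Rule 2: maybe gossip with a random unreachable endpoint.
    # Assumed probability: unreachable / (live + 1).
    if unreachable and rng.random() < len(unreachable) / (len(live) + 1):
        targets.append(rng.choice(sorted(unreachable)))

    # Rule 3: maybe gossip with a random seed, but only if rule 1 did not
    # already reach a seed or there are fewer live nodes than seeds.
    if seeds and (live_target not in seeds or len(live) < len(seeds)):
        # Assumed probability: seeds / (live + unreachable).
        # If no other node is known yet, always contact a seed.
        known = len(live) + len(unreachable)
        probability = 1.0 if known == 0 else len(seeds) / known
        if rng.random() <= probability:
            targets.append(rng.choice(sorted(seeds)))

    return targets
```

A freshly started node that knows only its seed list always contacts a seed under these assumptions, which is how it would learn about the rest of the cluster.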
Gossip Data structures
HeartBeatState
Consists of a generation and a version number.
HeartBeatState: generation 1259909635, version 325
The generation stays the same while the node is running and grows every time the node is restarted, so it can be used to detect a restart: the generation number after a restart is higher than the previous value. The HeartBeat version number is shared with the ApplicationStates and guarantees ordering.
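One way to picture the ordering is to compare the generation first and the version second. The sketch below is illustrative only; the Python class and the is_newer helper are assumptions, not Cassandra's Java code.

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class HeartBeatState:
    # Field order matters: order=True compares generation first, then version.
    generation: int  # fixed while the node runs, larger after a restart
    version: int     # increases while the node runs


def is_newer(remote, local):
    """True if `remote` describes a later point in the node's life than `local`."""
    return remote > local


# Same generation: the higher version wins.
print(is_newer(HeartBeatState(1259909635, 325), HeartBeatState(1259909635, 324)))
# Higher generation (a restart) wins even though its version is lower.
print(is_newer(HeartBeatState(1259912238, 5), HeartBeatState(1259812143, 2142)))
```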
ApplicationState
Consists of a state and a version number, and represents the state of a single "component" or "element" within Cassandra. For example:
“load information”: (5.2, 45)
“bootstrapping”: (bxLpassF3XD8Kyks, 56)
EndPointState
Includes all ApplicationStates and HeartBeatState for a node. EndPointState can include only one of each type of ApplicationState, so if EndPointState already includes, say, load information, new load information will overwrite the old one. ApplicationState version number guarantees that the old value will not overwrite the new one.
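The overwrite rule can be sketched with a small class that keeps one entry per state type and rejects anything that is not strictly newer. The class layout and the apply method are hypothetical names used for illustration; they are not Cassandra's actual API.

```python
class EndPointState:
    """Holds the heartbeat and at most one ApplicationState per type (sketch)."""

    def __init__(self, generation, heartbeat_version):
        self.generation = generation
        self.heartbeat_version = heartbeat_version
        self.app_states = {}  # state name -> (value, version)

    def apply(self, name, value, version):
        """Store the state unless an equal or newer version is already held."""
        held = self.app_states.get(name)
        if held is not None and held[1] >= version:
            return False  # stale update, keep what we have
        self.app_states[name] = (value, version)
        return True


state = EndPointState(generation=1259909635, heartbeat_version=325)
state.apply("load-information", 5.2, 45)   # stored
state.apply("load-information", 4.0, 40)   # ignored: version 40 is older than 45
state.apply("load-information", 6.1, 90)   # overwrites the version-45 entry
```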
endPointStateMap
Internal structure in Gossiper that has EndPointState for all nodes (including itself) that it has heard about.
Gossip Exchange
Imagine a gossip round between two nodes, Node A and Node B, and assume that the endPointStateMap on each node has the following contents:
// Node A (10.0.0.1)
EndPointState 10.0.0.1
HeartBeatState: generation 1259909635, version 325
ApplicationState "load-information": 5.2, generation 1259909635, version 45
ApplicationState "bootstrapping": bxLpassF3XD8Kyks, generation 1259909635, version 56
ApplicationState "normal": bxLpassF3XD8Kyks, generation 1259909635, version 87
EndPointState 10.0.0.2
HeartBeatState: generation 1259911052, version 61
ApplicationState "load-information": 2.7, generation 1259911052, version 2
ApplicationState "bootstrapping": AujDMftpyUvebtnn, generation 1259911052, version 31
EndPointState 10.0.0.3
HeartBeatState: generation 1259912238, version 5
ApplicationState "load-information": 12.0, generation 1259912238, version 3
EndPointState 10.0.0.4
HeartBeatState: generation 1259912942, version 18
ApplicationState "load-information": 6.7, generation 1259912942, version 3
ApplicationState "normal": bj05IVc0lvRXw2xH, generation 1259912942, version 7
// Node B (10.0.0.2)
EndPointState 10.0.0.1
HeartBeatState: generation 1259909635, version 324
ApplicationState "load-information": 5.2, generation 1259909635, version 45
ApplicationState "bootstrapping": bxLpassF3XD8Kyks, generation 1259909635, version 56
ApplicationState "normal": bxLpassF3XD8Kyks, generation 1259909635, version 87
EndPointState 10.0.0.2
HeartBeatState: generation 1259911052, version 63
ApplicationState "load-information": 2.7, generation 1259911052, version 2
ApplicationState "bootstrapping": AujDMftpyUvebtnn, generation 1259911052, version 31
ApplicationState "normal": AujDMftpyUvebtnn, generation 1259911052, version 62
EndPointState 10.0.0.3
HeartBeatState: generation 1259812143, version 2142
ApplicationState "load-information": 16.0, generation 1259812143, version 1803
ApplicationState "normal": W2U1XYUC3wMppcY7, generation 1259812143, version 6
Node A sends GossipDigestSynMessage
Node B sends GossipDigestAckMessage
Node A replies GossipDigestAck2Message
Node A sends a GossipDigestSynMessage, which includes a list of gossip digests. A single digest consists of [endPointAddress, generationNumber, version].
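A digest list can be derived from an endPointStateMap by taking, for each endpoint, its generation and the highest version among its HeartBeatState and ApplicationStates. The sketch below does that for Node A's map from the example. The dictionary layout and the names max_version and make_digests are assumptions made for illustration, and the digests are sorted by address only to keep the output stable.

```python
# Node A's endPointStateMap as plain dicts.
# app_states maps a state name to (value, version).
node_a = {
    "10.0.0.1": {"generation": 1259909635, "heartbeat": 325,
                 "app_states": {"load-information": (5.2, 45),
                                "bootstrapping": ("bxLpassF3XD8Kyks", 56),
                                "normal": ("bxLpassF3XD8Kyks", 87)}},
    "10.0.0.2": {"generation": 1259911052, "heartbeat": 61,
                 "app_states": {"load-information": (2.7, 2),
                                "bootstrapping": ("AujDMftpyUvebtnn", 31)}},
    "10.0.0.3": {"generation": 1259912238, "heartbeat": 5,
                 "app_states": {"load-information": (12.0, 3)}},
    "10.0.0.4": {"generation": 1259912942, "heartbeat": 18,
                 "app_states": {"load-information": (6.7, 3),
                                "normal": ("bj05IVc0lvRXw2xH", 7)}},
}


def max_version(state):
    """Highest version held for one endpoint (heartbeat or any app state)."""
    versions = [version for _value, version in state["app_states"].values()]
    return max([state["heartbeat"], *versions])


def make_digests(end_point_state_map):
    """Build (address, generation, max version) digests for a SYN message."""
    return [(addr, state["generation"], max_version(state))
            for addr, state in sorted(end_point_state_map.items())]


for addr, generation, version in make_digests(node_a):
    print(f"{addr}:{generation}:{version}")
```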
In this case, the contents of GossipDigestSynMessage would be:
10.0.0.1:1259909635:325
10.0.0.2:1259911052:61
10.0.0.3:1259912238:5
10.0.0.4:1259912942:18
Node B receives GossipDigestSynMessage and sends a GossipDigestAckMessage to Node A
GossipDigestAckMessage includes two parts: gossip digest list and endpoint state list.
From the gossip digest list in the GossipDigestSynMessage, Node B can tell, for each endpoint, whether its own information is newer or older than Node A’s.
In this example, Node B’s GossipDigestAckMessage includes the following information. (The labels Line 1, Line 2, and so on are for illustration only; they are not part of the Ack message.)
Line 1: 10.0.0.1:1259909635:324
Line 2: 10.0.0.3:1259912238:0
Line 3: 10.0.0.4:1259912942:0
Line 4: 10.0.0.2:[ApplicationState "normal": AujDMftpyUvebtnn, generation 1259911052, version 62], [HeartBeatState, generation 1259911052, version 63]
Line 1: The generation numbers (1259909635) match, which means Node A has not been restarted since the last gossip round. The version number in the digest from Node A (325) is higher than the version number in Node B’s endPointStateMap (324), so Node B includes an entry in the Ack message that essentially asks for all updates for Node A (10.0.0.1) after version 324.
Line 2: The generation numbers for node 10.0.0.3 don’t match. Node B’s endPointStateMap has a generation value (1259812143) that is smaller than the value in the digest from Node A (1259912238), meaning 10.0.0.3 must have restarted. So Node B asks Node A for all data for generation 1259912238, starting from the smallest version number, 0.
Line 3: Node B does not know anything about node 10.0.0.4; there is no entry for it in Node B’s endPointStateMap. So Node B asks Node A for all data about 10.0.0.4.
Line 4: Node B knows more than Node A about node 10.0.0.2.
Generations match, but Node B’s version (63) is higher than Node A’s version (61).
Node A already has everything up to version 61, so Node B looks for states that are newer than version 61.
Node B’s endPointStateMap has two states with version > 61:
ApplicationState "normal" (version 62) and HeartBeatState (version 63).
So Node B sends these states to Node A. Note that in this case Node B doesn’t send a digest, because a digest only carries the maximum version number. When one side holds newer data like this, nodes send the full states.
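The per-digest decisions described in Lines 1 to 4 can be sketched as a single function that returns the two parts of the Ack: the digests Node B requests and the states it sends. This is a simplified sketch under assumptions: the dictionary layout and the names examine_digests and states_newer_than are hypothetical, and the branch for a sender whose generation is older than the local one is not covered by the walkthrough and is included only as a guess at symmetric behaviour.

```python
node_b = {  # Node B's endPointStateMap; app_states: name -> (value, version)
    "10.0.0.1": {"generation": 1259909635, "heartbeat": 324,
                 "app_states": {"load-information": (5.2, 45),
                                "bootstrapping": ("bxLpassF3XD8Kyks", 56),
                                "normal": ("bxLpassF3XD8Kyks", 87)}},
    "10.0.0.2": {"generation": 1259911052, "heartbeat": 63,
                 "app_states": {"load-information": (2.7, 2),
                                "bootstrapping": ("AujDMftpyUvebtnn", 31),
                                "normal": ("AujDMftpyUvebtnn", 62)}},
    "10.0.0.3": {"generation": 1259812143, "heartbeat": 2142,
                 "app_states": {"load-information": (16.0, 1803),
                                "normal": ("W2U1XYUC3wMppcY7", 6)}},
}
syn_digests = [("10.0.0.1", 1259909635, 325), ("10.0.0.2", 1259911052, 61),
               ("10.0.0.3", 1259912238, 5), ("10.0.0.4", 1259912942, 18)]


def max_version(state):
    versions = [version for _value, version in state["app_states"].values()]
    return max([state["heartbeat"], *versions])


def states_newer_than(state, version):
    """Everything held for one endpoint with a version above `version`."""
    return {
        "generation": state["generation"],
        "heartbeat": state["heartbeat"] if state["heartbeat"] > version else None,
        "app_states": {name: (value, v)
                       for name, (value, v) in state["app_states"].items()
                       if v > version},
    }


def examine_digests(local_map, digests):
    """Return (digests to request, full states to send) for an Ack (sketch)."""
    requests, deltas = [], {}
    for addr, remote_gen, remote_version in digests:
        local = local_map.get(addr)
        if local is None or remote_gen > local["generation"]:
            # Unknown endpoint or a restart we missed: ask for everything.
            requests.append((addr, remote_gen, 0))
        elif remote_gen < local["generation"]:
            # Assumption: the sender is behind a restart, so send it everything.
            deltas[addr] = states_newer_than(local, -1)
        else:
            local_version = max_version(local)
            if remote_version > local_version:
                requests.append((addr, remote_gen, local_version))
            elif remote_version < local_version:
                deltas[addr] = states_newer_than(local, remote_version)
    return requests, deltas


requests, deltas = examine_digests(node_b, syn_digests)
print(requests)
print(deltas)
```

With the example data, the requests cover 10.0.0.1, 10.0.0.3, and 10.0.0.4, and the only states sent are the "normal" ApplicationState and the HeartBeatState for 10.0.0.2.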
Node A sends a GossipDigestAck2Message back to Node B
The GossipDigestAck2Message is very similar to the GossipDigestAckMessage but travels in the opposite direction. That is, since Node B previously sent the GossipDigestAckMessage to Node A, Node A now sends a GossipDigestAck2Message back to Node B containing the information that was requested or needs to be updated.
Based on the GossipDigestAckMessage, Node A will send the following information to Node B:
Line 1: 10.0.0.1:[HeartBeatState, generation 1259909635, version 325]
Line 2: 10.0.0.3:[ApplicationState "load-information": 12.0, generation 1259912238, version 3], [HeartBeatState, generation 1259912238, version 5]
Line 3: 10.0.0.4:[ApplicationState "load-information": 6.7, generation 1259912942, version 3], [ApplicationState "normal": bj05IVc0lvRXw2xH, generation 1259912942, version 7], [HeartBeatState, generation 1259912942, version 18]
Recall that Node B asked Node A for everything after version 324 for 10.0.0.1's current generation, and for everything about nodes 10.0.0.3 and 10.0.0.4 at their current generations.
After Node B (10.0.0.2) applies this information and updates its endPointStateMap, Node A (10.0.0.1) and Node B (10.0.0.2) will have synced their state, and this gossip round is complete.
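The last leg can be sketched as two steps: Node A answers each requested digest with the states newer than the requested version, and Node B merges what it receives. This is a minimal sketch under assumptions; the names build_ack2 and apply_delta and the dictionary layout are hypothetical, and only the map entries involved in this exchange are reproduced. Under the "newer than the requested version" rule, only the HeartBeatState (version 325) qualifies for 10.0.0.1.

```python
node_a = {  # entries of Node A's map that Node B asked about
    "10.0.0.1": {"generation": 1259909635, "heartbeat": 325,
                 "app_states": {"load-information": (5.2, 45),
                                "bootstrapping": ("bxLpassF3XD8Kyks", 56),
                                "normal": ("bxLpassF3XD8Kyks", 87)}},
    "10.0.0.3": {"generation": 1259912238, "heartbeat": 5,
                 "app_states": {"load-information": (12.0, 3)}},
    "10.0.0.4": {"generation": 1259912942, "heartbeat": 18,
                 "app_states": {"load-information": (6.7, 3),
                                "normal": ("bj05IVc0lvRXw2xH", 7)}},
}
node_b = {  # Node B's view of the same endpoints before the Ack2 arrives
    "10.0.0.1": {"generation": 1259909635, "heartbeat": 324,
                 "app_states": {"load-information": (5.2, 45),
                                "bootstrapping": ("bxLpassF3XD8Kyks", 56),
                                "normal": ("bxLpassF3XD8Kyks", 87)}},
    "10.0.0.3": {"generation": 1259812143, "heartbeat": 2142,
                 "app_states": {"load-information": (16.0, 1803),
                                "normal": ("W2U1XYUC3wMppcY7", 6)}},
}
requests = [("10.0.0.1", 1259909635, 324), ("10.0.0.3", 1259912238, 0),
            ("10.0.0.4", 1259912942, 0)]


def states_newer_than(state, version):
    return {
        "generation": state["generation"],
        "heartbeat": state["heartbeat"] if state["heartbeat"] > version else None,
        "app_states": {name: (value, v)
                       for name, (value, v) in state["app_states"].items()
                       if v > version},
    }


def build_ack2(local_map, requested):
    """Answer each requested digest with the states newer than its version."""
    return {addr: states_newer_than(local_map[addr], version)
            for addr, _generation, version in requested if addr in local_map}


def apply_delta(local_map, addr, delta):
    """Merge received states, never letting older data overwrite newer data."""
    local = local_map.get(addr)
    if local is None or delta["generation"] > local["generation"]:
        # Unknown endpoint or a newer generation: start from a clean entry.
        local = {"generation": delta["generation"], "heartbeat": 0, "app_states": {}}
        local_map[addr] = local
    elif delta["generation"] < local["generation"]:
        return  # data from before a restart is ignored
    if delta["heartbeat"] is not None:
        local["heartbeat"] = max(local["heartbeat"], delta["heartbeat"])
    for name, (value, version) in delta["app_states"].items():
        held = local["app_states"].get(name)
        if held is None or held[1] < version:
            local["app_states"][name] = (value, version)


ack2 = build_ack2(node_a, requests)
for addr, delta in ack2.items():
    apply_delta(node_b, addr, delta)
print(node_b == node_a)
```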
Lather. Rinse. Repeat.