Skip to content

Commit

Permalink
Use the heartbeat in digest for cluster membership. (#123)
Browse files Browse the repository at this point in the history
Right now we send the heartbeat of all of the known nodes in the cluster
during digests.

We can use that information for cluster membership.
This will allow nodes to detect new members, join a cluster, and detect
failures considerably faster.

This PR also removes heartbeat from delta message.
Instead, the delta message now carries the last_gc_version, to fix a bug introduced in #122.

Bugfix:
Avoid updating the time of death of a node when it is redetected as
faulty.

We remove ourselves from the list of seeds. Before, nodes were regularly gossiping with themselves.

Code improvement:
In the failure detector, when we have few samples, we now use additive
smoothing.
  • Loading branch information
fulmicoton authored Feb 19, 2024
1 parent ec1dbbe commit 7de38f5
Show file tree
Hide file tree
Showing 8 changed files with 394 additions and 236 deletions.
81 changes: 35 additions & 46 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::HashSet;
use tokio::time::Instant;

use crate::serialize::*;
use crate::{ChitchatId, Heartbeat, VersionedValue};
use crate::{ChitchatId, Version, VersionedValue};

/// A delta is the message we send to another node to update it.
///
Expand Down Expand Up @@ -32,7 +32,7 @@ impl Delta {
let node_deltas = self.node_deltas.iter().flat_map(|node_delta| {
std::iter::once(DeltaOpRef::Node {
chitchat_id: &node_delta.chitchat_id,
heartbeat: node_delta.heartbeat,
last_gc_version: node_delta.last_gc_version,
})
.chain(node_delta.key_values.iter().map(|(key, versioned_value)| {
DeltaOpRef::KeyValue {
Expand All @@ -49,7 +49,7 @@ enum DeltaOp {
NodeToReset(ChitchatId),
Node {
chitchat_id: ChitchatId,
heartbeat: Heartbeat,
last_gc_version: Version,
},
KeyValue {
key: String,
Expand All @@ -61,7 +61,7 @@ enum DeltaOpRef<'a> {
NodeToReset(&'a ChitchatId),
Node {
chitchat_id: &'a ChitchatId,
heartbeat: Heartbeat,
last_gc_version: Version,
},
KeyValue {
key: &'a str,
Expand Down Expand Up @@ -108,10 +108,10 @@ impl Deserializable for DeltaOp {
}
DeltaOpTag::Node => {
let chitchat_id = ChitchatId::deserialize(buf)?;
let heartbeat = Heartbeat::deserialize(buf)?;
let last_gc_version = Version::deserialize(buf)?;
Ok(DeltaOp::Node {
chitchat_id,
heartbeat,
last_gc_version,
})
}
DeltaOpTag::KeyValue => {
Expand Down Expand Up @@ -139,10 +139,10 @@ impl DeltaOp {
match self {
DeltaOp::Node {
chitchat_id,
heartbeat,
last_gc_version,
} => DeltaOpRef::Node {
chitchat_id,
heartbeat: *heartbeat,
last_gc_version: *last_gc_version,
},
DeltaOp::KeyValue {
key,
Expand Down Expand Up @@ -171,11 +171,11 @@ impl<'a> Serializable for DeltaOpRef<'a> {
match self {
Self::Node {
chitchat_id,
heartbeat,
last_gc_version,
} => {
buf.push(DeltaOpTag::Node.into());
chitchat_id.serialize(buf);
heartbeat.serialize(buf);
last_gc_version.serialize(buf);
}
Self::KeyValue {
key,
Expand All @@ -198,8 +198,8 @@ impl<'a> Serializable for DeltaOpRef<'a> {
1 + match self {
Self::Node {
chitchat_id,
heartbeat,
} => chitchat_id.serialized_len() + heartbeat.serialized_len(),
last_gc_version,
} => chitchat_id.serialized_len() + last_gc_version.serialized_len(),
Self::KeyValue {
key,
versioned_value,
Expand Down Expand Up @@ -252,14 +252,14 @@ impl Delta {
.sum()
}

pub(crate) fn add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) {
pub(crate) fn add_node(&mut self, chitchat_id: ChitchatId, last_gc_version: Version) {
assert!(!self
.node_deltas
.iter()
.any(|node_delta| { node_delta.chitchat_id == chitchat_id }));
self.node_deltas.push(NodeDelta {
chitchat_id,
heartbeat,
last_gc_version,
key_values: Vec::new(),
});
}
Expand Down Expand Up @@ -306,7 +306,7 @@ impl Delta {
#[derive(Debug, Eq, PartialEq, serde::Serialize)]
pub(crate) struct NodeDelta {
pub chitchat_id: ChitchatId,
pub heartbeat: Heartbeat,
pub last_gc_version: Version,
pub key_values: Vec<(String, VersionedValue)>,
}

Expand Down Expand Up @@ -335,14 +335,14 @@ impl DeltaBuilder {
match op {
DeltaOp::Node {
chitchat_id,
heartbeat,
last_gc_version,
} => {
self.flush();
anyhow::ensure!(!self.existing_nodes.contains(&chitchat_id));
self.existing_nodes.insert(chitchat_id.clone());
self.current_node_delta = Some(NodeDelta {
chitchat_id,
heartbeat,
last_gc_version,
key_values: Vec::new(),
});
}
Expand Down Expand Up @@ -441,10 +441,10 @@ impl DeltaSerializer {
}

/// Returns false if the node could not be added because the payload would exceed the mtu.
pub fn try_add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) -> bool {
pub fn try_add_node(&mut self, chitchat_id: ChitchatId, last_gc_version: Version) -> bool {
let new_node_op = DeltaOp::Node {
chitchat_id,
heartbeat,
last_gc_version,
};
self.try_add_op(new_node_op)
}
Expand All @@ -471,9 +471,8 @@ mod tests {

// ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len().
let node1 = ChitchatId::for_local_test(10_001);
let heartbeat = Heartbeat(0);
// +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (node).
assert!(delta_writer.try_add_node(node1, heartbeat));
assert!(delta_writer.try_add_node(node1, 0u64));

// +23 bytes: 2 bytes (key length) + 5 bytes (key) + 7 bytes (values) + 8 bytes (version) +
// 1 bytes (empty tombstone).
Expand All @@ -497,9 +496,8 @@ mod tests {
));

let node2 = ChitchatId::for_local_test(10_002);
let heartbeat = Heartbeat(0);
// +37 bytes
assert!(delta_writer.try_add_node(node2, heartbeat));
assert!(delta_writer.try_add_node(node2, 0));

// +23 bytes.
assert!(delta_writer.try_add_kv(
Expand Down Expand Up @@ -529,9 +527,8 @@ mod tests {

// ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len().
let node1 = ChitchatId::for_local_test(10_001);
let heartbeat = Heartbeat(0);
// +37 bytes = 8 bytes (heartbeat) + 27 bytes (node) + 2bytes (block length)
assert!(delta_writer.try_add_node(node1, heartbeat));
// +37 bytes = 8 bytes (last gc version) + 27 bytes (node) + 2bytes (block length)
assert!(delta_writer.try_add_node(node1, 0));

// +24 bytes (kv + op tag)
assert!(delta_writer.try_add_kv(
Expand All @@ -554,9 +551,8 @@ mod tests {
));

let node2 = ChitchatId::for_local_test(10_002);
let heartbeat = Heartbeat(0);
// +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (node).
assert!(delta_writer.try_add_node(node2, heartbeat));
// +37 bytes = 8 bytes (last gc version) + 2 bytes (empty node delta) + 27 bytes (node).
assert!(delta_writer.try_add_node(node2, 0));
test_aux_delta_writer(delta_writer, 80);
}

Expand All @@ -576,11 +572,10 @@ mod tests {
assert!(delta_writer.try_add_node_to_reset(ChitchatId::for_local_test(10_000)));

let node1 = ChitchatId::for_local_test(10_001);
let heartbeat = Heartbeat(0);

// +8 bytes (heartbeat) + 27 bytes (ChitchatId) + (1 op tag) + 3 bytes (pessimistic new
// block) = 71
assert!(delta_writer.try_add_node(node1, heartbeat));
// +8 bytes (last gc version) + 27 bytes (ChitchatId) + (1 op tag) + 3 bytes (pessimistic
// new block) = 71
assert!(delta_writer.try_add_node(node1, 0u64));

// +23 bytes (kv) + 1 (op tag)
// = 95
Expand All @@ -604,10 +599,9 @@ mod tests {
));

let node2 = ChitchatId::for_local_test(10_002);
let heartbeat = Heartbeat(0);
// +8 bytes (heartbeat) + 27 bytes (ChitchatId) + 1 byte (op tag)
// +8 bytes (last gc version) + 27 bytes (ChitchatId) + 1 byte (op tag)
// = 155
assert!(delta_writer.try_add_node(node2, heartbeat));
assert!(delta_writer.try_add_node(node2, 0u64));
// The block got compressed.
test_aux_delta_writer(delta_writer, 85);
}
Expand All @@ -618,9 +612,8 @@ mod tests {
let mut delta_writer = DeltaSerializer::with_mtu(100);

let node1 = ChitchatId::for_local_test(10_001);
let heartbeat = Heartbeat(0);
// +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId).
assert!(delta_writer.try_add_node(node1, heartbeat));
assert!(delta_writer.try_add_node(node1, 0));

// +23 bytes.
assert!(delta_writer.try_add_kv(
Expand All @@ -642,9 +635,8 @@ mod tests {
));

let node2 = ChitchatId::for_local_test(10_002);
let heartbeat = Heartbeat(0);
// +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId).
assert!(!delta_writer.try_add_node(node2, heartbeat));
assert!(!delta_writer.try_add_node(node2, 0u64));

// The block got compressed.
test_aux_delta_writer(delta_writer, 72);
Expand All @@ -656,9 +648,8 @@ mod tests {
let mut delta_writer = DeltaSerializer::with_mtu(100);

let node1 = ChitchatId::for_local_test(10_001);
let heartbeat = Heartbeat(0);
// +37 bytes.
assert!(delta_writer.try_add_node(node1, heartbeat));
assert!(delta_writer.try_add_node(node1, 0u64));

// +23 bytes.
assert!(delta_writer.try_add_kv(
Expand Down Expand Up @@ -692,11 +683,10 @@ mod tests {
let mut delta_writer = DeltaSerializer::with_mtu(100);

let node1 = ChitchatId::for_local_test(10_001);
let heartbeat = Heartbeat(0);

// + 3 bytes (block tag) + 35 bytes (node) + 1 byte (op tag)
// = 40
assert!(delta_writer.try_add_node(node1, heartbeat));
assert!(delta_writer.try_add_node(node1, 0u64));

// +23 bytes (kv) + 1 (op tag) + 3 bytes (pessimistic block tag)
// = 67
Expand Down Expand Up @@ -728,8 +718,7 @@ mod tests {
let mut delta_writer = DeltaSerializer::with_mtu(62);

let node1 = ChitchatId::for_local_test(10_001);
let heartbeat = Heartbeat(0);
assert!(delta_writer.try_add_node(node1, heartbeat));
assert!(delta_writer.try_add_node(node1, 0u64));

assert!(delta_writer.try_add_kv(
"key11",
Expand Down
Loading

0 comments on commit 7de38f5

Please sign in to comment.