Skip to content

Commit

Permalink
Added extra liveness predicate (#125)
Browse files Browse the repository at this point in the history
* Added an extra liveness predicate.

This PR also introduces an important semantic change.

`fn get` now returns None if a Key-Value is marked as deleted.

* Apply suggestions from code review

Co-authored-by: Adrien "Code Monkey" Guillo <[email protected]>
  • Loading branch information
fulmicoton and guilload authored Feb 22, 2024
1 parent 7de38f5 commit 06bb2b2
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 14 deletions.
1 change: 1 addition & 0 deletions chitchat-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ async fn main() -> anyhow::Result<()> {
..FailureDetectorConfig::default()
},
marked_for_deletion_grace_period: Duration::from_secs(60),
extra_liveness_predicate: None,
};
let chitchat_handler = spawn_chitchat(config, Vec::new(), &UdpTransport).await?;
let chitchat = chitchat_handler.chitchat();
Expand Down
12 changes: 11 additions & 1 deletion chitchat/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
use std::net::SocketAddr;
use std::time::Duration;

use crate::{ChitchatId, FailureDetectorConfig};
use crate::{ChitchatId, FailureDetectorConfig, NodeState};

/// User-defined liveness predicate, applied on top of the output of the failure
/// detector.
///
/// The predicate receives a node's [`NodeState`] and returns `true` when that node should
/// be surfaced as live. It is stored as a boxed, `Send` trait object.
pub type ExtraLivenessPredicate = Box<dyn Fn(&NodeState) -> bool + Send>;

/// A struct for configuring a Chitchat instance.
pub struct ChitchatConfig {
Expand All @@ -23,6 +27,10 @@ pub struct ChitchatConfig {
// - Apply delta: for a node flagged "to be reset", Chitchat will remove the node state and
// populate a fresh new node state with the keys and values present in the delta.
pub marked_for_deletion_grace_period: Duration,
// Extra liveness predicate that can be used to refine what a node being "live" means.
// It can be used, for instance, to only surface the nodes that are both alive according
// to the failure detector and have a given set of required keys.
pub extra_liveness_predicate: Option<ExtraLivenessPredicate>,
}

impl ChitchatConfig {
Expand All @@ -38,6 +46,7 @@ impl ChitchatConfig {
seed_nodes: Vec::new(),
failure_detector_config: Default::default(),
marked_for_deletion_grace_period: Duration::from_secs(10_000),
extra_liveness_predicate: None,
}
}
}
Expand All @@ -55,6 +64,7 @@ impl Default for ChitchatConfig {
seed_nodes: Vec::new(),
failure_detector_config: Default::default(),
marked_for_deletion_grace_period: Duration::from_secs(3_600 * 2), // 2h
extra_liveness_predicate: None,
}
}
}
159 changes: 149 additions & 10 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ impl Chitchat {
/// Reports heartbeats to the failure detector for nodes in the delta for which we received an
/// update.
fn report_heartbeat(&mut self, chitchat_id: &ChitchatId, heartbeat: Heartbeat) {
if chitchat_id == self.self_chitchat_id() {
return;
}
let node_state = self.cluster_state.node_state_mut(chitchat_id);
if node_state.try_set_heartbeat(heartbeat) {
self.failure_detector.report_heartbeat(chitchat_id);
Expand Down Expand Up @@ -192,12 +195,14 @@ impl Chitchat {
let live_nodes = current_live_nodes
.keys()
.cloned()
.map(|chitchat_id| {
let node_state = self
.node_state(&chitchat_id)
.expect("Node state should exist.")
.clone();
(chitchat_id, node_state)
.flat_map(|chitchat_id| {
let node_state = self.node_state(&chitchat_id)?;
if let Some(liveness_extra_predicate) = &self.config.extra_liveness_predicate {
if !liveness_extra_predicate(node_state) {
return None;
}
}
Some((chitchat_id, node_state.clone()))
})
.collect::<BTreeMap<_, _>>();
self.previous_live_nodes = current_live_nodes;
Expand Down Expand Up @@ -375,6 +380,16 @@ mod tests {
}
}

/// Test helper: spawns a chitchat node from a ready-made `config` over `transport`,
/// starting with no initial key-values.
///
/// Panics if `spawn_chitchat` returns an error.
async fn start_node_with_config(
    transport: &dyn Transport,
    config: ChitchatConfig,
) -> ChitchatHandle {
    let spawn_result = spawn_chitchat(config, Vec::<(String, String)>::new(), transport).await;
    spawn_result.unwrap()
}

async fn start_node(
chitchat_id: ChitchatId,
seeds: &[String],
Expand All @@ -393,11 +408,9 @@ mod tests {
..Default::default()
},
marked_for_deletion_grace_period: Duration::from_secs(3_600),
extra_liveness_predicate: None,
};
let initial_kvs: Vec<(String, String)> = Vec::new();
spawn_chitchat(config, initial_kvs, transport)
.await
.unwrap()
start_node_with_config(transport, config).await
}

async fn setup_nodes(
Expand Down Expand Up @@ -484,6 +497,132 @@ mod tests {
assert_nodes_sync(&[&node1, &node2]);
}

/// Spawns five nodes and checks that the live-nodes watcher of the second node eventually
/// reports all five of them.
#[tokio::test]
async fn test_live_node_channel() {
    let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE);
    let nodes = setup_nodes(20001..=20005, &transport).await;
    let observer = nodes.get(1).unwrap();
    let mut live_nodes_stream = observer.chitchat().lock().await.live_nodes_watcher();

    // Keep pulling updates until the watcher reports the full cluster.
    let mut live_members = live_nodes_stream.next().await.unwrap();
    while live_members.len() != 5 {
        live_members = live_nodes_stream.next().await.unwrap();
    }

    for node in nodes.iter() {
        assert!(live_members.contains_key(node.chitchat_id()));
    }
    shutdown_nodes(nodes).await.unwrap();
}

/// Exercises the live-nodes watcher when an extra liveness predicate is configured.
///
/// Every node is configured so that it only counts as live while its `READY` key equals
/// `"true"`. The test then flips that key on and off and watches the live set follow.
#[tokio::test]
async fn test_live_node_channel_with_extra_predicate() {
    let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE);
    let chitchat_ids: Vec<ChitchatId> = (1..=3).map(ChitchatId::for_local_test).collect();
    // The first node acts as the seed for the whole cluster.
    let seed_addr = chitchat_ids[0].gossip_advertise_addr.to_string();
    let make_config = |chitchat_id: &ChitchatId| ChitchatConfig {
        chitchat_id: chitchat_id.clone(),
        cluster_id: "default-cluster".to_string(),
        gossip_interval: Duration::from_millis(100),
        listen_addr: chitchat_id.gossip_advertise_addr,
        seed_nodes: vec![seed_addr.clone()],
        failure_detector_config: FailureDetectorConfig {
            dead_node_grace_period: DEAD_NODE_GRACE_PERIOD,
            phi_threshold: 5.0,
            initial_interval: Duration::from_millis(100),
            ..Default::default()
        },
        marked_for_deletion_grace_period: Duration::from_secs(3_600),
        extra_liveness_predicate: Some(Box::new(|node_state| {
            matches!(node_state.get("READY"), Some("true"))
        })),
    };
    let mut nodes = Vec::with_capacity(chitchat_ids.len());
    for chitchat_id in chitchat_ids.iter() {
        nodes.push(start_node_with_config(&transport, make_config(chitchat_id)).await);
    }

    // Phase 1: nobody has set READY yet, so waiting for three live nodes must time out
    // and the last observed live set must be empty.
    let mut num_nodes = 0;
    let wait_for_full_cluster = async {
        let mut live_rx = nodes[2].chitchat().lock().await.live_nodes_watcher();
        loop {
            let live_members = live_rx.next().await.unwrap();
            num_nodes = live_members.len();
            if num_nodes == 3 {
                break live_members;
            }
        }
    };
    let wait_outcome = tokio::time::timeout(Duration::from_secs(1), wait_for_full_cluster).await;
    assert!(wait_outcome.is_err());
    assert_eq!(num_nodes, 0);

    // Phase 2: every node declares itself ready, in node order.
    for node in nodes.iter() {
        node.chitchat()
            .lock()
            .await
            .self_node_state()
            .set("READY", "true");
    }

    let mut live_rx = nodes[2].chitchat().lock().await.live_nodes_watcher();
    let mut live_members = live_rx.next().await.unwrap();
    while live_members.len() != 3 {
        live_members = live_rx.next().await.unwrap();
    }
    for node in nodes.iter() {
        assert!(live_members.contains_key(node.chitchat_id()));
    }

    // Phase 3: deleting READY on the first node removes it from the live set.
    nodes[0]
        .chitchat()
        .lock()
        .await
        .self_node_state()
        .mark_for_deletion("READY");

    live_members = live_rx.next().await.unwrap();
    while live_members.len() != 2 {
        live_members = live_rx.next().await.unwrap();
    }
    assert!(live_members.contains_key(&chitchat_ids[1]));
    assert!(live_members.contains_key(&chitchat_ids[2]));

    // Phase 4: a READY value other than "true" also removes the node from the live set.
    nodes[1]
        .chitchat()
        .lock()
        .await
        .self_node_state()
        .set("READY", "false");

    live_members = live_rx.next().await.unwrap();
    while live_members.len() != 1 {
        live_members = live_rx.next().await.unwrap();
    }

    assert!(live_members.contains_key(&chitchat_ids[2]));

    shutdown_nodes(nodes).await.unwrap();
}

#[tokio::test]
async fn test_multiple_nodes() -> anyhow::Result<()> {
let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE);
Expand Down
1 change: 0 additions & 1 deletion chitchat/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ pub async fn spawn_chitchat(
spawn_dns_refresh_loop(&config.seed_nodes).await;

let socket = transport.open(config.listen_addr).await?;

let chitchat_id = config.chitchat_id.clone();

let chitchat = Chitchat::with_chitchat_id_and_seeds(config, seed_addrs, initial_key_values);
Expand Down
13 changes: 11 additions & 2 deletions chitchat/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,20 @@ impl NodeState {
.count()
}

/// Returns `true` if `key` is present and not marked for deletion.
///
/// Returns `false` if the key is missing or marked for deletion. This is
/// simply whether `get` yields a value for `key`.
pub fn contains_key(&self, key: &str) -> bool {
self.get(key).is_some()
}

pub fn get(&self, key: &str) -> Option<&str> {
self.get_versioned(key)
.map(|versioned_value| versioned_value.value.as_str())
let versioned_value = self.get_versioned(key)?;
if versioned_value.tombstone.is_some() {
return None;
}
Some(versioned_value.value.as_str())
}

/// Returns the versioned value stored under `key`, or `None` if the key is not present.
///
/// This is a plain lookup in `key_values` with no tombstone check: if the key is
/// tombstoned, this method will still return the versioned value.
pub fn get_versioned(&self, key: &str) -> Option<&VersionedValue> {
self.key_values.get(key)
}
Expand Down
1 change: 1 addition & 0 deletions chitchat/tests/cluster_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl Simulator {
..Default::default()
},
marked_for_deletion_grace_period: self.marked_for_deletion_key_grace_period,
extra_liveness_predicate: None,
};
let handle = spawn_chitchat(config, Vec::new(), &self.transport)
.await
Expand Down
1 change: 1 addition & 0 deletions chitchat/tests/perf_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ async fn spawn_one(chitchat_id: u16, transport: &dyn Transport) -> ChitchatHandl
..Default::default()
},
marked_for_deletion_grace_period: Duration::from_secs(10_000),
extra_liveness_predicate: None,
};
spawn_chitchat(config, Vec::new(), transport).await.unwrap()
}
Expand Down

0 comments on commit 06bb2b2

Please sign in to comment.