From 51891289f25dda9e3fb443fdb75a76b38aa3e6b3 Mon Sep 17 00:00:00 2001 From: Fullstop000 Date: Sun, 19 Jan 2020 16:03:07 +0800 Subject: [PATCH] fix unwrap panic Signed-off-by: Fullstop000 --- .../test_raft_follower_replication.rs | 29 ++++++++++++++++ src/group/mod.rs | 34 +++++++++++++------ src/raft.rs | 3 ++ 3 files changed, 55 insertions(+), 11 deletions(-) diff --git a/harness/tests/integration_cases/test_raft_follower_replication.rs b/harness/tests/integration_cases/test_raft_follower_replication.rs index 329067df6..db5eb8c25 100644 --- a/harness/tests/integration_cases/test_raft_follower_replication.rs +++ b/harness/tests/integration_cases/test_raft_follower_replication.rs @@ -613,3 +613,32 @@ fn test_delegate_must_be_able_to_send_logs_to_targets() { sandbox.propose(false); sandbox.assert_final_state(); } + +#[test] +fn test_follower_replication_with_inconsistent_progress_set() { + let l = default_logger(); + let group_config = vec![(1, vec![1]), (2, vec![2, 3, 4])]; + let followers = vec![ + (2, FollowerScenario::UpToDate), + (3, FollowerScenario::UpToDate), + (4, FollowerScenario::NeedEntries(10)), + ]; + let mut sandbox = Sandbox::new(&l, 1, followers.clone(), group_config.clone(), 5, 20); + // Remove peer 2 from ProgressSet + let leader = sandbox.leader_mut(); + let mut prs = leader.take_prs(); + prs.remove(2).unwrap(); + leader.set_prs(prs); + sandbox.propose(true); + let mut msgs = sandbox.leader_mut().read_messages(); + assert_eq!(1, msgs.len()); + let m = msgs.pop().unwrap(); + // only bcast to 4 even if the peer2 is in group system + assert_eq!(m.bcast_targets, vec![4]); + + let mut sandbox = Sandbox::new(&l, 1, followers.clone(), group_config.clone(), 5, 20); + // Remove peer 2 from group system + sandbox.leader_mut().groups.remove_node(2); + sandbox.propose(false); + sandbox.assert_final_state(); +} diff --git a/src/group/mod.rs b/src/group/mod.rs index fb1852edb..05c9e8f89 100644 --- a/src/group/mod.rs +++ b/src/group/mod.rs @@ -95,15 +95,16 @@ impl Groups { let (mut chosen, mut matched, mut bcast_targets) = (INVALID_ID, 0, vec![]); for id in self.candidate_delegates(group_id) { - let pr = prs.get(id).unwrap(); - if matched < pr.matched { - if chosen != INVALID_ID { - bcast_targets.push(chosen); + if let Some(pr) = prs.get(id) { + if matched < pr.matched { + if chosen != INVALID_ID { + bcast_targets.push(chosen); + } + chosen = id; + matched = pr.matched; + } else { + bcast_targets.push(id); } - chosen = id; - matched = pr.matched; - } else { - bcast_targets.push(id); } } @@ -131,7 +132,7 @@ impl Groups { /// Unset the delegate by delegate id. If the peer is not delegate, do nothing. pub(crate) fn remove_delegate(&mut self, delegate: u64) { if self.bcast_targets.remove(&delegate).is_some() { - // Remove the delegate from the group system since it's temorary unreachable. + // Remove the delegate from the group system since it's temporarily unreachable. // And the peer will be re-added after the leader receives a message from it. self.indexes.remove(&delegate); for (peer, (_, d)) in self.indexes.iter_mut() { @@ -162,9 +163,11 @@ impl Groups { if *gid == group_id { return false; } + // The peer changes its group, clean the stale delegate self.unmark_peer(peer); self.mark_peer(peer, group_id); } else { + // New peer added self.mark_peer(peer, group_id); } !self.unresolved.is_empty() @@ -176,6 +179,7 @@ impl Groups { self.remove_delegate(del); return; } + // Remove the peer from the old delegate `bcast_targets` let mut targets = self.bcast_targets.remove(&del).unwrap(); let pos = targets.iter().position(|id| *id == peer).unwrap(); targets.swap_remove(pos); @@ -192,8 +196,7 @@ impl Groups { .find(|(_, (gid, _))| *gid == group_id) .map_or((false, INVALID_ID), |(_, (_, d))| (true, *d)); - let _x = self.indexes.insert(peer, (group_id, delegate)); - debug_assert!(_x.is_none(), "peer can't exist before mark"); + let _ = self.indexes.insert(peer, (group_id, delegate)); if delegate != INVALID_ID { self.bcast_targets.get_mut(&delegate).unwrap().push(peer); @@ -204,6 +207,15 @@ impl Groups { } } + /// Removes the node in group system by given `id` + pub fn remove_node(&mut self, id: u64) { + self.indexes.remove(&id); + self.unmark_peer(id); + if let Some(i) = self.unresolved.iter().position(|x| *x == id) { + let _ = self.unresolved.remove(i); + } + } + // Pick delegates for all peers if need. // TODO: change to `pub(crate)` after we find a simple way to test. pub fn resolve_delegates(&mut self, prs: &ProgressSet) { diff --git a/src/raft.rs b/src/raft.rs index cf8b95a73..c2bd9e431 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -2200,6 +2200,9 @@ impl Raft { self.abort_leader_transfer(); } + // Removes the group info + self.groups.remove_node(id); + Ok(()) }