Skip to content

Commit

Permalink
fix unwrap panic
Browse files Browse the repository at this point in the history
Signed-off-by: Fullstop000 <[email protected]>
  • Loading branch information
Fullstop000 committed Jan 19, 2020
1 parent 3e7cea5 commit 5189128
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 11 deletions.
29 changes: 29 additions & 0 deletions harness/tests/integration_cases/test_raft_follower_replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,3 +613,32 @@ fn test_delegate_must_be_able_to_send_logs_to_targets() {
sandbox.propose(false);
sandbox.assert_final_state();
}

#[test]
fn test_follower_replication_with_inconsistent_progress_set() {
let l = default_logger();
let group_config = vec![(1, vec![1]), (2, vec![2, 3, 4])];
let followers = vec![
(2, FollowerScenario::UpToDate),
(3, FollowerScenario::UpToDate),
(4, FollowerScenario::NeedEntries(10)),
];
let mut sandbox = Sandbox::new(&l, 1, followers.clone(), group_config.clone(), 5, 20);
// Remove peer 2 from ProgressSet
let leader = sandbox.leader_mut();
let mut prs = leader.take_prs();
prs.remove(2).unwrap();
leader.set_prs(prs);
sandbox.propose(true);
let mut msgs = sandbox.leader_mut().read_messages();
assert_eq!(1, msgs.len());
let m = msgs.pop().unwrap();
// only bcast to 4 even if the peer2 is in group system
assert_eq!(m.bcast_targets, vec![4]);

let mut sandbox = Sandbox::new(&l, 1, followers.clone(), group_config.clone(), 5, 20);
// Remove peer 2 from group system
sandbox.leader_mut().groups.remove_node(2);
sandbox.propose(false);
sandbox.assert_final_state();
}
34 changes: 23 additions & 11 deletions src/group/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,16 @@ impl Groups {

let (mut chosen, mut matched, mut bcast_targets) = (INVALID_ID, 0, vec![]);
for id in self.candidate_delegates(group_id) {
let pr = prs.get(id).unwrap();
if matched < pr.matched {
if chosen != INVALID_ID {
bcast_targets.push(chosen);
if let Some(pr) = prs.get(id) {
if matched < pr.matched {
if chosen != INVALID_ID {
bcast_targets.push(chosen);
}
chosen = id;
matched = pr.matched;
} else {
bcast_targets.push(id);
}
chosen = id;
matched = pr.matched;
} else {
bcast_targets.push(id);
}
}

Expand Down Expand Up @@ -131,7 +132,7 @@ impl Groups {
/// Unset the delegate by delegate id. If the peer is not delegate, do nothing.
pub(crate) fn remove_delegate(&mut self, delegate: u64) {
if self.bcast_targets.remove(&delegate).is_some() {
// Remove the delegate from the group system since it's temorary unreachable.
// Remove the delegate from the group system since it's temporarily unreachable.
// And the peer will be re-added after the leader receives a message from it.
self.indexes.remove(&delegate);
for (peer, (_, d)) in self.indexes.iter_mut() {
Expand Down Expand Up @@ -162,9 +163,11 @@ impl Groups {
if *gid == group_id {
return false;
}
// The peer changes its group, clean the stale delegate
self.unmark_peer(peer);
self.mark_peer(peer, group_id);
} else {
// New peer added
self.mark_peer(peer, group_id);
}
!self.unresolved.is_empty()
Expand All @@ -176,6 +179,7 @@ impl Groups {
self.remove_delegate(del);
return;
}
// Remove the peer from the old delegate `bcast_targets`
let mut targets = self.bcast_targets.remove(&del).unwrap();
let pos = targets.iter().position(|id| *id == peer).unwrap();
targets.swap_remove(pos);
Expand All @@ -192,8 +196,7 @@ impl Groups {
.find(|(_, (gid, _))| *gid == group_id)
.map_or((false, INVALID_ID), |(_, (_, d))| (true, *d));

let _x = self.indexes.insert(peer, (group_id, delegate));
debug_assert!(_x.is_none(), "peer can't exist before mark");
let _ = self.indexes.insert(peer, (group_id, delegate));

if delegate != INVALID_ID {
self.bcast_targets.get_mut(&delegate).unwrap().push(peer);
Expand All @@ -204,6 +207,15 @@ impl Groups {
}
}

/// Removes the node in group system by given `id`
pub fn remove_node(&mut self, id: u64) {
self.indexes.remove(&id);
self.unmark_peer(id);
if let Some(i) = self.unresolved.iter().position(|x| *x == id) {
let _ = self.unresolved.remove(i);
}
}

// Pick delegates for all peers if need.
// TODO: change to `pub(crate)` after we find a simple way to test.
pub fn resolve_delegates(&mut self, prs: &ProgressSet) {
Expand Down
3 changes: 3 additions & 0 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2200,6 +2200,9 @@ impl<T: Storage> Raft<T> {
self.abort_leader_transfer();
}

// Removes the group info
self.groups.remove_node(id);

Ok(())
}

Expand Down

0 comments on commit 5189128

Please sign in to comment.