Skip to content

Commit

Permalink
feat: add disable_proposal_forwarding config params (#552)
Browse files Browse the repository at this point in the history
Signed-off-by: Dat Tien Nguyen <[email protected]>
  • Loading branch information
datbeohbbh authored Oct 21, 2024
1 parent dfe2239 commit 63aec46
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 0 deletions.
71 changes: 71 additions & 0 deletions harness/tests/integration_cases/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1876,6 +1876,77 @@ fn test_committed_entries_pagination_after_restart() {
}
}

#[test]
fn test_disable_proposal_forwarding() {
let l = default_logger();

let n1 = new_test_raft_with_config(
&Config {
id: 1,
heartbeat_tick: 1,
election_tick: 10,
disable_proposal_forwarding: false,
..Default::default()
},
MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])),
&l,
);

let n2 = new_test_raft_with_config(
&Config {
id: 2,
heartbeat_tick: 1,
election_tick: 10,
disable_proposal_forwarding: false,
..Default::default()
},
MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])),
&l,
);

let n3 = new_test_raft_with_config(
&Config {
id: 3,
heartbeat_tick: 1,
election_tick: 10,
disable_proposal_forwarding: true,
..Default::default()
},
MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])),
&l,
);

let mut network = Network::new(vec![Some(n1), Some(n2), Some(n3)], &l);

// node 1 starts campaign to become leader.
network.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

// send proposal to n2(follower) where DisableProposalForwarding is false
assert_eq!(
network
.peers
.get_mut(&2)
.unwrap()
.step(new_message(2, 2, MessageType::MsgPropose, 1)),
Ok(())
);

// verify n2(follower) does forward the proposal when DisableProposalForwarding is false
assert_eq!(network.peers.get(&2).unwrap().msgs.len(), 1);

// send proposal to n3(follower) where DisableProposalForwarding is true
assert_eq!(
network
.peers
.get_mut(&3)
.unwrap()
.step(new_message(3, 3, MessageType::MsgPropose, 1)),
Err(Error::ProposalDropped)
);

assert!(network.peers.get(&3).unwrap().msgs.is_empty());
}

#[derive(Default)]
struct IgnoreSizeHintMemStorage {
inner: MemStorage,
Expand Down
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ pub struct Config {
/// Maximum raft log number that can be applied after commit but before persist.
/// The default value is 0, which means apply after both commit and persist.
pub max_apply_unpersisted_log_limit: u64,

/// If enable, followers will not forward proposal to leader.
pub disable_proposal_forwarding: bool,
}

impl Default for Config {
Expand All @@ -125,6 +128,7 @@ impl Default for Config {
max_uncommitted_size: NO_LIMIT,
max_committed_size_per_ready: NO_LIMIT,
max_apply_unpersisted_log_limit: 0,
disable_proposal_forwarding: false,
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ pub struct RaftCore<T: Storage> {

skip_bcast_commit: bool,
batch_append: bool,
disable_proposal_forwarding: bool,

heartbeat_timeout: usize,
election_timeout: usize,
Expand Down Expand Up @@ -363,6 +364,7 @@ impl<T: Storage> Raft<T> {
last_log_tail_index: 0,
},
max_committed_size_per_ready: c.max_committed_size_per_ready,
disable_proposal_forwarding: c.disable_proposal_forwarding,
},
};
confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?;
Expand Down Expand Up @@ -2337,6 +2339,15 @@ impl<T: Storage> Raft<T> {
term = self.term;
);
return Err(Error::ProposalDropped);
} else if self.disable_proposal_forwarding {
info!(
self.logger,
"{from} not forwarding to leader {to} at term {term}; dropping proposal",
from = self.id,
to = self.leader_id,
term = self.term;
);
return Err(Error::ProposalDropped);
}
m.to = self.leader_id;
self.r.send(m, &mut self.msgs);
Expand Down

0 comments on commit 63aec46

Please sign in to comment.