Skip to content

Commit

Permalink
Add new message type MsgGroupBroadcast and corresponding handler
Browse files Browse the repository at this point in the history
Signed-off-by: LintianShi <[email protected]>
  • Loading branch information
LintianShi committed Aug 29, 2022
1 parent a9d37b7 commit e8c7473
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 7 deletions.
13 changes: 13 additions & 0 deletions proto/proto/eraftpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ message Snapshot {
SnapshotMetadata metadata = 2;
}

// Forward is a type that tells the agent how to forward the MsgGroupBroadcast from the leader.
//
// Field to is the destination of forwarding.
// Field log_term and index is the previous entry of log entries that should be forwarded.
// Entries to be forwarded is the range (index, last_index].
message Forward {
uint64 to = 1;
uint64 log_term = 2;
uint64 index = 3;
}

enum MessageType {
MsgHup = 0;
MsgBeat = 1;
Expand All @@ -66,6 +77,7 @@ enum MessageType {
MsgReadIndexResp = 16;
MsgRequestPreVote = 17;
MsgRequestPreVoteResponse = 18;
MsgGroupBroadcast = 19;
}

message Message {
Expand All @@ -89,6 +101,7 @@ message Message {
uint64 reject_hint = 11;
bytes context = 12;
uint64 priority = 14;
repeated Forward forwards = 16;
}

message HardState {
Expand Down
5 changes: 5 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ pub struct Config {
/// rejoins the cluster.
pub pre_vote: bool,

/// Enables follower replication.
/// This reduces the across-AZ traffic of cloud deployment.
pub follower_repl: bool,

/// The range of election timeout. In some cases, we hope some nodes has less possibility
/// to become leader. This configuration ensures that the randomized election_timeout
/// will always be suit in [min_election_tick, max_election_tick).
Expand Down Expand Up @@ -112,6 +116,7 @@ impl Default for Config {
max_inflight_msgs: 256,
check_quorum: false,
pre_vote: false,
follower_repl: false,
min_election_tick: 0,
max_election_tick: 0,
read_only_option: ReadOnlyOption::Safe,
Expand Down
92 changes: 85 additions & 7 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,13 @@ pub struct RaftCore<T: Storage> {
/// Enable this if greater cluster stability is preferred over faster elections.
pub pre_vote: bool,

/// Enable follower replication.
///
/// This enables data replication from a follower to other servers in the same available zone.
///
/// Enable this for reducing across-AZ traffic of cloud deployment.
pub follower_repl: bool,

skip_bcast_commit: bool,
batch_append: bool,

Expand Down Expand Up @@ -337,6 +344,7 @@ impl<T: Storage> Raft<T> {
promotable: false,
check_quorum: c.check_quorum,
pre_vote: c.pre_vote,
follower_repl: c.follower_repl,
read_only: ReadOnly::new(c.read_only_option),
heartbeat_timeout: c.heartbeat_tick,
election_timeout: c.election_tick,
Expand Down Expand Up @@ -1372,6 +1380,7 @@ impl<T: Storage> Raft<T> {
if m.get_msg_type() == MessageType::MsgAppend
|| m.get_msg_type() == MessageType::MsgHeartbeat
|| m.get_msg_type() == MessageType::MsgSnapshot
|| m.get_msg_type() == MessageType::MsgGroupBroadcast && self.follower_repl
{
self.become_follower(m.term, m.from);
} else {
Expand All @@ -1381,7 +1390,8 @@ impl<T: Storage> Raft<T> {
} else if m.term < self.term {
if (self.check_quorum || self.pre_vote)
&& (m.get_msg_type() == MessageType::MsgHeartbeat
|| m.get_msg_type() == MessageType::MsgAppend)
|| m.get_msg_type() == MessageType::MsgAppend
|| m.get_msg_type() == MessageType::MsgGroupBroadcast && self.follower_repl)
{
// We have received messages from a leader at a lower term. It is possible
// that these messages were simply delayed in the network, but this could
Expand Down Expand Up @@ -2314,6 +2324,11 @@ impl<T: Storage> Raft<T> {
self.leader_id = m.from;
self.handle_append_entries(&m);
}
MessageType::MsgGroupBroadcast => {
self.election_elapsed = 0;
self.leader_id = m.from;
self.handle_group_broadcast(&m);
}
MessageType::MsgHeartbeat => {
self.election_elapsed = 0;
self.leader_id = m.from;
Expand Down Expand Up @@ -2425,13 +2440,14 @@ impl<T: Storage> Raft<T> {
Err(Error::RequestSnapshotDropped)
}

// TODO: revoke pub when there is a better way to test.
/// For a given message, append the entries to the log.
pub fn handle_append_entries(&mut self, m: &Message) {
/// Try to append entries, and return the append result.
/// Return true only if the entries in the message has been appended in the log successfully.
pub fn try_append_entries(&mut self, m: &Message) -> bool {
if self.pending_request_snapshot != INVALID_INDEX {
self.send_request_snapshot();
return;
return false;
}

if m.index < self.raft_log.committed {
debug!(
self.logger,
Expand All @@ -2443,13 +2459,14 @@ impl<T: Storage> Raft<T> {
to_send.index = self.raft_log.committed;
to_send.commit = self.raft_log.committed;
self.r.send(to_send, &mut self.msgs);
return;
return false;
}

let mut to_send = Message::default();
to_send.to = m.from;
to_send.set_msg_type(MessageType::MsgAppendResponse);

let mut success = true;
if let Some((_, last_idx)) = self
.raft_log
.maybe_append(m.index, m.log_term, m.commit, &m.entries)
Expand All @@ -2458,7 +2475,7 @@ impl<T: Storage> Raft<T> {
} else {
debug!(
self.logger,
"rejected msgApp [logterm: {msg_log_term}, index: {msg_index}] \
"Reject append [logterm: {msg_log_term}, index: {msg_index}] \
from {from}",
msg_log_term = m.log_term,
msg_index = m.index,
Expand All @@ -2483,9 +2500,70 @@ impl<T: Storage> Raft<T> {
to_send.reject = true;
to_send.reject_hint = hint_index;
to_send.log_term = hint_term.unwrap();
success = false;
}
to_send.set_commit(self.raft_log.committed);
self.r.send(to_send, &mut self.msgs);
success
}

// TODO: revoke pub when there is a better way to test.
/// For a given message, append the entries to the log.
pub fn handle_append_entries(&mut self, m: &Message) {
self.try_append_entries(m);
}

/// For a broadcast, append entries to onw log and forward MsgAppend to other dest.
pub fn handle_group_broadcast(&mut self, m: &Message) {
if self.try_append_entries(m) {
// If the agent fails to append entries from the leader,
// the agent cannot forward MsgAppend.
let agent_id = m.get_to();
for forward in m.get_forwards() {
// Dest should be in the cluster.
if self.prs().get(forward.get_to()).is_some() {
// Fetch log entries from the forward.index to the last index of log.
let ents = self.raft_log.entries(
forward.get_index() + 1,
self.max_msg_size,
GetEntriesContext(GetEntriesFor::SendAppend {
to: forward.get_to(),
term: m.term,
aggressively: false,
}),
);
if self
.raft_log
.match_term(forward.get_index(), forward.get_log_term())
{
let mut m_append = Message::default();
m_append.to = forward.get_to();
m_append.from = m.get_from();
m_append.set_msg_type(MessageType::MsgAppend);
m_append.index = forward.get_index();
m_append.log_term = forward.get_log_term();
m_append.set_entries(ents.unwrap().into());
m_append.commit = m.get_commit();
m_append.commit_term = m.get_commit_term();
debug!(
self.logger,
"Peer {} forward MsgAppend from {} to {}",
agent_id,
m_append.from,
m_append.to
);
self.r.send(m_append, &mut self.msgs)
} else {
warn!(
self.logger,
"The agent's log does not match with index {} log term {} in forward message",
forward.get_index(),
forward.get_log_term()
);
}
}
}
}
}

// TODO: revoke pub when there is a better way to test.
Expand Down

0 comments on commit e8c7473

Please sign in to comment.