Skip to content

Commit

Permalink
Merge pull request #56 from poanetwork/vk-term-agreement
Browse files Browse the repository at this point in the history
Added Term messages in Agreement and enabled early termination
  • Loading branch information
vkomenda authored Jun 8, 2018
2 parents 138eaaf + dc4475b commit d1362ed
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 49 deletions.
1 change: 1 addition & 0 deletions proto/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,6 @@ message AgreementProto {
bool bval = 2;
bool aux = 3;
uint32 conf = 4;
bool term = 5;
}
}
120 changes: 74 additions & 46 deletions src/agreement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ pub enum AgreementContent {
Aux(bool),
/// `Conf` message.
Conf(BinValues),
/// `Term` message.
Term(bool),
}

impl AgreementContent {
/// Creates an message with a given epoch number.
pub fn with_epoch(self, epoch: u32) -> AgreementMessage {
AgreementMessage {
epoch,
content: self,
}
}
}

/// Messages sent during the binary Byzantine agreement stage.
Expand All @@ -42,29 +54,6 @@ pub struct AgreementMessage {
pub content: AgreementContent,
}

impl AgreementMessage {
pub fn bval(epoch: u32, b: bool) -> Self {
AgreementMessage {
epoch,
content: AgreementContent::BVal(b),
}
}

pub fn aux(epoch: u32, b: bool) -> Self {
AgreementMessage {
epoch,
content: AgreementContent::Aux(b),
}
}

pub fn conf(epoch: u32, v: BinValues) -> Self {
AgreementMessage {
epoch,
content: AgreementContent::Conf(v),
}
}
}

/// Binary Agreement instance
pub struct Agreement<NodeUid> {
/// Shared network information.
Expand All @@ -81,6 +70,8 @@ pub struct Agreement<NodeUid> {
received_aux: BTreeMap<NodeUid, bool>,
/// Received `Conf` messages. Reset on every epoch update.
received_conf: BTreeMap<NodeUid, BinValues>,
/// Received `Term` messages. Kept throughout epoch updates.
received_term: BTreeMap<NodeUid, bool>,
/// The estimate of the decision value in the current epoch.
estimated: Option<bool>,
/// The value output by the agreement instance. It is set once to `Some(b)`
Expand Down Expand Up @@ -137,6 +128,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for Agreement<NodeU
AgreementContent::BVal(b) => self.handle_bval(sender_id, b),
AgreementContent::Aux(b) => self.handle_aux(sender_id, b),
AgreementContent::Conf(v) => self.handle_conf(sender_id, v),
AgreementContent::Term(v) => self.handle_term(sender_id, v),
}
}

Expand Down Expand Up @@ -172,6 +164,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
sent_bval: BTreeSet::new(),
received_aux: BTreeMap::new(),
received_conf: BTreeMap::new(),
received_term: BTreeMap::new(),
estimated: None,
output: None,
decision: None,
Expand Down Expand Up @@ -247,7 +240,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
self.sent_bval.insert(b);
// Multicast `BVal`.
self.messages
.push_back(AgreementMessage::bval(self.epoch, b));
.push_back(AgreementContent::BVal(b).with_epoch(self.epoch));
// Receive the `BVal` message locally.
let our_uid = &self.netinfo.our_uid().clone();
self.handle_bval(our_uid, b)
Expand All @@ -262,7 +255,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
let v = self.bin_values;
// Multicast `Conf`.
self.messages
.push_back(AgreementMessage::conf(self.epoch, v));
.push_back(AgreementContent::Conf(v).with_epoch(self.epoch));
// Trigger the start of the `Conf` round.
self.conf_round = true;
// Receive the `Conf` message locally.
Expand Down Expand Up @@ -297,6 +290,38 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
self.try_finish_conf_round()
}

/// Receives a `Term(v)` message. If we haven't yet decided on a value and there are more than
/// `num_faulty` such messages with the same value from different nodes, performs expedite
/// termination: decides on `v`, broadcasts `Term(v)` and terminates the instance.
fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> AgreementResult<()> {
self.received_term.insert(sender_id.clone(), b);
// Check for the expedite termination condition.
if self.decision.is_none()
&& self.received_term.iter().filter(|(_, &c)| b == c).count()
> self.netinfo.num_faulty()
{
self.decide(b);
}
Ok(())
}

/// Decides on a value and broadcasts a `Term` message with that value.
fn decide(&mut self, b: bool) {
// Output the agreement value.
self.output = Some(b);
// Latch the decided state.
self.decision = Some(b);
self.messages
.push_back(AgreementContent::Term(b).with_epoch(self.epoch));
self.received_term.insert(self.netinfo.our_uid().clone(), b);
self.terminated = true;
debug!(
"Agreement instance {:?} decided: {}",
self.netinfo.our_uid(),
b
);
}

fn try_finish_conf_round(&mut self) -> AgreementResult<()> {
if self.conf_round {
let (count_vals, vals) = self.count_conf();
Expand All @@ -313,23 +338,35 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
fn send_aux(&mut self, b: bool) -> AgreementResult<()> {
// Multicast `Aux`.
self.messages
.push_back(AgreementMessage::aux(self.epoch, b));
.push_back(AgreementContent::Aux(b).with_epoch(self.epoch));
// Receive the `Aux` message locally.
let our_uid = &self.netinfo.our_uid().clone();
self.handle_aux(our_uid, b)
}

/// The count of `Aux` messages such that the set of values carried by those messages is a
/// subset of bin_values_r.
/// subset of bin_values_r. The count of matching `Term` messages from terminated nodes is also
/// added to the count of `Aux` messages as witnesses of the terminated nodes' decision.
///
/// In general, we can't expect every good node to send the same `Aux` value, so waiting for N -
/// f agreeing messages would not always terminate. We can, however, expect every good node to
/// send an `Aux` value that will eventually end up in our `bin_values`.
fn count_aux(&self) -> usize {
self.received_aux
.values()
.filter(|&&b| self.bin_values.contains(b))
.count()
let mut aux: BTreeMap<_, _> = self
.received_aux
.iter()
.filter(|(_, &b)| self.bin_values.contains(b))
.collect();

let term: BTreeMap<_, _> = self
.received_term
.iter()
.filter(|(_, &b)| self.bin_values.contains(b))
.collect();

// Ensure that nodes are not counted twice.
aux.extend(term);
aux.len()
}

/// Counts the number of received `Conf` messages.
Expand All @@ -351,6 +388,11 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
self.received_conf.clear();
self.conf_round = false;
self.epoch += 1;
debug!(
"Agreement instance {:?} started epoch {}",
self.netinfo.our_uid(),
self.epoch
);
}

/// Gets a common coin and uses it to compute the next decision estimate and outputs the
Expand All @@ -371,29 +413,15 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
// some round r' > r."
self.terminated = self.terminated || self.decision == Some(coin);
if self.terminated {
debug!("Agreement instance {:?} terminated", self.netinfo.our_uid());
return Ok(());
}

self.start_next_epoch();
debug!(
"Agreement instance {:?} started epoch {}",
self.netinfo.our_uid(),
self.epoch
);

let b = if let Some(b) = vals.definite() {
// Outputting a value is allowed only once.
if self.decision.is_none() && b == coin {
// Output the agreement value.
self.output = Some(b);
// Latch the decided state.
self.decision = Some(b);
debug!(
"Agreement instance {:?} output: {}",
self.netinfo.our_uid(),
b
);
self.decide(b);
}
b
} else {
Expand Down
11 changes: 8 additions & 3 deletions src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ impl AgreementMessage {
};
p.set_conf(bin_values);
}
AgreementContent::Term(b) => {
p.set_term(b);
}
}
p
}
Expand All @@ -94,17 +97,19 @@ impl AgreementMessage {
pub fn from_proto(mp: message::AgreementProto) -> Option<Self> {
let epoch = mp.get_epoch();
if mp.has_bval() {
Some(AgreementMessage::bval(epoch, mp.get_bval()))
Some(AgreementContent::BVal(mp.get_bval()).with_epoch(epoch))
} else if mp.has_aux() {
Some(AgreementMessage::aux(epoch, mp.get_aux()))
Some(AgreementContent::Aux(mp.get_aux()).with_epoch(epoch))
} else if mp.has_conf() {
match mp.get_conf() {
0 => Some(BinValues::None),
1 => Some(BinValues::False),
2 => Some(BinValues::True),
3 => Some(BinValues::Both),
_ => None,
}.map(|bin_values| AgreementMessage::conf(epoch, bin_values))
}.map(|bin_values| AgreementContent::Conf(bin_values).with_epoch(epoch))
} else if mp.has_term() {
Some(AgreementContent::Term(mp.get_term()).with_epoch(epoch))
} else {
None
}
Expand Down

0 comments on commit d1362ed

Please sign in to comment.