Skip to content

Commit

Permalink
implemented an optimistic random common coin schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
vkomenda committed Jun 22, 2018
1 parent e0a85a5 commit 7a4fc37
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/agreement/bin_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl BinValues {
replace(self, BinValues::None);
}

fn from_bool(b: bool) -> Self {
pub fn from_bool(b: bool) -> Self {
if b {
BinValues::True
} else {
Expand Down
118 changes: 93 additions & 25 deletions src/agreement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ pub struct AgreementMessage {
pub content: AgreementContent,
}

/// Possible values of the common coin schedule defining the method to derive the common coin in a
/// given epoch: as a constant value or a distributed computation.
enum CoinSchedule {
False,
True,
Random,
}

/// Binary Agreement instance
pub struct Agreement<NodeUid> {
/// Shared network information.
Expand Down Expand Up @@ -113,6 +121,8 @@ pub struct Agreement<NodeUid> {
conf_round: bool,
/// A common coin instance. It is reset on epoch update.
common_coin: CommonCoin<NodeUid, Nonce>,
/// Common coin schedule computed at the start of each epoch.
coin_schedule: CoinSchedule,
}

impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
Expand Down Expand Up @@ -201,6 +211,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
netinfo,
Nonce::new(invocation_id.as_ref(), session_id, proposer_i, 0),
),
coin_schedule: CoinSchedule::False,
})
} else {
Err(ErrorKind::UnknownProposer.into())
Expand Down Expand Up @@ -252,9 +263,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
// Send an `Aux` message at most once per epoch.
self.send_aux(b)
} else if bin_values_changed {
// If the `Conf` round has already started, a change in `bin_values` can lead to its
// end. Try if it has indeed finished.
self.try_finish_conf_round()
self.on_bin_values_changed()
} else {
Ok(())
}
Expand All @@ -267,6 +276,34 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
}
}

/// Called when `bin_values` changes as a result of receiving a `BVal` message. Tries to update
/// the epoch.
fn on_bin_values_changed(&mut self) -> AgreementResult<()> {
match self.coin_schedule {
CoinSchedule::False => {
let (aux_count, aux_vals) = self.count_aux();
if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() {
self.on_coin(false, aux_vals.definite())
} else {
Ok(())
}
}
CoinSchedule::True => {
let (aux_count, aux_vals) = self.count_aux();
if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() {
self.on_coin(true, aux_vals.definite())
} else {
Ok(())
}
}
CoinSchedule::Random => {
// If the `Conf` round has already started, a change in `bin_values` can lead to its
// end. Try if it has indeed finished.
self.try_finish_conf_round()
}
}
}

fn send_bval(&mut self, b: bool) -> AgreementResult<()> {
// Record the value `b` as sent.
self.sent_bval.insert(b);
Expand Down Expand Up @@ -309,12 +346,21 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
if self.bin_values == BinValues::None {
return Ok(());
}
if self.count_aux() < self.netinfo.num_nodes() - self.netinfo.num_faulty() {
let (aux_count, aux_vals) = self.count_aux();
if aux_count < self.netinfo.num_nodes() - self.netinfo.num_faulty() {
// Continue waiting for the (N - f) `Aux` messages.
return Ok(());
}
// Start the `Conf` message round.
self.send_conf()

// Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...`
match self.coin_schedule {
CoinSchedule::False => self.on_coin(false, aux_vals.definite()),
CoinSchedule::True => self.on_coin(true, aux_vals.definite()),
CoinSchedule::Random => {
// Start the `Conf` message round.
self.send_conf()
}
}
}

fn handle_conf(&mut self, sender_id: &NodeUid, v: BinValues) -> AgreementResult<()> {
Expand Down Expand Up @@ -344,29 +390,47 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
self.extend_common_coin();

if let Some(coin) = self.common_coin.next_output() {
let b = if let Some(b) = self.count_conf().1.definite() {
// Outputting a value is allowed only once.
if self.decision.is_none() && b == coin {
self.decide(b);
}
b
} else {
coin
};
let def_bin_value = self.count_conf().1.definite();
self.on_coin(coin, def_bin_value)?;
}

self.start_next_epoch();
Ok(())
}

self.estimated = Some(b);
self.send_bval(b)?;
let queued_msgs = replace(&mut self.incoming_queue, Vec::new());
for (sender_id, msg) in queued_msgs {
self.handle_message(&sender_id, msg)?;
/// When the common coin has been computed, tries to decide on an output value, updates the
/// `Agreement` epoch and handles queued messages for the new epoch.
fn on_coin(&mut self, coin: bool, def_bin_value: Option<bool>) -> AgreementResult<()> {
let b = if let Some(b) = def_bin_value {
// Outputting a value is allowed only once.
if self.decision.is_none() && b == coin {
self.decide(b);
}
b
} else {
coin
};

self.update_epoch();

self.estimated = Some(b);
self.send_bval(b)?;
let queued_msgs = replace(&mut self.incoming_queue, Vec::new());
for (sender_id, msg) in queued_msgs {
self.handle_message(&sender_id, msg)?;
}
Ok(())
}

// Propagates Common Coin messages to the top level.
/// Computes the coin schedule for the current `Agreement` epoch.
fn coin_schedule(&self) -> CoinSchedule {
match self.epoch % 3 {
0 => CoinSchedule::False,
1 => CoinSchedule::True,
_ => CoinSchedule::Random,
}
}

/// Propagates Common Coin messages to the top level.
fn extend_common_coin(&mut self) {
let epoch = self.epoch;
self.messages.extend(self.common_coin.message_iter().map(
Expand Down Expand Up @@ -423,7 +487,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
/// In general, we can't expect every good node to send the same `Aux` value, so waiting for N -
/// f agreeing messages would not always terminate. We can, however, expect every good node to
/// send an `Aux` value that will eventually end up in our `bin_values`.
fn count_aux(&self) -> usize {
fn count_aux(&self) -> (usize, BinValues) {
let mut aux: BTreeMap<_, _> = self
.received_aux
.iter()
Expand All @@ -438,7 +502,8 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {

// Ensure that nodes are not counted twice.
aux.extend(term);
aux.len()
let bin: BinValues = aux.values().map(|&&v| BinValues::from_bool(v)).collect();
(aux.len(), bin)
}

/// Counts the number of received `Conf` messages.
Expand All @@ -452,7 +517,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
(vals_cnt.count(), vals.cloned().collect())
}

fn start_next_epoch(&mut self) {
fn update_epoch(&mut self) {
self.bin_values.clear();
self.received_bval.clear();
self.sent_bval.clear();
Expand All @@ -466,7 +531,10 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
*self.netinfo.node_index(&self.proposer_id).unwrap(),
self.epoch,
);
// TODO: Don't spend time creating a `CommonCoin` instance in epochs where the common coin
// is known.
self.common_coin = CommonCoin::new(self.netinfo.clone(), nonce);
self.coin_schedule = self.coin_schedule();
debug!(
"{:?} Agreement instance {:?} started epoch {}",
self.netinfo.our_uid(),
Expand Down

0 comments on commit 7a4fc37

Please sign in to comment.