Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The schedule 0, 1, get_coin(), ... #76

Merged
merged 2 commits into from
Jun 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/agreement/bin_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl BinValues {
replace(self, BinValues::None);
}

fn from_bool(b: bool) -> Self {
pub fn from_bool(b: bool) -> Self {
if b {
BinValues::True
} else {
Expand Down
118 changes: 93 additions & 25 deletions src/agreement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ pub struct AgreementMessage {
pub content: AgreementContent,
}

/// Possible values of the common coin schedule defining the method to derive the common coin in a
/// given epoch: as a constant value or a distributed computation.
enum CoinSchedule {
False,
True,
Random,
}

/// Binary Agreement instance
pub struct Agreement<NodeUid> {
/// Shared network information.
Expand Down Expand Up @@ -113,6 +121,8 @@ pub struct Agreement<NodeUid> {
conf_round: bool,
/// A common coin instance. It is reset on epoch update.
common_coin: CommonCoin<NodeUid, Nonce>,
/// Common coin schedule computed at the start of each epoch.
coin_schedule: CoinSchedule,
}

impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
Expand Down Expand Up @@ -201,6 +211,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
netinfo,
Nonce::new(invocation_id.as_ref(), session_id, proposer_i, 0),
),
coin_schedule: CoinSchedule::True,
})
} else {
Err(ErrorKind::UnknownProposer.into())
Expand Down Expand Up @@ -252,9 +263,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
// Send an `Aux` message at most once per epoch.
self.send_aux(b)
} else if bin_values_changed {
// If the `Conf` round has already started, a change in `bin_values` can lead to its
// end. Try if it has indeed finished.
self.try_finish_conf_round()
self.on_bin_values_changed()
} else {
Ok(())
}
Expand All @@ -267,6 +276,34 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
}
}

/// Called when `bin_values` changes as a result of receiving a `BVal` message. Tries to update
/// the epoch.
fn on_bin_values_changed(&mut self) -> AgreementResult<()> {
match self.coin_schedule {
CoinSchedule::False => {
let (aux_count, aux_vals) = self.count_aux();
if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() {
self.on_coin(false, aux_vals.definite())
} else {
Ok(())
}
}
CoinSchedule::True => {
let (aux_count, aux_vals) = self.count_aux();
if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() {
self.on_coin(true, aux_vals.definite())
} else {
Ok(())
}
}
CoinSchedule::Random => {
// If the `Conf` round has already started, a change in `bin_values` can lead to its
// end. Try if it has indeed finished.
self.try_finish_conf_round()
}
}
}

fn send_bval(&mut self, b: bool) -> AgreementResult<()> {
// Record the value `b` as sent.
self.sent_bval.insert(b);
Expand Down Expand Up @@ -309,12 +346,21 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
if self.bin_values == BinValues::None {
return Ok(());
}
if self.count_aux() < self.netinfo.num_nodes() - self.netinfo.num_faulty() {
let (aux_count, aux_vals) = self.count_aux();
if aux_count < self.netinfo.num_nodes() - self.netinfo.num_faulty() {
// Continue waiting for the (N - f) `Aux` messages.
return Ok(());
}
// Start the `Conf` message round.
self.send_conf()

// Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...`
match self.coin_schedule {
CoinSchedule::False => self.on_coin(false, aux_vals.definite()),
CoinSchedule::True => self.on_coin(true, aux_vals.definite()),
CoinSchedule::Random => {
// Start the `Conf` message round.
self.send_conf()
}
}
}

fn handle_conf(&mut self, sender_id: &NodeUid, v: BinValues) -> AgreementResult<()> {
Expand Down Expand Up @@ -344,29 +390,47 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
self.extend_common_coin();

if let Some(coin) = self.common_coin.next_output() {
let b = if let Some(b) = self.count_conf().1.definite() {
// Outputting a value is allowed only once.
if self.decision.is_none() && b == coin {
self.decide(b);
}
b
} else {
coin
};
let def_bin_value = self.count_conf().1.definite();
self.on_coin(coin, def_bin_value)?;
}

self.start_next_epoch();
Ok(())
}

self.estimated = Some(b);
self.send_bval(b)?;
let queued_msgs = replace(&mut self.incoming_queue, Vec::new());
for (sender_id, msg) in queued_msgs {
self.handle_message(&sender_id, msg)?;
/// When the common coin has been computed, tries to decide on an output value, updates the
/// `Agreement` epoch and handles queued messages for the new epoch.
fn on_coin(&mut self, coin: bool, def_bin_value: Option<bool>) -> AgreementResult<()> {
let b = if let Some(b) = def_bin_value {
// Outputting a value is allowed only once.
if self.decision.is_none() && b == coin {
self.decide(b);
}
b
} else {
coin
};

self.update_epoch();

self.estimated = Some(b);
self.send_bval(b)?;
let queued_msgs = replace(&mut self.incoming_queue, Vec::new());
for (sender_id, msg) in queued_msgs {
self.handle_message(&sender_id, msg)?;
}
Ok(())
}

// Propagates Common Coin messages to the top level.
/// Computes the coin schedule for the current `Agreement` epoch.
fn coin_schedule(&self) -> CoinSchedule {
match self.epoch % 3 {
0 => CoinSchedule::True,
1 => CoinSchedule::False,
_ => CoinSchedule::Random,
}
}

/// Propagates Common Coin messages to the top level.
fn extend_common_coin(&mut self) {
let epoch = self.epoch;
self.messages.extend(self.common_coin.message_iter().map(
Expand Down Expand Up @@ -423,7 +487,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
/// In general, we can't expect every good node to send the same `Aux` value, so waiting for N -
/// f agreeing messages would not always terminate. We can, however, expect every good node to
/// send an `Aux` value that will eventually end up in our `bin_values`.
fn count_aux(&self) -> usize {
fn count_aux(&self) -> (usize, BinValues) {
let mut aux: BTreeMap<_, _> = self
.received_aux
.iter()
Expand All @@ -438,7 +502,8 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {

// Ensure that nodes are not counted twice.
aux.extend(term);
aux.len()
let bin: BinValues = aux.values().map(|&&v| BinValues::from_bool(v)).collect();
(aux.len(), bin)
}

/// Counts the number of received `Conf` messages.
Expand All @@ -452,7 +517,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
(vals_cnt.count(), vals.cloned().collect())
}

fn start_next_epoch(&mut self) {
fn update_epoch(&mut self) {
self.bin_values.clear();
self.received_bval.clear();
self.sent_bval.clear();
Expand All @@ -466,7 +531,10 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
*self.netinfo.node_index(&self.proposer_id).unwrap(),
self.epoch,
);
// TODO: Don't spend time creating a `CommonCoin` instance in epochs where the common coin
// is known.
self.common_coin = CommonCoin::new(self.netinfo.clone(), nonce);
self.coin_schedule = self.coin_schedule();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the coin schedule as a member variable at all? Isn't it easier to just call self.coin_schedule() where it's needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to make a call on every Aux message.

debug!(
"{:?} Agreement instance {:?} started epoch {}",
self.netinfo.our_uid(),
Expand Down