Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split "on_success_quorum" into smaller and more manageable functions #2636

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion consensus/src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl QuorumMsgSender {
Self { queue }
}

/// Sends an quorum (internally) to the lower layer.
/// Sends a quorum (internally) to the lower layer.
pub(crate) async fn send_quorum(&self, msg: Message) {
match &msg.payload {
Payload::Quorum(q) if !q.att.ratification.is_empty() => {
Expand Down
52 changes: 42 additions & 10 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ use crate::errors::ConsensusError;
use crate::operations::Operations;
use crate::phase::Phase;

use node_data::message::{AsyncQueue, Message, Topics};
use node_data::message::payload::RatificationResult;
use node_data::message::{AsyncQueue, Message, Payload};

use crate::execution_ctx::ExecutionCtx;
use crate::proposal;
use crate::queue::MsgRegistry;
use crate::user::provisioners::Provisioners;
use crate::{ratification, validation};
use tracing::{debug, error, info, Instrument};
use tracing::{debug, error, info, warn, Instrument};

use crate::iteration_ctx::IterationCtx;
use crate::step_votes_reg::AttInfoRegistry;
Expand Down Expand Up @@ -208,7 +209,8 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
debug!(event = "restored iteration", ru.round, iter);
}

loop {
// Round execution loop
'round: loop {
Self::consensus_delay().await;
db.lock().await.store_last_iter((ru.hash(), iter)).await;

Expand All @@ -221,7 +223,7 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
);

let mut msg = Message::empty();
// Execute a single iteration
// Execute a iteration steps
for phase in phases.iter_mut() {
let step_name = phase.to_step_name();
// Initialize new phase with message returned by previous
Expand Down Expand Up @@ -255,21 +257,51 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
))
.await?;

// During execution of any step we may encounter that an
// quorum is generated for a former or current iteration.
if msg.topic() == Topics::Quorum {
sender.send_quorum(msg.clone()).await;
// Handle Quorum messages produced by Consensus or received
// from the network. A Quorum for the current iteration
// means the iteration is over.
if let Payload::Quorum(qmsg) = msg.clone().payload {
// If this message was produced by Consensus, let's
// broadcast it
if msg.is_local() {
info!(
event = "Quorum produced",
round = qmsg.header.round,
iter = qmsg.header.iteration
);

sender.send_quorum(msg).await;
}

match qmsg.att.result {
// With a Success Quorum we terminate the round.
//
// INFO: the acceptance of the block is handled by
// Chain.
RatificationResult::Success(_) => {
info!("Succes Quorum at iteration {iter}. Terminating the round." );
break 'round;
}

// With a Fail Quorum, we move to the next iteration
RatificationResult::Fail(_) => {
info!("Fail Quorum at iteration {iter}. Terminating the iteration." );
break;
}
}
}
}

if iter >= CONSENSUS_MAX_ITER - 1 {
error!("Trying to move to an out of bound iteration this should be a bug");
error!("Sticking to the same iter {iter}");
error!("Trying to increase iteration over the maximum. This should be a bug");
warn!("Sticking to the same iter {iter}");
} else {
iter_ctx.on_close();
iter += 1;
}
}

Ok(())
})
}

Expand Down
142 changes: 109 additions & 33 deletions consensus/src/execution_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@ use crate::user::committee::Committee;
use crate::user::provisioners::Provisioners;

use node_data::bls::PublicKeyBytes;
use node_data::ledger::{to_str, Block};
use node_data::ledger::Block;
use node_data::message::payload::{
QuorumType, RatificationResult, ValidationResult, Vote,
};
use node_data::message::{AsyncQueue, Message, Payload};

use node_data::StepName;

use crate::config::{is_emergency_iter, CONSENSUS_MAX_ITER};
use crate::ratification::step::RatificationStep;
use crate::validation::step::ValidationStep;
use node_data::message::payload::{QuorumType, ValidationResult, Vote};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -152,42 +153,117 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
match time::timeout_at(deadline, inbound.recv()).await {
// Inbound message event
Ok(Ok(msg)) => {
if let Some(step_result) =
self.process_inbound_msg(phase.clone(), msg).await
{
if open_consensus_mode {
info!(
mode = "open_consensus",
event = "message received",
topic = ?step_result.topic()
);
if let Payload::Quorum(q) = &step_result.payload {
let vote = q.att.result.vote();
if let Vote::Valid(hash) = vote {
match msg.payload.clone() {
Payload::Candidate(_)
| Payload::Validation(_)
| Payload::Ratification(_) => {
// If we received a Step Message, we pass it on to
// the running step for processing.
if let Some(step_result) = self
.process_inbound_msg(phase.clone(), msg)
.await
{
if open_consensus_mode {
info!(
mode = "open_consensus",
event = "send quorum",
hash = to_str(hash)
event = "step completed",
topic = ?step_result.topic()
);
self.quorum_sender
.send_quorum(step_result)
.await;

if let Payload::Quorum(qmsg) =
&step_result.payload
{
match qmsg.att.result {
RatificationResult::Success(_) => {
// With a Success Quorum we can
// stop the open consensus mode
// and terminate the round
//
// INFO: by returning here, we
// let the Consensus task
// broadcast the message
return Ok(step_result);
}
RatificationResult::Fail(vote) => {
info!(
mode = "open_consensus",
event =
"ignoring Fail Quorum",
?vote
);
}
}
}

// In open consensus mode, the step is only
// terminated in case of Success Quorum.
// The acceptor will cancel the consensus if
// a block is accepted
continue;
} else {
info!(
mode = "open_consensus",
event = "ignoring failed quorum",
?vote
);
self.report_elapsed_time().await;
return Ok(step_result);
}
}
// In open consensus mode, consensus step is never
// terminated.
// The acceptor will cancel the consensus if a
// block is accepted
continue;
} else {
self.report_elapsed_time().await;
return Ok(step_result);
}

// Handle Quorum messages from the network
Payload::Quorum(qmsg) => {
// We only handle messages for the current round
// and branch, and iteration <= current_iteration
let cur_round = self.round_update.round;
let cur_prev = self.round_update.hash();
let cur_iter = self.iteration;
if qmsg.header.round == cur_round
&& qmsg.header.prev_block_hash == cur_prev
&& qmsg.header.iteration <= cur_iter
{
// TODO: verify Quorum

let qiter = qmsg.header.iteration;
let att = qmsg.att;

// Store Fail Attestations in the Registry.
//
// INFO: We do it here so we can store
// past-iteration Attestations without
// interrupting the step execution
if let RatificationResult::Fail(vote) =
att.result
{
match vote {
Vote::NoCandidate
| Vote::Invalid(_) => {
let generator = self
.iter_ctx
.get_generator(qiter);

// INFO: this potentially overwrites
// existing Attestations
self.sv_registry
.lock()
.await
.set_attestation(
qiter,
att,
&generator.expect("There must be a valid generator")
);
}
_ => {}
}
}

// If we receive a Quorum message for the
// current iteration, we terminate the step and
// pass the message to the Consensus task to
// terminate the iteration.
if qiter == cur_iter {
return Ok(msg);
}
}
}
_ => {
warn!("Unexpected msg received in Consensus")
}
}
}
Expand Down
58 changes: 50 additions & 8 deletions consensus/src/step_votes_reg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,17 +179,59 @@ impl AttInfoRegistry {
if sv == StepVotes::default() {
return None;
}
let att = self
.att_list
.entry(iteration)
.or_insert_with(|| IterationAtts::new(*generator));

let att_info = att.get_or_insert(vote);
let iter_atts = self.get_iteration_atts(iteration, generator);
let att_info = iter_atts.get_or_insert(vote);

att_info.set_sv(iteration, sv, step, quorum_reached);
att_info
.is_ready()
.then(|| Self::build_quorum_msg(&self.ru, iteration, att_info.att))

let attestation = att_info.att;
let is_ready = att_info.is_ready();

if is_ready {
return Some(Self::build_quorum_msg(
&self.ru,
iteration,
attestation,
));
}

None
}

fn get_iteration_atts(
&mut self,
iteration: u8,
generator: &PublicKeyBytes,
) -> &mut IterationAtts {
self.att_list
.entry(iteration)
.or_insert_with(|| IterationAtts::new(*generator))
}

pub(crate) fn set_attestation(
&mut self,
iteration: u8,
attestation: Attestation,
generator: &PublicKeyBytes,
) {
let iter_atts = self.get_iteration_atts(iteration, generator);

let vote = attestation.result.vote();
let att_info = iter_atts.get_or_insert(vote);

att_info.set_sv(
iteration,
attestation.validation,
StepName::Validation,
true,
);
att_info.set_sv(
iteration,
attestation.ratification,
StepName::Ratification,
true,
);
}

fn build_quorum_msg(
Expand Down
4 changes: 4 additions & 0 deletions node-data/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ impl Message {
self.version = v;
self
}

pub fn is_local(&self) -> bool {
self.metadata.is_none()
}
}

/// Defines a transport-related properties that determines how the message
Expand Down
4 changes: 2 additions & 2 deletions node/benches/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ pub fn verify_att(c: &mut Criterion) {
consensus_header,
tip_header.seed,
&provisioners,
RatificationResult::Success(Vote::Valid(
Some(RatificationResult::Success(Vote::Valid(
block_hash,
)),
))),
)
.await
.expect("block to be verified")
Expand Down
Loading