Skip to content

Commit

Permalink
consensus: Support open-consensus-mode in event_loop
Browse files Browse the repository at this point in the history
  • Loading branch information
goshawk-3 committed Jul 11, 2024
1 parent 88b35bb commit dec3d02
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 11 deletions.
1 change: 0 additions & 1 deletion consensus/src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ pub enum ConsensusError {
NotCommitteeMember,
NotImplemented,
NotReady,
MaxIterationReached,
ChildTaskTerminated,
Canceled(u64),
VoteAlreadyCollected,
Expand Down
6 changes: 1 addition & 5 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
// Copyright (c) DUSK NETWORK. All rights reserved.

use crate::commons::{ConsensusError, Database, QuorumMsgSender, RoundUpdate};
use crate::config::CONSENSUS_MAX_ITER;
use crate::operations::Operations;
use crate::phase::Phase;

Expand Down Expand Up @@ -120,7 +119,6 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
/// Consensus loop terminates on any of these conditions:
///
/// * A fully valid block for current round is accepted
/// * Consensus reaches CONSENSUS_MAX_ITER
/// * Unrecoverable error is returned by a step execution
fn spawn_consensus(
&self,
Expand Down Expand Up @@ -186,7 +184,7 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
ru.base_timeouts.clone(),
);

while iter < CONSENSUS_MAX_ITER {
loop {
Self::consensus_delay().await;

iter_ctx.on_begin(iter);
Expand Down Expand Up @@ -234,10 +232,8 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
}

iter_ctx.on_close();

iter += 1;
}
Err(ConsensusError::MaxIterationReached)
})
}

Expand Down
44 changes: 39 additions & 5 deletions consensus/src/execution_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ use node_data::message::{AsyncQueue, Message, Payload};

use node_data::StepName;

use crate::config::EMERGENCY_MODE_ITERATION_THRESHOLD;
use crate::config::{CONSENSUS_MAX_ITER, EMERGENCY_MODE_ITERATION_THRESHOLD};
use crate::ratification::step::RatificationStep;
use crate::validation::step::ValidationStep;
use node_data::message::payload::{QuorumType, ValidationResult};
use node_data::message::payload::{
QuorumType, RatificationResult, ValidationResult, Vote,
};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time;
use tokio::time::Instant;
Expand Down Expand Up @@ -113,6 +116,12 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> {
self.iter_ctx.committees.get_committee(self.step())
}

/// Returns true if the last step of last iteration is currently running
fn last_step_running(&self) -> bool {
self.iteration == CONSENSUS_MAX_ITER - 1
&& self.step_name() == StepName::Ratification
}

/// Runs a loop that collects both inbound messages and timeout event.
///
/// It accepts an instance of MsgHandler impl (phase var) and calls its
Expand All @@ -126,21 +135,46 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> {
&mut self,
phase: Arc<Mutex<C>>,
) -> Result<Message, ConsensusError> {
debug!(event = "run event_loop");
let open_consensus_mode = self.last_step_running();

// When consensus is in open_consensus_mode then it keeps Ratification
// step running infinitely until either a valid block or
// emergency block is accepted
let timeout = if open_consensus_mode {
let dur = Duration::new(u32::MAX as u64, 0);
info!(event = "run event_loop in open_mode", ?dur);
dur
} else {
let dur = self.iter_ctx.get_timeout(self.step_name());
debug!(event = "run event_loop", ?dur);
dur
};

let timeout = self.iter_ctx.get_timeout(self.step_name());
let deadline = Instant::now().checked_add(timeout).unwrap();

let inbound = self.inbound.clone();

// Handle both timeout event and messages from inbound queue.
loop {
match time::timeout_at(deadline, inbound.recv()).await {
// Inbound message event
Ok(Ok(msg)) => {
let success_quorum = match &msg.payload {
Payload::Quorum(q) => matches!(
q.att.result,
RatificationResult::Success(Vote::Valid(_))
),
_ => false,
};

if let Some(step_result) =
self.process_inbound_msg(phase.clone(), msg).await
{
if open_consensus_mode && !success_quorum {
// In open consensus mode, consensus step is
// terminated only in case of a success quorum
continue;
}

self.report_elapsed_time().await;
return Ok(step_result);
}
Expand Down

0 comments on commit dec3d02

Please sign in to comment.