Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor and fix event_loop #3157

Merged
merged 6 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions consensus/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,11 @@ impl<V: StepVote> Aggregator<V> {
let quorum_reached = total >= quorum_target;
if quorum_reached {
tracing::info!(
event = "quorum reached",
?vote,
iter = v.header().iteration,
event = "Quorum reached",
step = ?V::STEP_NAME,
round = v.header().round,
iter = v.header().iteration,
?vote,
total,
target = quorum_target,
bitset,
Expand Down
34 changes: 1 addition & 33 deletions consensus/src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use std::time::Duration;
use dusk_core::signatures::bls::SecretKey as BlsSecretKey;
use node_data::bls::PublicKey;
use node_data::ledger::*;
use node_data::message::{
payload, AsyncQueue, ConsensusHeader, Message, Payload,
};
use node_data::message::{payload, ConsensusHeader};
use node_data::StepName;

use crate::operations::Voter;
Expand Down Expand Up @@ -100,33 +98,3 @@ pub trait Database: Send + Sync {
async fn get_last_iter(&self) -> (Hash, u8);
async fn store_last_iter(&mut self, data: (Hash, u8));
}

#[derive(Clone)]
pub(crate) struct QuorumMsgSender {
queue: AsyncQueue<Message>,
}

impl QuorumMsgSender {
pub(crate) fn new(queue: AsyncQueue<Message>) -> Self {
Self { queue }
}

/// Sends a quorum (internally) to the lower layer.
pub(crate) async fn send_quorum(&self, msg: Message) {
match &msg.payload {
Payload::Quorum(q) if !q.att.ratification.is_empty() => {
tracing::debug!(
event = "send quorum_msg",
vote = ?q.vote(),
round = msg.header.round,
iteration = msg.header.iteration,
validation = ?q.att.validation,
ratification = ?q.att.ratification,
);
}
_ => return,
}

self.queue.try_send(msg);
}
}
9 changes: 3 additions & 6 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::sync::{oneshot, Mutex};
use tokio::task::JoinHandle;
use tracing::{debug, error, warn, Instrument};

use crate::commons::{Database, QuorumMsgSender, RoundUpdate};
use crate::commons::{Database, RoundUpdate};
use crate::config::{CONSENSUS_MAX_ITER, EMERGENCY_MODE_ITERATION_THRESHOLD};
use crate::errors::ConsensusError;
use crate::execution_ctx::ExecutionCtx;
Expand Down Expand Up @@ -86,10 +86,9 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
) -> Result<(), ConsensusError> {
let round = ru.round;
debug!(event = "consensus started", round);
let sender = QuorumMsgSender::new(self.outbound.clone());

// proposal-validation-ratification loop
let mut handle = self.spawn_consensus(ru, provisioners, sender);
let mut handle = self.spawn_consensus(ru, provisioners);

// Usually this select will be terminated due to cancel signal however
// it may also be terminated due to unrecoverable error in the main loop
Expand Down Expand Up @@ -122,7 +121,6 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
&self,
ru: RoundUpdate,
provisioners: Arc<Provisioners>,
sender: QuorumMsgSender,
) -> JoinHandle<()> {
let inbound = self.inbound.clone();
let outbound = self.outbound.clone();
Expand Down Expand Up @@ -237,7 +235,6 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
step_name,
executor.clone(),
sv_registry.clone(),
sender.clone(),
);

// Execute a phase
Expand Down Expand Up @@ -265,7 +262,7 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
);

// Broadcast/Rebroadcast
sender.send_quorum(msg.clone()).await;
outbound.try_send(msg.clone());

// INFO: we keep running consensus even with Success
// Quorum in case we fail to accept the block.
Expand Down
82 changes: 44 additions & 38 deletions consensus/src/execution_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::time;
use tokio::time::Instant;
use tracing::{debug, error, info, trace, warn};

use crate::commons::{Database, QuorumMsgSender, RoundUpdate};
use crate::commons::{Database, RoundUpdate};
use crate::config::{
is_emergency_iter, CONSENSUS_MAX_ITER, MAX_ROUND_DISTANCE,
};
Expand Down Expand Up @@ -56,7 +56,6 @@ pub struct ExecutionCtx<'a, T, DB: Database> {
pub client: Arc<T>,

pub sv_registry: SafeAttestationInfoRegistry,
quorum_sender: QuorumMsgSender,
}

impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
Expand All @@ -73,7 +72,6 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
step: StepName,
client: Arc<T>,
sv_registry: SafeAttestationInfoRegistry,
quorum_sender: QuorumMsgSender,
) -> Self {
Self {
iter_ctx,
Expand All @@ -86,7 +84,6 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
step,
client,
sv_registry,
quorum_sender,
step_start_time: None,
}
}
Expand All @@ -113,7 +110,7 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
}

/// Returns true if the last step of last iteration is currently running
fn last_step_running(&self) -> bool {
fn is_last_step(&self) -> bool {
self.iteration == CONSENSUS_MAX_ITER - 1
&& self.step_name() == StepName::Ratification
}
Expand All @@ -132,22 +129,25 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
phase: Arc<Mutex<C>>,
additional_timeout: Option<Duration>,
) -> Message {
let open_consensus_mode = self.last_step_running();

// When consensus is in open_consensus_mode then it keeps Ratification
// step running indefinitely until either a valid block or
// emergency block is accepted
let timeout = if open_consensus_mode {
let dur = Duration::new(u32::MAX as u64, 0);
info!(event = "run event_loop", ?dur, mode = "open_consensus",);
dur
} else {
let dur = self.iter_ctx.get_timeout(self.step_name());
debug!(event = "run event_loop", ?dur, ?additional_timeout);
dur + additional_timeout.unwrap_or_default()
};
let round = self.round_update.round;
let iter = self.iteration;
let step = self.step_name();

let mut open_consensus_mode = false;

let step_timeout = self.iter_ctx.get_timeout(step);
let timeout = step_timeout + additional_timeout.unwrap_or_default();

debug!(
event = "Start step loop",
?step,
round,
iter,
?step_timeout,
?additional_timeout
);

let deadline = Instant::now().checked_add(timeout).unwrap();
let mut deadline = Instant::now().checked_add(timeout).unwrap();
let inbound = self.inbound.clone();

// Handle both timeout event and messages from inbound queue.
Expand All @@ -166,12 +166,6 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
.process_inbound_msg(phase.clone(), msg.clone())
.await
{
info!(
event = "Step completed",
step = ?self.step_name(),
info = ?msg.header
);

// In the normal case, we just return the result
// to Consensus
if !open_consensus_mode {
Expand All @@ -197,9 +191,8 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
is_local = true
);

self.quorum_sender
.send_quorum(msg)
.await;
// Broadcast Quorum
self.outbound.try_send(msg);
}
RatificationResult::Fail(vote) => {
debug!(
Expand Down Expand Up @@ -259,10 +252,8 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
is_local = false
);

// Repropagate Success Quorum
self.quorum_sender
.send_quorum(msg.clone())
.await;
// Broadcast Success Quorum
self.outbound.try_send(msg.clone());
}
RatificationResult::Fail(vote) => {
debug!(
Expand Down Expand Up @@ -345,15 +336,29 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
}
}
}

Ok(Err(e)) => {
warn!("Error while receiving msg: {e}");
}

// Timeout event. Phase could not reach its final goal.
// Increase timeout for next execution of this step and move on.
Err(_) => {
info!(event = "timeout-ed");
if open_consensus_mode {
error!("Timeout detected during last step running. This should never happen")
info!(event = "Step timeout expired", ?step, round, iter);

if self.is_last_step() {
info!(event = "Step ended", ?step, round, iter);

// If the last step expires, we enter Open Consensus
// mode. In this mode, the last step (Ratification)
// keeps running indefinitely, until a block is
// accepted.
info!(event = "Entering Open Consensus mode", round);

let timeout = Duration::new(u32::MAX as u64, 0);
deadline = Instant::now().checked_add(timeout).unwrap();

open_consensus_mode = true;
} else {
self.process_timeout_event(phase).await;
return Message::empty();
Expand Down Expand Up @@ -490,12 +495,13 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
info!(
event = "New Quorum",
mode = "emergency",
inf = ?m.header,
round = q.header.round,
iter = q.header.iteration,
vote = ?q.vote(),
);

// Broadcast Quorum
self.quorum_sender.send_quorum(m).await;
self.outbound.try_send(m);
}
}

Expand Down
14 changes: 10 additions & 4 deletions consensus/src/phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use node_data::message::Message;
use node_data::StepName;
use tracing::{debug, trace};
use tracing::{info, trace};

use crate::commons::Database;
use crate::execution_ctx::ExecutionCtx;
Expand Down Expand Up @@ -54,10 +54,16 @@ impl<T: Operations + 'static, D: Database + 'static> Phase<T, D> {
pub async fn run(&mut self, mut ctx: ExecutionCtx<'_, T, D>) -> Message {
ctx.set_start_time();

let timeout = ctx.iter_ctx.get_timeout(ctx.step_name());
debug!(event = "execute_step", ?timeout);
let step = ctx.step_name();
let round = ctx.round_update.round;
let iter = ctx.iteration;
let timeout = ctx.iter_ctx.get_timeout(step);

// Execute step
await_phase!(self, run(ctx))
info!(event = "Step started", ?step, round, iter, ?timeout);
let msg = await_phase!(self, run(ctx));
info!(event = "Step ended", ?step, round, iter);

msg
}
}
3 changes: 3 additions & 0 deletions consensus/src/proposal/block_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ impl<T: Operations> Generator<T> {
info!(
event = "Candidate generated",
hash = &to_str(&candidate.header().hash),
round = candidate.header().height,
iter = candidate.header().iteration,
prev_block = &to_str(&candidate.header().prev_block_hash),
gas_limit = candidate.header().gas_limit,
state_hash = &to_str(&candidate.header().state_hash),
dur = format!("{:?}ms", start.elapsed().as_millis()),
Expand Down
16 changes: 16 additions & 0 deletions consensus/src/proposal/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ impl<D: Database> MsgHandler for ProposalHandler<D> {
.store_candidate_block(p.candidate.clone())
.await;

info!(
event = "New Candidate",
hash = &to_str(&p.candidate.header().hash),
round = p.candidate.header().height,
iter = p.candidate.header().iteration,
prev_block = &to_str(&p.candidate.header().prev_block_hash)
);

Ok(StepOutcome::Ready(msg))
}

Expand All @@ -84,6 +92,14 @@ impl<D: Database> MsgHandler for ProposalHandler<D> {
.store_candidate_block(p.candidate.clone())
.await;

info!(
event = "New Candidate",
hash = &to_str(&p.candidate.header().hash),
round = p.candidate.header().height,
iter = p.candidate.header().iteration,
prev_block = &to_str(&p.candidate.header().prev_block_hash)
);

Ok(StepOutcome::Ready(msg))
}

Expand Down
Loading
Loading