diff --git a/consensus/src/aggregator.rs b/consensus/src/aggregator.rs index a2532cae9..f8e41490d 100644 --- a/consensus/src/aggregator.rs +++ b/consensus/src/aggregator.rs @@ -176,10 +176,11 @@ impl Aggregator { let quorum_reached = total >= quorum_target; if quorum_reached { tracing::info!( - event = "quorum reached", - ?vote, - iter = v.header().iteration, + event = "Quorum reached", step = ?V::STEP_NAME, + round = v.header().round, + iter = v.header().iteration, + ?vote, total, target = quorum_target, bitset, diff --git a/consensus/src/commons.rs b/consensus/src/commons.rs index 05514ba04..4dc3c2728 100644 --- a/consensus/src/commons.rs +++ b/consensus/src/commons.rs @@ -13,9 +13,7 @@ use std::time::Duration; use dusk_core::signatures::bls::SecretKey as BlsSecretKey; use node_data::bls::PublicKey; use node_data::ledger::*; -use node_data::message::{ - payload, AsyncQueue, ConsensusHeader, Message, Payload, -}; +use node_data::message::{payload, ConsensusHeader}; use node_data::StepName; use crate::operations::Voter; @@ -100,33 +98,3 @@ pub trait Database: Send + Sync { async fn get_last_iter(&self) -> (Hash, u8); async fn store_last_iter(&mut self, data: (Hash, u8)); } - -#[derive(Clone)] -pub(crate) struct QuorumMsgSender { - queue: AsyncQueue, -} - -impl QuorumMsgSender { - pub(crate) fn new(queue: AsyncQueue) -> Self { - Self { queue } - } - - /// Sends a quorum (internally) to the lower layer. - pub(crate) async fn send_quorum(&self, msg: Message) { - match &msg.payload { - Payload::Quorum(q) if !q.att.ratification.is_empty() => { - tracing::debug!( - event = "send quorum_msg", - vote = ?q.vote(), - round = msg.header.round, - iteration = msg.header.iteration, - validation = ?q.att.validation, - ratification = ?q.att.ratification, - ); - } - _ => return, - } - - self.queue.try_send(msg); - } -} diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index e5cab081e..2cc510464 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -12,7 +12,7 @@ use tokio::sync::{oneshot, Mutex}; use tokio::task::JoinHandle; use tracing::{debug, error, warn, Instrument}; -use crate::commons::{Database, QuorumMsgSender, RoundUpdate}; +use crate::commons::{Database, RoundUpdate}; use crate::config::{CONSENSUS_MAX_ITER, EMERGENCY_MODE_ITERATION_THRESHOLD}; use crate::errors::ConsensusError; use crate::execution_ctx::ExecutionCtx; @@ -86,10 +86,9 @@ impl Consensus { ) -> Result<(), ConsensusError> { let round = ru.round; debug!(event = "consensus started", round); - let sender = QuorumMsgSender::new(self.outbound.clone()); // proposal-validation-ratification loop - let mut handle = self.spawn_consensus(ru, provisioners, sender); + let mut handle = self.spawn_consensus(ru, provisioners); // Usually this select will be terminated due to cancel signal however // it may also be terminated due to unrecoverable error in the main loop @@ -122,7 +121,6 @@ impl Consensus { &self, ru: RoundUpdate, provisioners: Arc, - sender: QuorumMsgSender, ) -> JoinHandle<()> { let inbound = self.inbound.clone(); let outbound = self.outbound.clone(); @@ -237,7 +235,6 @@ impl Consensus { step_name, executor.clone(), sv_registry.clone(), - sender.clone(), ); // Execute a phase @@ -265,7 +262,7 @@ impl Consensus { ); // Broadcast/Rebroadcast - sender.send_quorum(msg.clone()).await; + outbound.try_send(msg.clone()); // INFO: we keep running consensus even with Success // Quorum in case we fail to accept the block. diff --git a/consensus/src/execution_ctx.rs b/consensus/src/execution_ctx.rs index 5a3303198..e9f3080ef 100644 --- a/consensus/src/execution_ctx.rs +++ b/consensus/src/execution_ctx.rs @@ -19,7 +19,7 @@ use tokio::time; use tokio::time::Instant; use tracing::{debug, error, info, trace, warn}; -use crate::commons::{Database, QuorumMsgSender, RoundUpdate}; +use crate::commons::{Database, RoundUpdate}; use crate::config::{ is_emergency_iter, CONSENSUS_MAX_ITER, MAX_ROUND_DISTANCE, }; @@ -56,7 +56,6 @@ pub struct ExecutionCtx<'a, T, DB: Database> { pub client: Arc, pub sv_registry: SafeAttestationInfoRegistry, - quorum_sender: QuorumMsgSender, } impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> { @@ -73,7 +72,6 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> { step: StepName, client: Arc, sv_registry: SafeAttestationInfoRegistry, - quorum_sender: QuorumMsgSender, ) -> Self { Self { iter_ctx, @@ -86,7 +84,6 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> { step, client, sv_registry, - quorum_sender, step_start_time: None, } } @@ -113,7 +110,7 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> { } /// Returns true if the last step of last iteration is currently running - fn last_step_running(&self) -> bool { + fn is_last_step(&self) -> bool { self.iteration == CONSENSUS_MAX_ITER - 1 && self.step_name() == StepName::Ratification } @@ -132,22 +129,25 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> { phase: Arc>, additional_timeout: Option, ) -> Message { - let open_consensus_mode = self.last_step_running(); - - // When consensus is in open_consensus_mode then it keeps Ratification - // step running indefinitely until either a valid block or - // emergency block is accepted - let timeout = if open_consensus_mode { - let dur = Duration::new(u32::MAX as u64, 0); - info!(event = "run event_loop", ?dur, mode = "open_consensus",); - dur - } else { - let dur = self.iter_ctx.get_timeout(self.step_name()); - debug!(event = "run event_loop", ?dur, ?additional_timeout); - dur + additional_timeout.unwrap_or_default() - }; + let round = self.round_update.round; + let iter = self.iteration; + let step = self.step_name(); + + let mut open_consensus_mode = false; + + let step_timeout = self.iter_ctx.get_timeout(step); + let timeout = step_timeout + additional_timeout.unwrap_or_default(); + + debug!( + event = "Start step loop", + ?step, + round, + iter, + ?step_timeout, + ?additional_timeout + ); - let deadline = Instant::now().checked_add(timeout).unwrap(); + let mut deadline = Instant::now().checked_add(timeout).unwrap(); let inbound = self.inbound.clone(); // Handle both timeout event and messages from inbound queue. @@ -166,12 +166,6 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> { .process_inbound_msg(phase.clone(), msg.clone()) .await { - info!( - event = "Step completed", - step = ?self.step_name(), - info = ?msg.header - ); - // In the normal case, we just return the result // to Consensus if !open_consensus_mode { @@ -197,9 +191,8 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> { is_local = true ); - self.quorum_sender - .send_quorum(msg) - .await; + // Broadcast Quorum + self.outbound.try_send(msg); } RatificationResult::Fail(vote) => { debug!( @@ -259,10 +252,8 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> { is_local = false ); - // Repropagate Success Quorum - self.quorum_sender - .send_quorum(msg.clone()) - .await; + // Broadcast Success Quorum + self.outbound.try_send(msg.clone()); } RatificationResult::Fail(vote) => { debug!( @@ -345,15 +336,29 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> { } } } + Ok(Err(e)) => { warn!("Error while receiving msg: {e}"); } + // Timeout event. Phase could not reach its final goal. // Increase timeout for next execution of this step and move on. Err(_) => { - info!(event = "timeout-ed"); - if open_consensus_mode { - error!("Timeout detected during last step running. This should never happen") + info!(event = "Step timeout expired", ?step, round, iter); + + if self.is_last_step() { + info!(event = "Step ended", ?step, round, iter); + + // If the last step expires, we enter Open Consensus + // mode. In this mode, the last step (Ratification) + // keeps running indefinitely, until a block is + // accepted. + info!(event = "Entering Open Consensus mode", round); + + let timeout = Duration::new(u32::MAX as u64, 0); + deadline = Instant::now().checked_add(timeout).unwrap(); + + open_consensus_mode = true; } else { self.process_timeout_event(phase).await; return Message::empty(); @@ -490,12 +495,13 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> { info!( event = "New Quorum", mode = "emergency", - inf = ?m.header, + round = q.header.round, + iter = q.header.iteration, vote = ?q.vote(), ); // Broadcast Quorum - self.quorum_sender.send_quorum(m).await; + self.outbound.try_send(m); } } diff --git a/consensus/src/phase.rs b/consensus/src/phase.rs index b351b41d5..18ac77445 100644 --- a/consensus/src/phase.rs +++ b/consensus/src/phase.rs @@ -6,7 +6,7 @@ use node_data::message::Message; use node_data::StepName; -use tracing::{debug, trace}; +use tracing::{info, trace}; use crate::commons::Database; use crate::execution_ctx::ExecutionCtx; @@ -54,10 +54,16 @@ impl Phase { pub async fn run(&mut self, mut ctx: ExecutionCtx<'_, T, D>) -> Message { ctx.set_start_time(); - let timeout = ctx.iter_ctx.get_timeout(ctx.step_name()); - debug!(event = "execute_step", ?timeout); + let step = ctx.step_name(); + let round = ctx.round_update.round; + let iter = ctx.iteration; + let timeout = ctx.iter_ctx.get_timeout(step); // Execute step - await_phase!(self, run(ctx)) + info!(event = "Step started", ?step, round, iter, ?timeout); + let msg = await_phase!(self, run(ctx)); + info!(event = "Step ended", ?step, round, iter); + + msg } } diff --git a/consensus/src/proposal/block_generator.rs b/consensus/src/proposal/block_generator.rs index fe56b2103..3593d5d4e 100644 --- a/consensus/src/proposal/block_generator.rs +++ b/consensus/src/proposal/block_generator.rs @@ -57,6 +57,9 @@ impl Generator { info!( event = "Candidate generated", hash = &to_str(&candidate.header().hash), + round = candidate.header().height, + iter = candidate.header().iteration, + prev_block = &to_str(&candidate.header().prev_block_hash), gas_limit = candidate.header().gas_limit, state_hash = &to_str(&candidate.header().state_hash), dur = format!("{:?}ms", start.elapsed().as_millis()), diff --git a/consensus/src/proposal/handler.rs b/consensus/src/proposal/handler.rs index 248aeb2fc..5ccd950c2 100644 --- a/consensus/src/proposal/handler.rs +++ b/consensus/src/proposal/handler.rs @@ -67,6 +67,14 @@ impl MsgHandler for ProposalHandler { .store_candidate_block(p.candidate.clone()) .await; + info!( + event = "New Candidate", + hash = &to_str(&p.candidate.header().hash), + round = p.candidate.header().height, + iter = p.candidate.header().iteration, + prev_block = &to_str(&p.candidate.header().prev_block_hash) + ); + Ok(StepOutcome::Ready(msg)) } @@ -84,6 +92,14 @@ impl MsgHandler for ProposalHandler { .store_candidate_block(p.candidate.clone()) .await; + info!( + event = "New Candidate", + hash = &to_str(&p.candidate.header().hash), + round = p.candidate.header().height, + iter = p.candidate.header().iteration, + prev_block = &to_str(&p.candidate.header().prev_block_hash) + ); + Ok(StepOutcome::Ready(msg)) } diff --git a/node-data/src/message.rs b/node-data/src/message.rs index 528ca65ca..dc4cbc1e6 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -198,15 +198,17 @@ impl Serializable for Message { match &self.payload { Payload::Candidate(p) => p.write(w), Payload::Validation(p) => p.write(w), + Payload::Ratification(p) => p.write(w), Payload::Quorum(p) => p.write(w), + Payload::ValidationQuorum(p) => p.write(w), + Payload::Block(p) => p.write(w), Payload::Transaction(p) => p.write(w), Payload::GetMempool(p) => p.write(w), Payload::Inv(p) => p.write(w), Payload::GetBlocks(p) => p.write(w), Payload::GetResource(p) => p.write(w), - Payload::Ratification(p) => p.write(w), - Payload::ValidationQuorum(p) => p.write(w), + Payload::Empty | Payload::ValidationResult(_) => Ok(()), /* internal message, not sent on the wire */ } } @@ -227,12 +229,14 @@ impl Serializable for Message { Topics::ValidationQuorum => { payload::ValidationQuorum::read(r)?.into() } + Topics::Block => ledger::Block::read(r)?.into(), Topics::Tx => ledger::Transaction::read(r)?.into(), Topics::GetResource => payload::GetResource::read(r)?.into(), Topics::GetBlocks => payload::GetBlocks::read(r)?.into(), Topics::GetMempool => payload::GetMempool::read(r)?.into(), Topics::Inv => payload::Inv::read(r)?.into(), + Topics::Unknown => { return Err(io::Error::new( io::ErrorKind::InvalidData, @@ -439,21 +443,20 @@ impl Payload { } } -impl From for Payload { - fn from(value: payload::Ratification) -> Self { - Self::Ratification(value) +// Consensus messages +impl From for Payload { + fn from(value: payload::Candidate) -> Self { + Self::Candidate(Box::new(value)) } } - impl From for Payload { fn from(value: payload::Validation) -> Self { Self::Validation(value) } } - -impl From for Payload { - fn from(value: payload::Candidate) -> Self { - Self::Candidate(Box::new(value)) +impl From for Payload { + fn from(value: payload::Ratification) -> Self { + Self::Ratification(value) } } impl From for Payload { @@ -461,6 +464,13 @@ impl From for Payload { Self::Quorum(value) } } +impl From for Payload { + fn from(value: payload::ValidationQuorum) -> Self { + Self::ValidationQuorum(Box::new(value)) + } +} + +// Data exchange messages impl From for Payload { fn from(value: ledger::Block) -> Self { Self::Block(Box::new(value)) @@ -492,12 +502,7 @@ impl From for Payload { } } -impl From for Payload { - fn from(value: payload::ValidationQuorum) -> Self { - Self::ValidationQuorum(Box::new(value)) - } -} - +// Internal messages impl From for Payload { fn from(value: payload::ValidationResult) -> Self { Self::ValidationResult(Box::new(value)) diff --git a/rusk/src/lib/http/chain.rs b/rusk/src/lib/http/chain.rs index 1054ab96b..c345b8c62 100644 --- a/rusk/src/lib/http/chain.rs +++ b/rusk/src/lib/http/chain.rs @@ -26,6 +26,7 @@ use async_graphql::{ EmptyMutation, EmptySubscription, Name, Schema, Variables, }; use serde_json::{json, Map, Value}; +use tracing::error; use super::*; use crate::node::RuskNode; @@ -182,7 +183,15 @@ impl RuskNode { .map_err(|e| anyhow::anyhow!("Invalid Data {e:?}"))?; let db = self.inner().database(); let vm = self.inner().vm_handler(); - MempoolSrv::check_tx(&db, &vm, &tx.into(), true, usize::MAX).await?; + let tx = tx.into(); + + MempoolSrv::check_tx(&db, &vm, &tx, true, usize::MAX) + .await + .map_err(|e| { + error!("Tx {} not accepted: {e}", hex::encode(tx.id())); + e + })?; + Ok(ResponseData::new(DataType::None)) } diff --git a/rusk/src/lib/node/vm.rs b/rusk/src/lib/node/vm.rs index ff0c31e21..a15ddd814 100644 --- a/rusk/src/lib/node/vm.rs +++ b/rusk/src/lib/node/vm.rs @@ -8,7 +8,7 @@ mod query; use dusk_consensus::errors::VstError; use node_data::events::contract::ContractEvent; -use tracing::info; +use tracing::{debug, info}; use dusk_bytes::DeserializableSlice; use dusk_consensus::operations::{CallParams, VerificationOutput, Voter}; @@ -90,7 +90,7 @@ impl VMExecution for Rusk { VerificationOutput, Vec, )> { - info!("Received accept request"); + debug!("Received accept request"); let generator = blk.header().generator_bls_pubkey; let generator = BlsPublicKey::from_slice(&generator.0) .map_err(|e| anyhow::anyhow!("Error in from_slice {e:?}"))?; @@ -129,7 +129,7 @@ impl VMExecution for Rusk { commit: [u8; 32], to_merge: Vec<[u8; 32]>, ) -> anyhow::Result<()> { - info!("Received finalize request"); + debug!("Received finalize request"); self.finalize_state(commit, to_merge) .map_err(|e| anyhow::anyhow!("Cannot finalize state: {e}")) }