From ab3e841301c532ebf75ea62be2f9555f01734fc8 Mon Sep 17 00:00:00 2001 From: Federico Franzoni <8609060+fed-franz@users.noreply.github.com> Date: Mon, 7 Oct 2024 12:22:03 +0200 Subject: [PATCH 01/11] node: add clean_att_cache --- node/src/chain/fsm.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 0b588f10c6..1517ef8e35 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -302,9 +302,7 @@ impl SimpleFSM { msg: &Message, ) -> anyhow::Result> { // Clean up attestation cache - let now = Instant::now(); - self.attestations_cache - .retain(|_, (_, expiry)| *expiry > now); + self.clean_att_cache(); // FIXME: We should return the whole outcome for this quorum // Basically we need to inform the upper layer if the received quorum is @@ -451,13 +449,17 @@ impl SimpleFSM { }; // Clean up attestation cache - let now = Instant::now(); - self.attestations_cache - .retain(|_, (_, expiry)| *expiry > now); + self.clean_att_cache(); self.attestations_cache.remove(&block_hash); block_with_att } + + fn clean_att_cache(&mut self) { + let now = Instant::now(); + self.attestations_cache + .retain(|_, (_, expiry)| *expiry > now); + } } struct InSyncImpl { From 8d8aea71f5e515e7df31ec158a1ce218604d92a2 Mon Sep 17 00:00:00 2001 From: Federico Franzoni <8609060+fed-franz@users.noreply.github.com> Date: Mon, 7 Oct 2024 18:46:23 +0200 Subject: [PATCH 02/11] node: reroute quorum msgs to consensus - Refactor on_quorum_msg to handle Success Quorum messages only - Send all quorum messages to the consensus task --- node/src/chain.rs | 64 ++++++++------- node/src/chain/acceptor.rs | 3 + node/src/chain/fsm.rs | 159 +++++++++++++++++-------------------- 3 files changed, 114 insertions(+), 112 deletions(-) diff --git a/node/src/chain.rs b/node/src/chain.rs index 6e465f74e7..6d66817e40 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -26,8 +26,8 @@ use dusk_consensus::errors::ConsensusError; pub use header_validation::verify_att; use node_data::events::Event; use node_data::ledger::{to_str, BlockWithLabel, Label}; -use node_data::message::AsyncQueue; -use node_data::message::{payload, Payload, Topics}; +use node_data::message::payload::RatificationResult; +use node_data::message::{payload, AsyncQueue, Payload, Topics}; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; @@ -127,6 +127,7 @@ impl let result_chan = acc.read().await.get_result_chan().await; let mut heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap(); + // Message loop for Chain context loop { tokio::select! { @@ -146,13 +147,13 @@ impl } }, // Handles any inbound wire. - // Component should either process it or re-route it to the next upper layer recv = self.inbound.recv() => { let msg = recv?; + match msg.payload { Payload::Block(blk) => { info!( - event = "block received", + event = "Block received", src = "wire", blk_height = blk.header().height, blk_hash = to_str(&blk.header().hash), @@ -160,39 +161,53 @@ impl ); // Handle a block that originates from a network peer. - // By disabling block broadcast, a block may be received from a peer - // only after explicit request (on demand). + // By disabling block broadcast, a block may be received + // from a peer only after explicit request (on demand). match fsm.on_block_event(*blk, msg.metadata).await { Ok(_) => {} Err(err) => { error!(event = "fsm::on_event failed", src = "wire", err = ?err); } } + } - // Re-route message to the acceptor + Payload::Quorum(ref quorum) => { + info!( + event = "Quorum received", + src = "wire", + round = msg.header.round, + iter = msg.header.iteration, + metadata = ?msg.metadata, + ); + + // Handle potential new blocks from Success Quorum messages + if let RatificationResult::Success(_) = quorum.att.result { + fsm.on_success_quorum(quorum, msg.metadata.clone()).await; + } + + // Re-route message to the Consensus + let acc = self.acceptor.as_ref().expect("initialize is called"); + if let Err(e) = acc.read().await.reroute_msg(msg).await { + warn!("Could not reroute msg to Consensus: {}", e); + } + }, + Payload::Candidate(_) | Payload::Validation(_) | Payload::Ratification(_) => { + // Re-route message to the Consensus let acc = self.acceptor.as_ref().expect("initialize is called"); if let Err(e) = acc.read().await.reroute_msg(msg).await { - warn!("msg discarded: {e}"); + warn!("Could not reroute msg to Consensus: {}", e); } }, - Payload::Quorum(ref payload) => { - let acc = self.acceptor.as_ref().expect("initialize is called"); - if let Err(e) = acc.read().await.reroute_msg(msg.clone()).await { - warn!("msg discarded: {e}"); - } - match fsm.on_quorum_msg(payload, &msg).await { - Ok(_) => {} - Err(err) => { - warn!(event = "quorum msg", ?err); - } - }; - } - _ => warn!("invalid inbound message"), + + _ => { + warn!("invalid inbound message"); + }, } + }, // Re-routes messages originated from Consensus (upper) layer to the network layer. recv = outbound_chan.recv() => { @@ -202,12 +217,7 @@ impl // If the associated candidate block already exists, // the winner block will be compiled and redirected to the Acceptor. if let Payload::Quorum(quorum) = &msg.payload { - match fsm.on_quorum_msg(quorum, &msg).await { - Ok(_) => {} - Err(err) => { - warn!(event = "handle quorum msg from internal consensus failed", ?err); - } - }; + fsm.on_success_quorum(quorum, msg.metadata.clone()).await; } if let Payload::GetResource(res) = &msg.payload { diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 35ef71ab5d..39e83f9692 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -256,6 +256,9 @@ impl Acceptor { if *hash != self.get_curr_hash().await { broadcast(&self.network, &msg).await; } + } else { + let task = self.task.read().await; + task.main_inbound.try_send(msg); } } _ => warn!("invalid inbound message"), diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 1517ef8e35..e61c14a788 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -17,9 +17,12 @@ use crate::{vm, Network}; use crate::database::{Candidate, Ledger}; use metrics::counter; use node_data::ledger::{to_str, Attestation, Block}; -use node_data::message::payload::{GetResource, Inv, RatificationResult, Vote}; +use node_data::message::payload::{ + GetResource, Inv, Quorum, RatificationResult, Vote, +}; -use node_data::message::{payload, Message, Metadata}; +// use node_data::message::{payload, Message, Metadata, WireMessage}; +use node_data::message::{Message, Metadata}; use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; @@ -288,104 +291,90 @@ impl SimpleFSM { flood_request(&self.network, &inv).await; } - /// Handles a Quorum message that is received from either the network - /// or internal consensus execution. + /// Handles a Success Quorum message that is received from either the + /// network or internal consensus execution. /// - /// The winner block is built from the quorum attestation and candidate - /// block. If the candidate is not found in local storage then the - /// block/candidate is requested from the network. - /// - /// It returns the corresponding winner block if it gets accepted - pub(crate) async fn on_quorum_msg( + /// If the corresponding Candidate is in the DB, we attach the Attestation + /// and handle the new block; otherwise, we request the Candidate from the + /// network + pub(crate) async fn on_success_quorum( &mut self, - quorum: &payload::Quorum, - msg: &Message, - ) -> anyhow::Result> { + qmsg: &Quorum, + metadata: Option, + ) { // Clean up attestation cache self.clean_att_cache(); - // FIXME: We should return the whole outcome for this quorum - // Basically we need to inform the upper layer if the received quorum is - // valid (even if it's a FailedQuorum) - // This will be usefull in order to: - // - Reset the idle timer if the current iteration reached a quorum - // - Move to next iteration if the quorum is a Failed one - // - Remove the FIXME in fsm::on_block_event - let res = match quorum.att.result { - RatificationResult::Success(Vote::Valid(hash)) => { - let local_header = self.acc.read().await.tip_header().await; - let db = self.acc.read().await.db.clone(); - let remote_height = msg.header.round; + if let RatificationResult::Success(Vote::Valid(candidate)) = + qmsg.att.result + { + let db = self.acc.read().await.db.clone(); + let tip_header = self.acc.read().await.tip_header().await; + let tip_height = tip_header.height; + let quorum_height = qmsg.header.round; + let quorum_blk = if quorum_height > tip_height + 1 { // Quorum from future - if remote_height > local_header.height + 1 { - debug!( - event = "Quorum from future", - hash = to_str(&hash), - height = remote_height, - ); - self.flood_request_block(hash, quorum.att).await; - - Ok(None) - } else { - // If the quorum msg belongs to the next block, - // if the quorum msg belongs to a block of current round - // with different hash: - // Then try to fetch the corresponding candidate and - // redirect to on_block_event - if (remote_height == local_header.height + 1) - || (remote_height == local_header.height - && local_header.hash != hash) - { - let res = db - .read() - .await - .view(|t| t.fetch_candidate_block(&hash)); - - match res { - Ok(b) => Ok(b), - Err(err) => { - error!( - event = "Candidate not found", - hash = to_str(&hash), - height = remote_height, - err = ?err, - ); + // We do not check the db because we currently do not store + // candidates from the future + None + } else if (quorum_height == tip_height + 1) + || (quorum_height == tip_height && tip_header.hash != candidate) + { + // If Quorum is for at height tip+1 or tip (but for a different + // candidate) we try to fetch the candidate from the DB + let res = db + .read() + .await + .view(|t| t.fetch_candidate_block(&candidate)); - // Candidate block is not found from local - // storage. Cache the attestation and request - // candidate block only. - self.flood_request_block(hash, quorum.att) - .await; - Err(err) - } - } - } else { - Ok(None) - } + match res { + Ok(b) => b, + Err(_) => None, } - } - _ => Ok(None), - }?; + } else { + // INFO: we currently ignore Quorum messages from the past + None + }; - if let Some(mut block) = res { - info!( - event = "block received", - src = "quorum_msg", - blk_height = block.header().height, - blk_hash = to_str(&block.header().hash), - ); + let attestation = qmsg.att; - block.set_attestation(quorum.att); - if let Some(block) = - self.on_block_event(block, msg.metadata.clone()).await? - { - return Ok(Some(block)); + if let Some(mut blk) = quorum_blk { + // Candidate found. We can build the "full" block + info!( + event = "new block", + src = "quorum_msg", + blk_height = blk.header().height, + blk_hash = to_str(&blk.header().hash), + ); + + // Attach the Attestation to the block + blk.set_attestation(attestation); + + // Handle the new block + let res = self.on_block_event(blk, metadata).await; + match res { + Ok(_) => {} + Err(e) => { + error!("Error on block handling: {e}"); + } + } + } else { + // Candidate block not found + debug!( + event = "Candidate not found. Requesting it to the network", + hash = to_str(&candidate), + height = quorum_height, + ); + + // Cache the attestation and request the candidate from the + // network. + self.flood_request_block(candidate, attestation).await; } } - Ok(None) + error!("Invalid Quorum message"); } pub(crate) async fn on_heartbeat_event(&mut self) -> anyhow::Result<()> { From 31bcbf24f77014861d8acd467a9d8d023c8f4991 Mon Sep 17 00:00:00 2001 From: Federico Franzoni <8609060+fed-franz@users.noreply.github.com> Date: Mon, 7 Oct 2024 18:56:29 +0200 Subject: [PATCH 03/11] node-data: add is_local --- node-data/src/message.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/node-data/src/message.rs b/node-data/src/message.rs index e0ed25930b..aa713e4e09 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -171,6 +171,10 @@ impl Message { self.version = v; self } + + pub fn is_local(&self) -> bool { + self.metadata.is_none() + } } /// Defines a transport-related properties that determines how the message From e8495d6bf35f468f33912cafdfce3d5acad5bd4f Mon Sep 17 00:00:00 2001 From: Federico Franzoni <8609060+fed-franz@users.noreply.github.com> Date: Mon, 7 Oct 2024 18:58:32 +0200 Subject: [PATCH 04/11] consensus: handle quorum msgs from network --- consensus/src/commons.rs | 2 +- consensus/src/consensus.rs | 52 +++++++++--- consensus/src/execution_ctx.rs | 142 +++++++++++++++++++++++++-------- 3 files changed, 152 insertions(+), 44 deletions(-) diff --git a/consensus/src/commons.rs b/consensus/src/commons.rs index 7452b86874..ac8f404088 100644 --- a/consensus/src/commons.rs +++ b/consensus/src/commons.rs @@ -99,7 +99,7 @@ impl QuorumMsgSender { Self { queue } } - /// Sends an quorum (internally) to the lower layer. + /// Sends a quorum (internally) to the lower layer. pub(crate) async fn send_quorum(&self, msg: Message) { match &msg.payload { Payload::Quorum(q) if !q.att.ratification.is_empty() => { diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 12023cc54a..0efcad8b7c 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -10,14 +10,15 @@ use crate::errors::ConsensusError; use crate::operations::Operations; use crate::phase::Phase; -use node_data::message::{AsyncQueue, Message, Topics}; +use node_data::message::payload::RatificationResult; +use node_data::message::{AsyncQueue, Message, Payload}; use crate::execution_ctx::ExecutionCtx; use crate::proposal; use crate::queue::MsgRegistry; use crate::user::provisioners::Provisioners; use crate::{ratification, validation}; -use tracing::{debug, error, info, Instrument}; +use tracing::{debug, error, info, warn, Instrument}; use crate::iteration_ctx::IterationCtx; use crate::step_votes_reg::AttInfoRegistry; @@ -208,7 +209,8 @@ impl Consensus { debug!(event = "restored iteration", ru.round, iter); } - loop { + // Round execution loop + 'round: loop { Self::consensus_delay().await; db.lock().await.store_last_iter((ru.hash(), iter)).await; @@ -221,7 +223,7 @@ impl Consensus { ); let mut msg = Message::empty(); - // Execute a single iteration + // Execute a iteration steps for phase in phases.iter_mut() { let step_name = phase.to_step_name(); // Initialize new phase with message returned by previous @@ -255,21 +257,51 @@ impl Consensus { )) .await?; - // During execution of any step we may encounter that an - // quorum is generated for a former or current iteration. - if msg.topic() == Topics::Quorum { - sender.send_quorum(msg.clone()).await; + // Handle Quorum messages produced by Consensus or received + // from the network. A Quorum for the current iteration + // means the iteration is over. + if let Payload::Quorum(qmsg) = msg.clone().payload { + // If this message was produced by Consensus, let's + // broadcast it + if msg.is_local() { + info!( + event = "Quorum produced", + round = qmsg.header.round, + iter = qmsg.header.iteration + ); + + sender.send_quorum(msg).await; + } + + match qmsg.att.result { + // With a Success Quorum we terminate the round. + // + // INFO: the acceptance of the block is handled by + // Chain. + RatificationResult::Success(_) => { + info!("Succes Quorum at iteration {iter}. Terminating the round." ); + break 'round; + } + + // With a Fail Quorum, we move to the next iteration + RatificationResult::Fail(_) => { + info!("Fail Quorum at iteration {iter}. Terminating the iteration." ); + break; + } + } } } if iter >= CONSENSUS_MAX_ITER - 1 { - error!("Trying to move to an out of bound iteration this should be a bug"); - error!("Sticking to the same iter {iter}"); + error!("Trying to increase iteration over the maximum. This should be a bug"); + warn!("Sticking to the same iter {iter}"); } else { iter_ctx.on_close(); iter += 1; } } + + Ok(()) }) } diff --git a/consensus/src/execution_ctx.rs b/consensus/src/execution_ctx.rs index fdbac441e8..e4af1f71b5 100644 --- a/consensus/src/execution_ctx.rs +++ b/consensus/src/execution_ctx.rs @@ -16,15 +16,16 @@ use crate::user::committee::Committee; use crate::user::provisioners::Provisioners; use node_data::bls::PublicKeyBytes; -use node_data::ledger::{to_str, Block}; +use node_data::ledger::Block; +use node_data::message::payload::{ + QuorumType, RatificationResult, ValidationResult, Vote, +}; use node_data::message::{AsyncQueue, Message, Payload}; - use node_data::StepName; use crate::config::{is_emergency_iter, CONSENSUS_MAX_ITER}; use crate::ratification::step::RatificationStep; use crate::validation::step::ValidationStep; -use node_data::message::payload::{QuorumType, ValidationResult, Vote}; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; @@ -152,42 +153,117 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> { match time::timeout_at(deadline, inbound.recv()).await { // Inbound message event Ok(Ok(msg)) => { - if let Some(step_result) = - self.process_inbound_msg(phase.clone(), msg).await - { - if open_consensus_mode { - info!( - mode = "open_consensus", - event = "message received", - topic = ?step_result.topic() - ); - if let Payload::Quorum(q) = &step_result.payload { - let vote = q.att.result.vote(); - if let Vote::Valid(hash) = vote { + match msg.payload.clone() { + Payload::Candidate(_) + | Payload::Validation(_) + | Payload::Ratification(_) => { + // If we received a Step Message, we pass it on to + // the running step for processing. + if let Some(step_result) = self + .process_inbound_msg(phase.clone(), msg) + .await + { + if open_consensus_mode { info!( mode = "open_consensus", - event = "send quorum", - hash = to_str(hash) + event = "step completed", + topic = ?step_result.topic() ); - self.quorum_sender - .send_quorum(step_result) - .await; + + if let Payload::Quorum(qmsg) = + &step_result.payload + { + match qmsg.att.result { + RatificationResult::Success(_) => { + // With a Success Quorum we can + // stop the open consensus mode + // and terminate the round + // + // INFO: by returning here, we + // let the Consensus task + // broadcast the message + return Ok(step_result); + } + RatificationResult::Fail(vote) => { + info!( + mode = "open_consensus", + event = + "ignoring Fail Quorum", + ?vote + ); + } + } + } + + // In open consensus mode, the step is only + // terminated in case of Success Quorum. + // The acceptor will cancel the consensus if + // a block is accepted + continue; } else { - info!( - mode = "open_consensus", - event = "ignoring failed quorum", - ?vote - ); + self.report_elapsed_time().await; + return Ok(step_result); } } - // In open consensus mode, consensus step is never - // terminated. - // The acceptor will cancel the consensus if a - // block is accepted - continue; - } else { - self.report_elapsed_time().await; - return Ok(step_result); + } + + // Handle Quorum messages from the network + Payload::Quorum(qmsg) => { + // We only handle messages for the current round + // and branch, and iteration <= current_iteration + let cur_round = self.round_update.round; + let cur_prev = self.round_update.hash(); + let cur_iter = self.iteration; + if qmsg.header.round == cur_round + && qmsg.header.prev_block_hash == cur_prev + && qmsg.header.iteration <= cur_iter + { + // TODO: verify Quorum + + let qiter = qmsg.header.iteration; + let att = qmsg.att; + + // Store Fail Attestations in the Registry. + // + // INFO: We do it here so we can store + // past-iteration Attestations without + // interrupting the step execution + if let RatificationResult::Fail(vote) = + att.result + { + match vote { + Vote::NoCandidate + | Vote::Invalid(_) => { + let generator = self + .iter_ctx + .get_generator(qiter); + + // INFO: this potentially overwrites + // existing Attestations + self.sv_registry + .lock() + .await + .set_attestation( + qiter, + att, + &generator.expect("There must be a valid generator") + ); + } + _ => {} + } + } + + // If we receive a Quorum message for the + // current iteration, we terminate the step and + // pass the message to the Consensus task to + // terminate the iteration. + if qiter == cur_iter { + return Ok(msg); + } + } + } + _ => { + warn!("Unexpected msg received in Consensus") } } } From d4b5bca48707cbba62ade964ecfa38bf8b2e8712 Mon Sep 17 00:00:00 2001 From: Federico Franzoni <8609060+fed-franz@users.noreply.github.com> Date: Tue, 8 Oct 2024 17:27:45 +0200 Subject: [PATCH 05/11] consensus: add get_iteration_atts, set_attestation --- consensus/src/step_votes_reg.rs | 58 ++++++++++++++++++++++++++++----- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/consensus/src/step_votes_reg.rs b/consensus/src/step_votes_reg.rs index fad0c8976f..8a711ca0ee 100644 --- a/consensus/src/step_votes_reg.rs +++ b/consensus/src/step_votes_reg.rs @@ -179,17 +179,59 @@ impl AttInfoRegistry { if sv == StepVotes::default() { return None; } - let att = self - .att_list - .entry(iteration) - .or_insert_with(|| IterationAtts::new(*generator)); - let att_info = att.get_or_insert(vote); + let iter_atts = self.get_iteration_atts(iteration, generator); + let att_info = iter_atts.get_or_insert(vote); att_info.set_sv(iteration, sv, step, quorum_reached); - att_info - .is_ready() - .then(|| Self::build_quorum_msg(&self.ru, iteration, att_info.att)) + + let attestation = att_info.att; + let is_ready = att_info.is_ready(); + + if is_ready { + return Some(Self::build_quorum_msg( + &self.ru, + iteration, + attestation, + )); + } + + None + } + + fn get_iteration_atts( + &mut self, + iteration: u8, + generator: &PublicKeyBytes, + ) -> &mut IterationAtts { + self.att_list + .entry(iteration) + .or_insert_with(|| IterationAtts::new(*generator)) + } + + pub(crate) fn set_attestation( + &mut self, + iteration: u8, + attestation: Attestation, + generator: &PublicKeyBytes, + ) { + let iter_atts = self.get_iteration_atts(iteration, generator); + + let vote = attestation.result.vote(); + let att_info = iter_atts.get_or_insert(vote); + + att_info.set_sv( + iteration, + attestation.validation, + StepName::Validation, + true, + ); + att_info.set_sv( + iteration, + attestation.ratification, + StepName::Ratification, + true, + ); } fn build_quorum_msg( From d3940ffef0b1ed4b2922410f81bbe329090f8fb6 Mon Sep 17 00:00:00 2001 From: Federico Franzoni <8609060+fed-franz@users.noreply.github.com> Date: Tue, 8 Oct 2024 23:46:23 +0200 Subject: [PATCH 06/11] node: use Option expected_result in verify_att --- node/benches/accept.rs | 4 +-- node/src/chain/header_validation.rs | 45 +++++++++++++++-------------- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/node/benches/accept.rs b/node/benches/accept.rs index d8d013657c..de4a327d99 100644 --- a/node/benches/accept.rs +++ b/node/benches/accept.rs @@ -179,9 +179,9 @@ pub fn verify_att(c: &mut Criterion) { consensus_header, tip_header.seed, &provisioners, - RatificationResult::Success(Vote::Valid( + Some(RatificationResult::Success(Vote::Valid( block_hash, - )), + ))), ) .await .expect("block to be verified") diff --git a/node/src/chain/header_validation.rs b/node/src/chain/header_validation.rs index 4019200e09..632378b21d 100644 --- a/node/src/chain/header_validation.rs +++ b/node/src/chain/header_validation.rs @@ -89,7 +89,7 @@ impl<'a, DB: database::DB> Validator<'a, DB> { header.to_consensus_header(), self.prev_header.seed, self.provisioners.current(), - RatificationResult::Success(Vote::Valid(header.hash)), + Some(RatificationResult::Success(Vote::Valid(header.hash))), ) .await?; } @@ -250,7 +250,7 @@ impl<'a, DB: database::DB> Validator<'a, DB> { self.prev_header.to_consensus_header(), prev_block_seed, self.provisioners.prev(), - RatificationResult::Success(Vote::Valid(prev_block_hash)), + Some(RatificationResult::Success(Vote::Valid(prev_block_hash))), ) .await?; @@ -298,7 +298,7 @@ impl<'a, DB: database::DB> Validator<'a, DB> { consensus_header, self.prev_header.seed, self.provisioners.current(), - RatificationResult::Fail(Vote::default()), + Some(RatificationResult::Fail(Vote::default())), ) .await?; @@ -399,31 +399,32 @@ pub async fn verify_att( consensus_header: ConsensusHeader, curr_seed: Signature, curr_eligible_provisioners: &Provisioners, - expected_result: RatificationResult, + expected_result: Option, ) -> Result<(QuorumResult, QuorumResult, Vec), AttestationError> { // Check expected result - match (att.result, expected_result) { - // Both are Success and the inner Valid(Hash) values match - ( - RatificationResult::Success(Vote::Valid(r_hash)), - RatificationResult::Success(Vote::Valid(e_hash)), - ) => { - if r_hash != e_hash { - return Err(AttestationError::InvalidHash(e_hash, r_hash)); + if let Some(expected) = expected_result { + match (att.result, expected) { + // Both are Success and the inner Valid(Hash) values match + ( + RatificationResult::Success(Vote::Valid(r_hash)), + RatificationResult::Success(Vote::Valid(e_hash)), + ) => { + if r_hash != e_hash { + return Err(AttestationError::InvalidHash(e_hash, r_hash)); + } + } + // Both are Fail + (RatificationResult::Fail(_), RatificationResult::Fail(_)) => {} + // All other mismatches + _ => { + return Err(AttestationError::InvalidResult( + att.result, expected, + )); } - } - // Both are Fail - (RatificationResult::Fail(_), RatificationResult::Fail(_)) => {} - // All other mismatches - _ => { - return Err(AttestationError::InvalidResult( - att.result, - expected_result, - )); } } - let committee = RwLock::new(CommitteeSet::new(curr_eligible_provisioners)); + let committee = RwLock::new(CommitteeSet::new(curr_eligible_provisioners)); let vote = att.result.vote(); // Verify validation From b12cfee2365e95f44a930b56354de0817b78a71d Mon Sep 17 00:00:00 2001 From: Federico Franzoni <8609060+fed-franz@users.noreply.github.com> Date: Wed, 9 Oct 2024 13:29:25 +0200 Subject: [PATCH 07/11] node: verify Quorum before reroute --- node/src/chain/acceptor.rs | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 39e83f9692..dfb02a3518 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -35,7 +35,7 @@ use tokio::sync::{RwLock, RwLockReadGuard}; use tracing::{debug, info, warn}; use super::consensus::Task; -use crate::chain::header_validation::{verify_faults, Validator}; +use crate::chain::header_validation::{verify_att, verify_faults, Validator}; use crate::chain::metrics::AverageElapsedTime; use crate::database::rocksdb::{ MD_AVG_PROPOSAL, MD_AVG_RATIFICATION, MD_AVG_VALIDATION, MD_HASH_KEY, @@ -249,7 +249,35 @@ impl Acceptor { task.main_inbound.try_send(msg); } } - Payload::Quorum(payload) => { + Payload::Quorum(qmsg) => { + // If Quorum is for the current round, we verify it and reroute + // it to Consensus + let tip_header = self.tip.read().await.inner().header().clone(); + if qmsg.header.round == tip_header.height + 1 { + // Verify Attestation + let cur_seed = tip_header.seed; + let cur_provisioners = + self.provisioners_list.read().await.current().clone(); + + let res = verify_att( + &qmsg.att, + qmsg.header.clone(), + cur_seed, + &cur_provisioners, + None, + ) + .await; + + if res.is_ok() { + // Rebroadcast + broadcast(&self.network, &msg.clone()).await; + + // Reroute to Consensus + let task = self.task.read().await; + task.main_inbound.try_send(msg); + } + } + // Prevent the rebroadcast of any quorum messages if the // blockchain tip has already been updated for the same round. if let Vote::Valid(hash) = payload.vote() { From 1dd4eecd9f00006a8ba67988fd653f0eb3b9f64b Mon Sep 17 00:00:00 2001 From: Federico Franzoni <8609060+fed-franz@users.noreply.github.com> Date: Wed, 9 Oct 2024 16:17:05 +0200 Subject: [PATCH 08/11] node: handle past and future Quorum propagation --- node/src/chain/acceptor.rs | 101 +++++++++++++++++++++++-------------- 1 file changed, 63 insertions(+), 38 deletions(-) diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index dfb02a3518..ec3bcb9029 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -18,8 +18,7 @@ use node_data::events::{ use node_data::ledger::{ self, to_str, Block, BlockWithLabel, Label, Seed, Slash, SpentTransaction, }; -use node_data::message::AsyncQueue; -use node_data::message::Payload; +use node_data::message::{AsyncQueue, Payload, Status}; use core::panic; use dusk_consensus::operations::Voter; @@ -250,43 +249,73 @@ impl Acceptor { } } Payload::Quorum(qmsg) => { - // If Quorum is for the current round, we verify it and reroute - // it to Consensus let tip_header = self.tip.read().await.inner().header().clone(); - if qmsg.header.round == tip_header.height + 1 { - // Verify Attestation - let cur_seed = tip_header.seed; - let cur_provisioners = - self.provisioners_list.read().await.current().clone(); - - let res = verify_att( - &qmsg.att, - qmsg.header.clone(), - cur_seed, - &cur_provisioners, - None, - ) - .await; - - if res.is_ok() { - // Rebroadcast - broadcast(&self.network, &msg.clone()).await; - // Reroute to Consensus - let task = self.task.read().await; - task.main_inbound.try_send(msg); + match msg.header.compare_round(tip_header.height + 1) { + // If Quorum is for the current round, we verify it, + // rebroadcast it, and reroute it to Consensus + Status::Present => { + // Verify Attestation + let cur_seed = tip_header.seed; + let cur_provisioners = self + .provisioners_list + .read() + .await + .current() + .clone(); + + let res = verify_att( + &qmsg.att, + qmsg.header.clone(), + cur_seed, + &cur_provisioners, + None, + ) + .await; + + if res.is_ok() { + // Rebroadcast + broadcast(&self.network, &msg.clone()).await; + + // Reroute to Consensus + let task = self.task.read().await; + task.main_inbound.try_send(msg); + } } - } - // Prevent the rebroadcast of any quorum messages if the - // blockchain tip has already been updated for the same round. - if let Vote::Valid(hash) = payload.vote() { - if *hash != self.get_curr_hash().await { - broadcast(&self.network, &msg).await; + // If Quorum is for a past round, we only rebroadcast it if + // it's Valid and for a different candidate than what we + // accepted. + // + // The rationale is that: + // - Fail Quorums have no influence on past rounds + // - Valid Quorums for accepted candidates have been + // already broadcast by us + Status::Past => { + if let Vote::Valid(candidate) = qmsg.vote() { + // Check if the candidate is in our chain + if let Ok(candidate_exists) = self + .db + .read() + .await + .view(|t| t.get_block_exists(candidate)) + { + // If it doesn't exist, then rebroadcast the + // message + if !candidate_exists { + broadcast(&self.network, &msg.clone()) + .await; + } + } else { + warn!("Could not check candidate in DB. Skipping Quorum rebroadcast"); + }; + } + } + Status::Future => { + //INFO: we currently rebroadcast future Quorums without + // any check + broadcast(&self.network, &msg.clone()).await; } - } else { - let task = self.task.read().await; - task.main_inbound.try_send(msg); } } _ => warn!("invalid inbound message"), @@ -977,10 +1006,6 @@ impl Acceptor { self.tip.read().await.inner().header().clone() } - pub(crate) async fn get_curr_hash(&self) -> [u8; 32] { - self.tip.read().await.inner().header().hash - } - pub(crate) async fn get_last_final_block(&self) -> Result { let tip: RwLockReadGuard<'_, BlockWithLabel> = self.tip.read().await; if tip.is_final() { From 943b78c36dcebaec6fca89aa306abd0d2ae39a98 Mon Sep 17 00:00:00 2001 From: Federico Franzoni <8609060+fed-franz@users.noreply.github.com> Date: Wed, 9 Oct 2024 18:54:12 +0200 Subject: [PATCH 09/11] node: improve reroute code --- node/src/chain/acceptor.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index ec3bcb9029..41972eb78e 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -31,7 +31,7 @@ use std::sync::{Arc, LazyLock}; use std::time::Duration; use tokio::sync::mpsc::Sender; use tokio::sync::{RwLock, RwLockReadGuard}; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; use super::consensus::Task; use crate::chain::header_validation::{verify_att, verify_faults, Validator}; @@ -273,13 +273,18 @@ impl Acceptor { ) .await; - if res.is_ok() { - // Rebroadcast - broadcast(&self.network, &msg.clone()).await; - - // Reroute to Consensus - let task = self.task.read().await; - task.main_inbound.try_send(msg); + match res { + Ok(_) => { + // Rebroadcast + broadcast(&self.network, &msg).await; + + // Reroute to Consensus + let task = self.task.read().await; + task.main_inbound.try_send(msg); + } + Err(err) => { + error!("Attestation verification failed: {err}") + } } } @@ -303,7 +308,7 @@ impl Acceptor { // If it doesn't exist, then rebroadcast the // message if !candidate_exists { - broadcast(&self.network, &msg.clone()) + broadcast(&self.network, &msg) .await; } } else { @@ -314,7 +319,7 @@ impl Acceptor { Status::Future => { //INFO: we currently rebroadcast future Quorums without // any check - broadcast(&self.network, &msg.clone()).await; + broadcast(&self.network, &msg).await; } } } From 9e015dcb76795ff7cfd855abef04b58d12adfb1a Mon Sep 17 00:00:00 2001 From: Federico Franzoni <8609060+fed-franz@users.noreply.github.com> Date: Wed, 9 Oct 2024 19:05:05 +0200 Subject: [PATCH 10/11] node: fix on_success_quorum error msg --- node/src/chain/fsm.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index e61c14a788..5692a3a518 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -372,9 +372,9 @@ impl SimpleFSM { // network. self.flood_request_block(candidate, attestation).await; } + } else { + error!("Invalid Quorum message"); } - - error!("Invalid Quorum message"); } pub(crate) async fn on_heartbeat_event(&mut self) -> anyhow::Result<()> { From 7f90d32c67858a9e9476e192841791c8739ab0b1 Mon Sep 17 00:00:00 2001 From: emanuele francioni Date: Wed, 9 Oct 2024 14:33:02 +0200 Subject: [PATCH 11/11] Split "on_success_quorum" into smaller and more manageable functions --- node/src/chain/fsm.rs | 163 ++++++++++++++++++++++++------------------ 1 file changed, 94 insertions(+), 69 deletions(-) diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 5692a3a518..3fd00246b4 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -302,80 +302,105 @@ impl SimpleFSM { qmsg: &Quorum, metadata: Option, ) { - // Clean up attestation cache self.clean_att_cache(); - - if let RatificationResult::Success(Vote::Valid(candidate)) = - qmsg.att.result - { - let db = self.acc.read().await.db.clone(); - let tip_header = self.acc.read().await.tip_header().await; - let tip_height = tip_header.height; - let quorum_height = qmsg.header.round; - - let quorum_blk = if quorum_height > tip_height + 1 { - // Quorum from future - - // We do not check the db because we currently do not store - // candidates from the future - None - } else if (quorum_height == tip_height + 1) - || (quorum_height == tip_height && tip_header.hash != candidate) - { - // If Quorum is for at height tip+1 or tip (but for a different - // candidate) we try to fetch the candidate from the DB - let res = db - .read() - .await - .view(|t| t.fetch_candidate_block(&candidate)); - - match res { - Ok(b) => b, - Err(_) => None, - } - } else { - // INFO: we currently ignore Quorum messages from the past - None - }; - - let attestation = qmsg.att; - - if let Some(mut blk) = quorum_blk { - // Candidate found. We can build the "full" block - info!( - event = "new block", - src = "quorum_msg", - blk_height = blk.header().height, - blk_hash = to_str(&blk.header().hash), - ); - - // Attach the Attestation to the block - blk.set_attestation(attestation); - - // Handle the new block - let res = self.on_block_event(blk, metadata).await; - match res { - Ok(_) => {} - Err(e) => { - error!("Error on block handling: {e}"); - } - } - } else { - // Candidate block not found - debug!( - event = "Candidate not found. Requesting it to the network", - hash = to_str(&candidate), - height = quorum_height, - ); - - // Cache the attestation and request the candidate from the - // network. - self.flood_request_block(candidate, attestation).await; + + let (candidate, attestation) = match qmsg.att.result { + RatificationResult::Success(Vote::Valid(candidate)) => (candidate, qmsg.att), + _ => { + error!("Invalid Quorum message"); + return; } + }; + + let quorum_blk = self.fetch_quorum_block(candidate, qmsg.header.round).await; + + match quorum_blk { + Some(blk) => self.handle_found_quorum_block(blk, attestation, metadata).await, + None => self.handle_missing_quorum_block(candidate, attestation, qmsg.header.round).await, + } + } + + /// This function encapsulates the logic for fetching a quorum block based on the candidate hash and round number. Here's a breakdown of what it does: + /// 1. It first acquires the necessary data: the database, tip header, and tip height. + /// 2. It then checks the round number against the tip height: + /// - If the round is from the future (greater than tip height + 1), or in the past (less than tip height), it returns None. + /// - If the round is either for the next block (tip height + 1) or for the current height but with a different hash, it attempts to fetch the candidate block from the database. + /// 3. When fetching from the database, it uses the view method to access the transaction, then calls fetch_candidate_block. The ok().flatten() at the end handles both the Result from the database operation and the Option from fetch_candidate_block. + async fn fetch_quorum_block( + &self, + candidate: [u8; 32], + round: u64, + ) -> Option { + let db = self.acc.read().await.db.clone(); + let tip_header = self.acc.read().await.tip_header().await; + let tip_height = tip_header.height; + + if (round == tip_height + 1) || (round == tip_height && tip_header.hash != candidate) { + // Quorum from height tip+1 or tip but for a different candidate + // We try to fetch the candidate from the DB + db.read() + .await + .view(|t| t.fetch_candidate_block(&candidate)) + .ok() + .flatten() } else { - error!("Invalid Quorum message"); + // Quorum from the future or the past + None } } + + /// Handles the case where a quorum block is found in the database. + /// It performs the following actions: + /// 1. Logs the event + /// 2. Attaches the attestation to the block + /// 3. Passes the block to the on_block_event method for further processing + async fn handle_found_quorum_block( + &mut self, + mut blk: Block, + attestation: Attestation, + metadata: Option, + ) { + // Candidate found. We can build the "full" block + info!( + event = "new block", + src = "quorum_msg", + blk_height = blk.header().height, + blk_hash = to_str(&blk.header().hash), + ); + + // Attach the Attestation to the block + blk.set_attestation(attestation); + + // Handle the new block + match self.on_block_event(blk, metadata).await { + Ok(_) => {} + Err(e) => { + error!("Error on block handling: {e}"); + } + } + } + + /// Handles the case where a quorum block is not found in the database. + /// It performs the following actions: + /// 1. Logs the event + /// 2. Caches the attestation + /// 3. Requests the candidate block from the network + async fn handle_missing_quorum_block( + &mut self, + candidate: [u8; 32], + attestation: Attestation, + quorum_height: u64, + ) { + // Candidate block not found + debug!( + event = "Candidate not found. Requesting it from the network", + hash = to_str(&candidate), + height = quorum_height, + ); + + // Cache the attestation and request the candidate from the network + self.flood_request_block(candidate, attestation).await; + } pub(crate) async fn on_heartbeat_event(&mut self) -> anyhow::Result<()> { self.stalled_sm.on_heartbeat_event().await;