diff --git a/consensus/consensus-types/src/order_vote_msg.rs b/consensus/consensus-types/src/order_vote_msg.rs index 1fae85cc25a60..4c20ec8e3afac 100644 --- a/consensus/consensus-types/src/order_vote_msg.rs +++ b/consensus/consensus-types/src/order_vote_msg.rs @@ -2,7 +2,7 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::{order_vote::OrderVote, quorum_cert::QuorumCert}; +use crate::{common::Author, order_vote::OrderVote, quorum_cert::QuorumCert}; use anyhow::{ensure, Context}; use aptos_types::validator_verifier::ValidatorVerifier; use serde::{Deserialize, Serialize}; @@ -46,7 +46,17 @@ impl OrderVoteMsg { /// This function verifies the order_vote component in the order_vote_msg. /// The quorum cert is verified in the round manager when the quorum certificate is used. - pub fn verify_order_vote(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> { + pub fn verify_order_vote( + &self, + sender: Author, + validator: &ValidatorVerifier, + ) -> anyhow::Result<()> { + ensure!( + self.order_vote.author() == sender, + "Order vote author {:?} is different from the sender {:?}", + self.order_vote.author(), + sender + ); ensure!( self.quorum_cert().certified_block() == self.order_vote().ledger_info().commit_info(), "QuorumCert and OrderVote do not match" diff --git a/consensus/consensus-types/src/pipeline/commit_vote.rs b/consensus/consensus-types/src/pipeline/commit_vote.rs index 96a6bd7fde85d..21c466038e49c 100644 --- a/consensus/consensus-types/src/pipeline/commit_vote.rs +++ b/consensus/consensus-types/src/pipeline/commit_vote.rs @@ -3,7 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::common::{Author, Round}; -use anyhow::Context; +use anyhow::{ensure, Context}; use aptos_crypto::{bls12381, CryptoMaterialError}; use aptos_short_hex_str::AsShortHexStr; use aptos_types::{ @@ -101,7 +101,13 @@ impl CommitVote { /// Verifies that the consensus data hash of LedgerInfo corresponds to the commit proposal, /// and then verifies the signature. - pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> { + pub fn verify(&self, sender: Author, validator: &ValidatorVerifier) -> anyhow::Result<()> { + ensure!( + self.author() == sender, + "Commit vote author {:?} doesn't match with the sender {:?}", + self.author(), + sender + ); validator .optimistic_verify(self.author(), &self.ledger_info, &self.signature) .context("Failed to verify Commit Vote") diff --git a/consensus/consensus-types/src/proposal_msg.rs b/consensus/consensus-types/src/proposal_msg.rs index e2a8e90a6193b..5382cfd6e6271 100644 --- a/consensus/consensus-types/src/proposal_msg.rs +++ b/consensus/consensus-types/src/proposal_msg.rs @@ -82,10 +82,19 @@ impl ProposalMsg { pub fn verify( &self, + sender: Author, validator: &ValidatorVerifier, proof_cache: &ProofCache, quorum_store_enabled: bool, ) -> Result<()> { + if let Some(proposal_author) = self.proposal.author() { + ensure!( + proposal_author == sender, + "Proposal author {:?} doesn't match sender {:?}", + proposal_author, + sender + ); + } self.proposal().payload().map_or(Ok(()), |p| { p.verify(validator, proof_cache, quorum_store_enabled) })?; diff --git a/consensus/consensus-types/src/vote_msg.rs b/consensus/consensus-types/src/vote_msg.rs index b12785afd60d3..da532935cf807 100644 --- a/consensus/consensus-types/src/vote_msg.rs +++ b/consensus/consensus-types/src/vote_msg.rs @@ -2,7 +2,7 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::{sync_info::SyncInfo, vote::Vote}; +use crate::{common::Author, sync_info::SyncInfo, vote::Vote}; use anyhow::ensure; use aptos_crypto::HashValue; use aptos_types::validator_verifier::ValidatorVerifier; @@ -54,7 +54,13 @@ impl VoteMsg { self.vote.vote_data().proposed().id() } - pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> { + pub fn verify(&self, sender: Author, validator: &ValidatorVerifier) -> anyhow::Result<()> { + ensure!( + self.vote().author() == sender, + "Vote author {:?} is different from the sender {:?}", + self.vote().author(), + sender, + ); ensure!( self.vote().epoch() == self.sync_info.epoch(), "VoteMsg has different epoch" diff --git a/consensus/src/pipeline/buffer_manager.rs b/consensus/src/pipeline/buffer_manager.rs index babc1a2366d1e..8325c64feaab9 100644 --- a/consensus/src/pipeline/buffer_manager.rs +++ b/consensus/src/pipeline/buffer_manager.rs @@ -127,8 +127,12 @@ pub struct BufferManager { commit_proof_rb_handle: Option, // message received from the network - commit_msg_rx: - Option>, + commit_msg_rx: Option< + aptos_channels::aptos_channel::Receiver< + AccountAddress, + (AccountAddress, IncomingCommitRequest), + >, + >, persisting_phase_tx: Sender>, persisting_phase_rx: Receiver>, @@ -186,7 +190,7 @@ impl BufferManager { commit_msg_tx: Arc, commit_msg_rx: aptos_channels::aptos_channel::Receiver< AccountAddress, - IncomingCommitRequest, + (AccountAddress, IncomingCommitRequest), >, persisting_phase_tx: Sender>, persisting_phase_rx: Receiver>, @@ -944,12 +948,12 @@ impl BufferManager { let epoch_state = self.epoch_state.clone(); let bounded_executor = self.bounded_executor.clone(); spawn_named!("buffer manager verification", async move { - while let Some(commit_msg) = commit_msg_rx.next().await { + while let Some((sender, commit_msg)) = commit_msg_rx.next().await { let tx = verified_commit_msg_tx.clone(); let epoch_state_clone = epoch_state.clone(); bounded_executor .spawn(async move { - match commit_msg.req.verify(&epoch_state_clone.verifier) { + match commit_msg.req.verify(sender, &epoch_state_clone.verifier) { Ok(_) => { let _ = tx.unbounded_send(commit_msg); }, diff --git a/consensus/src/pipeline/commit_reliable_broadcast.rs b/consensus/src/pipeline/commit_reliable_broadcast.rs index 0fc7f066810a1..231ae552caa6b 100644 --- a/consensus/src/pipeline/commit_reliable_broadcast.rs +++ b/consensus/src/pipeline/commit_reliable_broadcast.rs @@ -34,13 +34,13 @@ pub enum CommitMessage { impl CommitMessage { /// Verify the signatures on the message - pub fn verify(&self, verifier: &ValidatorVerifier) -> anyhow::Result<()> { + pub fn verify(&self, sender: Author, verifier: &ValidatorVerifier) -> anyhow::Result<()> { match self { CommitMessage::Vote(vote) => { let _timer = counters::VERIFY_MSG .with_label_values(&["commit_vote"]) .start_timer(); - vote.verify(verifier) + vote.verify(sender, verifier) }, CommitMessage::Decision(decision) => { let _timer = counters::VERIFY_MSG diff --git a/consensus/src/pipeline/decoupled_execution_utils.rs b/consensus/src/pipeline/decoupled_execution_utils.rs index 8178d871e7efc..53e03beca995e 100644 --- a/consensus/src/pipeline/decoupled_execution_utils.rs +++ b/consensus/src/pipeline/decoupled_execution_utils.rs @@ -33,7 +33,7 @@ pub fn prepare_phases_and_buffer_manager( execution_proxy: Arc, safety_rules: Arc, commit_msg_tx: NetworkSender, - commit_msg_rx: Receiver, + commit_msg_rx: Receiver, persisting_proxy: Arc, block_rx: UnboundedReceiver, sync_rx: UnboundedReceiver, diff --git a/consensus/src/pipeline/execution_client.rs b/consensus/src/pipeline/execution_client.rs index b1da5dba16ca4..9772b3af65385 100644 --- a/consensus/src/pipeline/execution_client.rs +++ b/consensus/src/pipeline/execution_client.rs @@ -112,7 +112,8 @@ pub trait TExecutionClient: Send + Sync { struct BufferManagerHandle { pub execute_tx: Option>, - pub commit_tx: Option>, + pub commit_tx: + Option>, pub reset_tx_to_buffer_manager: Option>, pub reset_tx_to_rand_manager: Option>, } @@ -130,7 +131,7 @@ impl BufferManagerHandle { pub fn init( &mut self, execute_tx: UnboundedSender, - commit_tx: aptos_channel::Sender, + commit_tx: aptos_channel::Sender, reset_tx_to_buffer_manager: UnboundedSender, reset_tx_to_rand_manager: Option>, ) { @@ -218,7 +219,7 @@ impl ExecutionProxyClient { let (reset_buffer_manager_tx, reset_buffer_manager_rx) = unbounded::(); let (commit_msg_tx, commit_msg_rx) = - aptos_channel::new::( + aptos_channel::new::( QueueStyle::FIFO, 100, Some(&counters::BUFFER_MANAGER_MSGS), @@ -402,7 +403,7 @@ impl TExecutionClient for ExecutionProxyClient { commit_msg: IncomingCommitRequest, ) -> Result<()> { if let Some(tx) = &self.handle.read().commit_tx { - tx.push(peer_id, commit_msg) + tx.push(peer_id, (peer_id, commit_msg)) } else { counters::EPOCH_MANAGER_ISSUES_DETAILS .with_label_values(&["buffer_manager_not_started"]) diff --git a/consensus/src/pipeline/tests/buffer_manager_tests.rs b/consensus/src/pipeline/tests/buffer_manager_tests.rs index da3fc1cd733ae..574ec837054b9 100644 --- a/consensus/src/pipeline/tests/buffer_manager_tests.rs +++ b/consensus/src/pipeline/tests/buffer_manager_tests.rs @@ -63,7 +63,7 @@ pub fn prepare_buffer_manager( BufferManager, Sender, Sender, - aptos_channel::Sender, + aptos_channel::Sender, aptos_channels::UnboundedReceiver>, PipelinePhase, PipelinePhase, @@ -122,11 +122,10 @@ pub fn prepare_buffer_manager( validators.clone(), ); - let (msg_tx, msg_rx) = aptos_channel::new::( - QueueStyle::FIFO, - channel_size, - None, - ); + let (msg_tx, msg_rx) = aptos_channel::new::< + AccountAddress, + (AccountAddress, IncomingCommitRequest), + >(QueueStyle::FIFO, channel_size, None); let (result_tx, result_rx) = create_channel::(); let state_computer = Arc::new(EmptyStateComputer::new(result_tx)); @@ -185,7 +184,7 @@ pub fn prepare_buffer_manager( pub fn launch_buffer_manager() -> ( Sender, Sender, - aptos_channel::Sender, + aptos_channel::Sender, aptos_channels::UnboundedReceiver>, HashValue, Runtime, @@ -233,20 +232,20 @@ pub fn launch_buffer_manager() -> ( async fn loopback_commit_vote( msg: Event, - msg_tx: &aptos_channel::Sender, + msg_tx: &aptos_channel::Sender, verifier: &ValidatorVerifier, ) { match msg { Event::RpcRequest(author, msg, protocol, callback) => { if let ConsensusMsg::CommitMessage(msg) = msg { - msg.verify(verifier).unwrap(); + msg.verify(author, verifier).unwrap(); let request = IncomingCommitRequest { req: *msg, protocol, response_sender: callback, }; // verify the message and send the message into self loop - msg_tx.push(author, request).ok(); + msg_tx.push(author, (author, request)).ok(); } }, _ => { diff --git a/consensus/src/quorum_store/network_listener.rs b/consensus/src/quorum_store/network_listener.rs index de63e27ad88ca..5e4c9e83da611 100644 --- a/consensus/src/quorum_store/network_listener.rs +++ b/consensus/src/quorum_store/network_listener.rs @@ -69,7 +69,8 @@ impl NetworkListener { counters::QUORUM_STORE_MSG_COUNT .with_label_values(&["NetworkListener::batchmsg"]) .inc(); - let author = batch_msg.author(); + // Batch msg verify function alreay ensures that the batch_msg is not empty. + let author = batch_msg.author().expect("Empty batch message"); let batches = batch_msg.take(); counters::RECEIVED_BATCH_MSG_COUNT.inc(); diff --git a/consensus/src/quorum_store/types.rs b/consensus/src/quorum_store/types.rs index 96b6324c39a14..dead53e9a3e1d 100644 --- a/consensus/src/quorum_store/types.rs +++ b/consensus/src/quorum_store/types.rs @@ -315,8 +315,8 @@ impl BatchMsg { Ok(epoch) } - pub fn author(&self) -> PeerId { - self.batches[0].author() + pub fn author(&self) -> Option { + self.batches.first().map(|batch| batch.author()) } pub fn take(self) -> Vec { diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index 675e61bbeeb50..072a3a0f3c897 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -112,7 +112,7 @@ impl UnverifiedEvent { //TODO: no need to sign and verify the proposal UnverifiedEvent::ProposalMsg(p) => { if !self_message { - p.verify(validator, proof_cache, quorum_store_enabled)?; + p.verify(peer_id, validator, proof_cache, quorum_store_enabled)?; counters::VERIFY_MSG .with_label_values(&["proposal"]) .observe(start_time.elapsed().as_secs_f64()); @@ -121,7 +121,7 @@ impl UnverifiedEvent { }, UnverifiedEvent::VoteMsg(v) => { if !self_message { - v.verify(validator)?; + v.verify(peer_id, validator)?; counters::VERIFY_MSG .with_label_values(&["vote"]) .observe(start_time.elapsed().as_secs_f64()); @@ -139,7 +139,7 @@ impl UnverifiedEvent { }, UnverifiedEvent::OrderVoteMsg(v) => { if !self_message { - v.verify_order_vote(validator)?; + v.verify_order_vote(peer_id, validator)?; counters::VERIFY_MSG .with_label_values(&["order_vote"]) .observe(start_time.elapsed().as_secs_f64());