Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify consensus message author matches with the sender #15386

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions consensus/consensus-types/src/order_vote_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{order_vote::OrderVote, quorum_cert::QuorumCert};
use crate::{common::Author, order_vote::OrderVote, quorum_cert::QuorumCert};
use anyhow::{ensure, Context};
use aptos_types::validator_verifier::ValidatorVerifier;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -46,7 +46,17 @@ impl OrderVoteMsg {

/// This function verifies the order_vote component in the order_vote_msg.
/// The quorum cert is verified in the round manager when the quorum certificate is used.
pub fn verify_order_vote(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
pub fn verify_order_vote(
&self,
sender: Author,
validator: &ValidatorVerifier,
) -> anyhow::Result<()> {
ensure!(
self.order_vote.author() == sender,
"Order vote author {:?} is different from the sender {:?}",
self.order_vote.author(),
sender
);
ensure!(
self.quorum_cert().certified_block() == self.order_vote().ledger_info().commit_info(),
"QuorumCert and OrderVote do not match"
Expand Down
10 changes: 8 additions & 2 deletions consensus/consensus-types/src/pipeline/commit_vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::common::{Author, Round};
use anyhow::Context;
use anyhow::{ensure, Context};
use aptos_crypto::{bls12381, CryptoMaterialError};
use aptos_short_hex_str::AsShortHexStr;
use aptos_types::{
Expand Down Expand Up @@ -101,7 +101,13 @@ impl CommitVote {

/// Verifies that the consensus data hash of LedgerInfo corresponds to the commit proposal,
/// and then verifies the signature.
pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
pub fn verify(&self, sender: Author, validator: &ValidatorVerifier) -> anyhow::Result<()> {
ensure!(
self.author() == sender,
"Commit vote author {:?} doesn't match with the sender {:?}",
self.author(),
sender
);
validator
.optimistic_verify(self.author(), &self.ledger_info, &self.signature)
.context("Failed to verify Commit Vote")
Expand Down
9 changes: 9 additions & 0 deletions consensus/consensus-types/src/proposal_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,19 @@ impl ProposalMsg {

pub fn verify(
&self,
sender: Author,
validator: &ValidatorVerifier,
proof_cache: &ProofCache,
quorum_store_enabled: bool,
) -> Result<()> {
if let Some(proposal_author) = self.proposal.author() {
ensure!(
proposal_author == sender,
"Proposal author {:?} doesn't match sender {:?}",
proposal_author,
sender
);
}
self.proposal().payload().map_or(Ok(()), |p| {
p.verify(validator, proof_cache, quorum_store_enabled)
})?;
Expand Down
10 changes: 8 additions & 2 deletions consensus/consensus-types/src/vote_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{sync_info::SyncInfo, vote::Vote};
use crate::{common::Author, sync_info::SyncInfo, vote::Vote};
use anyhow::ensure;
use aptos_crypto::HashValue;
use aptos_types::validator_verifier::ValidatorVerifier;
Expand Down Expand Up @@ -54,7 +54,13 @@ impl VoteMsg {
self.vote.vote_data().proposed().id()
}

pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
pub fn verify(&self, sender: Author, validator: &ValidatorVerifier) -> anyhow::Result<()> {
ensure!(
self.vote().author() == sender,
"Vote author {:?} is different from the sender {:?}",
self.vote().author(),
sender,
);
ensure!(
self.vote().epoch() == self.sync_info.epoch(),
"VoteMsg has different epoch"
Expand Down
14 changes: 9 additions & 5 deletions consensus/src/pipeline/buffer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,12 @@ pub struct BufferManager {
commit_proof_rb_handle: Option<DropGuard>,

// message received from the network
commit_msg_rx:
Option<aptos_channels::aptos_channel::Receiver<AccountAddress, IncomingCommitRequest>>,
commit_msg_rx: Option<
aptos_channels::aptos_channel::Receiver<
AccountAddress,
(AccountAddress, IncomingCommitRequest),
>,
>,

persisting_phase_tx: Sender<CountedRequest<PersistingRequest>>,
persisting_phase_rx: Receiver<ExecutorResult<Round>>,
Expand Down Expand Up @@ -186,7 +190,7 @@ impl BufferManager {
commit_msg_tx: Arc<NetworkSender>,
commit_msg_rx: aptos_channels::aptos_channel::Receiver<
AccountAddress,
IncomingCommitRequest,
(AccountAddress, IncomingCommitRequest),
>,
persisting_phase_tx: Sender<CountedRequest<PersistingRequest>>,
persisting_phase_rx: Receiver<ExecutorResult<Round>>,
Expand Down Expand Up @@ -944,12 +948,12 @@ impl BufferManager {
let epoch_state = self.epoch_state.clone();
let bounded_executor = self.bounded_executor.clone();
spawn_named!("buffer manager verification", async move {
while let Some(commit_msg) = commit_msg_rx.next().await {
while let Some((sender, commit_msg)) = commit_msg_rx.next().await {
let tx = verified_commit_msg_tx.clone();
let epoch_state_clone = epoch_state.clone();
bounded_executor
.spawn(async move {
match commit_msg.req.verify(&epoch_state_clone.verifier) {
match commit_msg.req.verify(sender, &epoch_state_clone.verifier) {
Ok(_) => {
let _ = tx.unbounded_send(commit_msg);
},
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/pipeline/commit_reliable_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ pub enum CommitMessage {

impl CommitMessage {
/// Verify the signatures on the message
pub fn verify(&self, verifier: &ValidatorVerifier) -> anyhow::Result<()> {
pub fn verify(&self, sender: Author, verifier: &ValidatorVerifier) -> anyhow::Result<()> {
match self {
CommitMessage::Vote(vote) => {
let _timer = counters::VERIFY_MSG
.with_label_values(&["commit_vote"])
.start_timer();
vote.verify(verifier)
vote.verify(sender, verifier)
},
CommitMessage::Decision(decision) => {
let _timer = counters::VERIFY_MSG
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/pipeline/decoupled_execution_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub fn prepare_phases_and_buffer_manager(
execution_proxy: Arc<dyn StateComputer>,
safety_rules: Arc<dyn CommitSignerProvider>,
commit_msg_tx: NetworkSender,
commit_msg_rx: Receiver<AccountAddress, IncomingCommitRequest>,
commit_msg_rx: Receiver<AccountAddress, (AccountAddress, IncomingCommitRequest)>,
persisting_proxy: Arc<dyn StateComputer>,
block_rx: UnboundedReceiver<OrderedBlocks>,
sync_rx: UnboundedReceiver<ResetRequest>,
Expand Down
9 changes: 5 additions & 4 deletions consensus/src/pipeline/execution_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ pub trait TExecutionClient: Send + Sync {

struct BufferManagerHandle {
pub execute_tx: Option<UnboundedSender<OrderedBlocks>>,
pub commit_tx: Option<aptos_channel::Sender<AccountAddress, IncomingCommitRequest>>,
pub commit_tx:
Option<aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingCommitRequest)>>,
pub reset_tx_to_buffer_manager: Option<UnboundedSender<ResetRequest>>,
pub reset_tx_to_rand_manager: Option<UnboundedSender<ResetRequest>>,
}
Expand All @@ -130,7 +131,7 @@ impl BufferManagerHandle {
pub fn init(
&mut self,
execute_tx: UnboundedSender<OrderedBlocks>,
commit_tx: aptos_channel::Sender<AccountAddress, IncomingCommitRequest>,
commit_tx: aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingCommitRequest)>,
reset_tx_to_buffer_manager: UnboundedSender<ResetRequest>,
reset_tx_to_rand_manager: Option<UnboundedSender<ResetRequest>>,
) {
Expand Down Expand Up @@ -218,7 +219,7 @@ impl ExecutionProxyClient {
let (reset_buffer_manager_tx, reset_buffer_manager_rx) = unbounded::<ResetRequest>();

let (commit_msg_tx, commit_msg_rx) =
aptos_channel::new::<AccountAddress, IncomingCommitRequest>(
aptos_channel::new::<AccountAddress, (AccountAddress, IncomingCommitRequest)>(
QueueStyle::FIFO,
100,
Some(&counters::BUFFER_MANAGER_MSGS),
Expand Down Expand Up @@ -402,7 +403,7 @@ impl TExecutionClient for ExecutionProxyClient {
commit_msg: IncomingCommitRequest,
) -> Result<()> {
if let Some(tx) = &self.handle.read().commit_tx {
tx.push(peer_id, commit_msg)
tx.push(peer_id, (peer_id, commit_msg))
} else {
counters::EPOCH_MANAGER_ISSUES_DETAILS
.with_label_values(&["buffer_manager_not_started"])
Expand Down
19 changes: 9 additions & 10 deletions consensus/src/pipeline/tests/buffer_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub fn prepare_buffer_manager(
BufferManager,
Sender<OrderedBlocks>,
Sender<ResetRequest>,
aptos_channel::Sender<AccountAddress, IncomingCommitRequest>,
aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingCommitRequest)>,
aptos_channels::UnboundedReceiver<Event<ConsensusMsg>>,
PipelinePhase<ExecutionSchedulePhase>,
PipelinePhase<ExecutionWaitPhase>,
Expand Down Expand Up @@ -122,11 +122,10 @@ pub fn prepare_buffer_manager(
validators.clone(),
);

let (msg_tx, msg_rx) = aptos_channel::new::<AccountAddress, IncomingCommitRequest>(
QueueStyle::FIFO,
channel_size,
None,
);
let (msg_tx, msg_rx) = aptos_channel::new::<
AccountAddress,
(AccountAddress, IncomingCommitRequest),
>(QueueStyle::FIFO, channel_size, None);

let (result_tx, result_rx) = create_channel::<OrderedBlocks>();
let state_computer = Arc::new(EmptyStateComputer::new(result_tx));
Expand Down Expand Up @@ -185,7 +184,7 @@ pub fn prepare_buffer_manager(
pub fn launch_buffer_manager() -> (
Sender<OrderedBlocks>,
Sender<ResetRequest>,
aptos_channel::Sender<AccountAddress, IncomingCommitRequest>,
aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingCommitRequest)>,
aptos_channels::UnboundedReceiver<Event<ConsensusMsg>>,
HashValue,
Runtime,
Expand Down Expand Up @@ -233,20 +232,20 @@ pub fn launch_buffer_manager() -> (

async fn loopback_commit_vote(
msg: Event<ConsensusMsg>,
msg_tx: &aptos_channel::Sender<AccountAddress, IncomingCommitRequest>,
msg_tx: &aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingCommitRequest)>,
verifier: &ValidatorVerifier,
) {
match msg {
Event::RpcRequest(author, msg, protocol, callback) => {
if let ConsensusMsg::CommitMessage(msg) = msg {
msg.verify(verifier).unwrap();
msg.verify(author, verifier).unwrap();
let request = IncomingCommitRequest {
req: *msg,
protocol,
response_sender: callback,
};
// verify the message and send the message into self loop
msg_tx.push(author, request).ok();
msg_tx.push(author, (author, request)).ok();
}
},
_ => {
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/quorum_store/network_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ impl NetworkListener {
counters::QUORUM_STORE_MSG_COUNT
.with_label_values(&["NetworkListener::batchmsg"])
.inc();
let author = batch_msg.author();
// Batch msg verify function alreay ensures that the batch_msg is not empty.
let author = batch_msg.author().expect("Empty batch message");
let batches = batch_msg.take();
counters::RECEIVED_BATCH_MSG_COUNT.inc();

Expand Down
4 changes: 2 additions & 2 deletions consensus/src/quorum_store/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,8 @@ impl BatchMsg {
Ok(epoch)
}

pub fn author(&self) -> PeerId {
self.batches[0].author()
pub fn author(&self) -> Option<PeerId> {
self.batches.first().map(|batch| batch.author())
}

pub fn take(self) -> Vec<Batch> {
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/round_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl UnverifiedEvent {
//TODO: no need to sign and verify the proposal
UnverifiedEvent::ProposalMsg(p) => {
if !self_message {
p.verify(validator, proof_cache, quorum_store_enabled)?;
p.verify(peer_id, validator, proof_cache, quorum_store_enabled)?;
counters::VERIFY_MSG
.with_label_values(&["proposal"])
.observe(start_time.elapsed().as_secs_f64());
Expand All @@ -121,7 +121,7 @@ impl UnverifiedEvent {
},
UnverifiedEvent::VoteMsg(v) => {
if !self_message {
v.verify(validator)?;
v.verify(peer_id, validator)?;
counters::VERIFY_MSG
.with_label_values(&["vote"])
.observe(start_time.elapsed().as_secs_f64());
Expand All @@ -139,7 +139,7 @@ impl UnverifiedEvent {
},
UnverifiedEvent::OrderVoteMsg(v) => {
if !self_message {
v.verify_order_vote(validator)?;
v.verify_order_vote(peer_id, validator)?;
counters::VERIFY_MSG
.with_label_values(&["order_vote"])
.observe(start_time.elapsed().as_secs_f64());
Expand Down
Loading