Skip to content

Commit

Permalink
Fixed some review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
brunoffranca committed Nov 12, 2024
1 parent 0ffa2cf commit ba76a38
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 40 deletions.
4 changes: 2 additions & 2 deletions node/components/bft/src/chonky_bft/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
chonky_bft::{self, commit, new_view, proposal, timeout, StateMachine},
Config, PayloadManager,
};
use crate::{FromNetworkMessage, ToNetworkMessage};
use crate::{create_input_channel, FromNetworkMessage, ToNetworkMessage};
use assert_matches::assert_matches;
use std::sync::Arc;
use zksync_concurrency::sync::prunable_mpsc;
Expand Down Expand Up @@ -62,7 +62,7 @@ impl UTHarness {
let setup = validator::testonly::Setup::new(rng, num_validators);
let store = TestMemoryStorage::new(ctx, &setup).await;
let (output_channel_send, output_channel_recv) = ctx::channel::unbounded();
let (input_channel_send, input_channel_recv) = Config::create_input_channel();
let (input_channel_send, input_channel_recv) = create_input_channel();
let (proposer_sender, proposer_receiver) = sync::watch::channel(None);

let cfg = Arc::new(Config {
Expand Down
54 changes: 25 additions & 29 deletions node/components/bft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,39 +101,35 @@ impl Config {
Err(ctx::Error::Internal(err)) => Err(err),
}
}
}

/// Creates a new input channel for the network messages.
pub fn create_input_channel() -> (
sync::prunable_mpsc::Sender<FromNetworkMessage>,
sync::prunable_mpsc::Receiver<FromNetworkMessage>,
) {
sync::prunable_mpsc::channel(
Self::inbound_filter_predicate,
Self::inbound_selection_function,
)
}
/// Creates a new input channel for the network messages.
pub fn create_input_channel() -> (
sync::prunable_mpsc::Sender<FromNetworkMessage>,
sync::prunable_mpsc::Receiver<FromNetworkMessage>,
) {
sync::prunable_mpsc::channel(inbound_filter_predicate, inbound_selection_function)
}

/// Filter predicate for incoming messages.
fn inbound_filter_predicate(new_req: &FromNetworkMessage) -> bool {
// Verify message signature
new_req.msg.verify().is_ok()
}
/// Filter predicate for incoming messages.
fn inbound_filter_predicate(new_req: &FromNetworkMessage) -> bool {
// Verify message signature
new_req.msg.verify().is_ok()
}

/// Selection function for incoming messages.
fn inbound_selection_function(
old_req: &FromNetworkMessage,
new_req: &FromNetworkMessage,
) -> SelectionFunctionResult {
if old_req.msg.key != new_req.msg.key || old_req.msg.msg.label() != new_req.msg.msg.label()
{
SelectionFunctionResult::Keep
/// Selection function for incoming messages.
fn inbound_selection_function(
old_req: &FromNetworkMessage,
new_req: &FromNetworkMessage,
) -> SelectionFunctionResult {
if old_req.msg.key != new_req.msg.key || old_req.msg.msg.label() != new_req.msg.msg.label() {
SelectionFunctionResult::Keep
} else {
// Discard older message
if old_req.msg.msg.view().number < new_req.msg.msg.view().number {
SelectionFunctionResult::DiscardOld
} else {
// Discard older message
if old_req.msg.msg.view().number < new_req.msg.msg.view().number {
SelectionFunctionResult::DiscardOld
} else {
SelectionFunctionResult::DiscardNew
}
SelectionFunctionResult::DiscardNew
}
}
}
6 changes: 3 additions & 3 deletions node/components/bft/src/testonly/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ async fn run_nodes_real(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> {
let (node, runner) = network::testonly::Instance::new_with_filters(
spec.net.clone(),
spec.block_store.clone(),
crate::Config::inbound_filter_predicate,
crate::Config::inbound_selection_function,
crate::inbound_filter_predicate,
crate::inbound_selection_function,
);
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i)));
nodes.push(node);
Expand Down Expand Up @@ -248,7 +248,7 @@ async fn run_nodes_twins(

for (i, spec) in specs.iter().enumerate() {
let (output_channel_send, output_channel_recv) = channel::unbounded();
let (input_channel_send, input_channel_recv) = crate::Config::create_input_channel();
let (input_channel_send, input_channel_recv) = crate::create_input_channel();

let validator_key = spec.net.validator_key.as_ref().unwrap().public();
let port = spec.net.server_addr.port();
Expand Down
2 changes: 1 addition & 1 deletion node/components/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl Executor {
let network_config = self.network_config();

// Generate the communication channels. We have one for each component.
let (consensus_send, consensus_recv) = bft::Config::create_input_channel();
let (consensus_send, consensus_recv) = bft::create_input_channel();
let (network_send, network_recv) = ctx::channel::unbounded();

tracing::debug!("Starting components in separate threads.");
Expand Down
4 changes: 2 additions & 2 deletions node/components/network/src/gossip/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,8 +526,8 @@ async fn test_batch_votes_propagation() {
attestation: attestation::Controller::new(Some(setup.attester_keys[i].clone()))
.into(),
},
|_| true,
|_, _| SelectionFunctionResult::Keep,
/*filter_predicate*/ |_| true,
/*selection_function*/ |_, _| SelectionFunctionResult::Keep,
);
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i)));
// Task going through the schedule, waiting for ANY node to collect the certificate
Expand Down
7 changes: 4 additions & 3 deletions node/components/network/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,12 @@ pub(crate) async fn forward(

/// Node instance, wrapping the network component state and the
/// events channel.
#[allow(clippy::partial_pub_fields)]
pub struct Instance {
/// State of the instance.
pub net: Arc<Network>,
/// Termination signal that can be sent to the node.
pub terminate: channel::Sender<()>,
terminate: channel::Sender<()>,
/// Receiver channel to receive messages from the network component that are
/// intended for the consensus component.
pub consensus_receiver: sync::prunable_mpsc::Receiver<ConsensusReq>,
Expand Down Expand Up @@ -200,8 +201,8 @@ impl Instance {
block_store,
attestation: attestation::Controller::new(None).into(),
},
|_| true,
|_, _| SelectionFunctionResult::Keep,
/*filter_predicate*/ |_| true,
/*selection_function*/ |_, _| SelectionFunctionResult::Keep,
)
}

Expand Down

0 comments on commit ba76a38

Please sign in to comment.