From aca20b245f585c7337b70113595a8ddfc723f0d3 Mon Sep 17 00:00:00 2001 From: Alex Miao Date: Mon, 25 Sep 2023 15:07:03 -0700 Subject: [PATCH] refactor: clean up channel creation --- bin/rundler/src/cli/builder.rs | 4 +++- bin/rundler/src/cli/node/mod.rs | 7 +++++-- bin/rundler/src/cli/pool.rs | 19 ++++++++++++++++++- crates/builder/src/bundle_sender.rs | 2 +- crates/pool/src/task.rs | 4 +++- 5 files changed, 30 insertions(+), 6 deletions(-) diff --git a/bin/rundler/src/cli/builder.rs b/bin/rundler/src/cli/builder.rs index 21d0c22a8..a47bf012e 100644 --- a/bin/rundler/src/cli/builder.rs +++ b/bin/rundler/src/cli/builder.rs @@ -17,6 +17,8 @@ use tokio::sync::broadcast; use super::{json::get_json_config, CommonArgs}; +const REQUEST_CHANNEL_CAPACITY: usize = 1024; + /// CLI options for the builder #[derive(Args, Debug)] #[command(next_help_heading = "BUILDER")] @@ -262,7 +264,7 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho [BuilderTask::new( task_args, event_sender, - LocalBuilderBuilder::new(1024), + LocalBuilderBuilder::new(REQUEST_CHANNEL_CAPACITY), pool, ) .boxed()], diff --git a/bin/rundler/src/cli/node/mod.rs b/bin/rundler/src/cli/node/mod.rs index 8b755bfcc..c2e685d4d 100644 --- a/bin/rundler/src/cli/node/mod.rs +++ b/bin/rundler/src/cli/node/mod.rs @@ -15,6 +15,9 @@ use crate::cli::{ }; mod events; +const REQUEST_CHANNEL_CAPACITY: usize = 1024; +const BLOCK_CHANNEL_CAPACITY: usize = 1024; + #[derive(Debug, Args)] pub struct NodeCliArgs { #[command(flatten)] @@ -66,10 +69,10 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow:: } }); - let pool_builder = LocalPoolBuilder::new(1024, 1024); + let pool_builder = LocalPoolBuilder::new(REQUEST_CHANNEL_CAPACITY, BLOCK_CHANNEL_CAPACITY); let pool_handle = pool_builder.get_handle(); - let builder_builder = LocalBuilderBuilder::new(1024); + let builder_builder = LocalBuilderBuilder::new(REQUEST_CHANNEL_CAPACITY); let builder_handle = builder_builder.get_handle(); spawn_tasks_with_shutdown( diff --git a/bin/rundler/src/cli/pool.rs b/bin/rundler/src/cli/pool.rs index 50dd88734..8ec7ad6ff 100644 --- a/bin/rundler/src/cli/pool.rs +++ b/bin/rundler/src/cli/pool.rs @@ -11,6 +11,10 @@ use tokio::sync::broadcast; use super::CommonArgs; use crate::cli::json::get_json_config; + +const REQUEST_CHANNEL_CAPACITY: usize = 1024; +const BLOCK_CHANNEL_CAPACITY: usize = 1024; + /// CLI options for the OP Pool #[derive(Args, Debug)] #[command(next_help_heading = "POOL")] @@ -77,6 +81,13 @@ pub struct PoolArgs { env = "POOL_CHAIN_HISTORY_SIZE" )] pub chain_history_size: Option, + + #[arg( + long = "pool.chain_update_channel_capacity", + name = "pool.chain_update_channel_capacity", + env = "POOL_CHAIN_UPDATE_CHANNEL_CAPACITY" + )] + pub chain_update_channel_capacity: Option, } impl PoolArgs { @@ -141,6 +152,7 @@ impl PoolArgs { http_poll_interval: Duration::from_millis(common.eth_poll_interval_millis), pool_configs, remote_address, + chain_update_channel_capacity: self.chain_update_channel_capacity.unwrap_or(1024), }) } } @@ -184,7 +196,12 @@ pub async fn run(pool_args: PoolCliArgs, common_args: CommonArgs) -> anyhow::Res emit::receive_and_log_events_with_filter(event_rx, |_| true); spawn_tasks_with_shutdown( - [PoolTask::new(task_args, event_sender, LocalPoolBuilder::new(1024, 1024)).boxed()], + [PoolTask::new( + task_args, + event_sender, + LocalPoolBuilder::new(REQUEST_CHANNEL_CAPACITY, BLOCK_CHANNEL_CAPACITY), + ) + .boxed()], tokio::signal::ctrl_c(), ) .await; diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index 8ec436bcf..9cb67e580 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -106,7 +106,7 @@ where }; // The new_heads stream can buffer up multiple blocks, but we only want to consume the latest one. - // This task is used to consume the new heads and place them onto a channel that can be syncronously + // This task is used to consume the new heads and place them onto a channel that can be synchronously // consumed until the latest block is reached. let (tx, mut rx) = mpsc::unbounded_channel(); tokio::spawn(async move { diff --git a/crates/pool/src/task.rs b/crates/pool/src/task.rs index 8319d9d56..a6711225e 100644 --- a/crates/pool/src/task.rs +++ b/crates/pool/src/task.rs @@ -39,6 +39,8 @@ pub struct Args { /// Address to bind the remote mempool server to, if any. /// If not provided, a server will not be started. pub remote_address: Option, + /// Channel capacity for the chain update channel. + pub chain_update_channel_capacity: usize, } /// Mempool task. @@ -69,7 +71,7 @@ impl Task for PoolTask { }; let provider = eth::new_provider(&self.args.http_url, self.args.http_poll_interval)?; let chain = Chain::new(provider, chain_settings); - let (update_sender, _) = broadcast::channel(1000); + let (update_sender, _) = broadcast::channel(self.args.chain_update_channel_capacity); let chain_handle = chain.spawn_watcher(update_sender.clone(), shutdown_token.clone()); let parsed_url = Url::parse(&self.args.http_url).context("Invalid RPC URL")?;