diff --git a/bin/rundler/src/cli/builder.rs b/bin/rundler/src/cli/builder.rs index 21d0c22a8..fd6312165 100644 --- a/bin/rundler/src/cli/builder.rs +++ b/bin/rundler/src/cli/builder.rs @@ -13,10 +13,12 @@ use rundler_task::{ spawn_tasks_with_shutdown, }; use rundler_utils::emit::{self, WithEntryPoint, EVENT_CHANNEL_CAPACITY}; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use super::{json::get_json_config, CommonArgs}; +const REQUEST_CAPACITY: usize = 1024; + /// CLI options for the builder #[derive(Args, Debug)] #[command(next_help_heading = "BUILDER")] @@ -258,11 +260,12 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho ) .await?; + let (req_sender, req_receiver) = mpsc::channel(REQUEST_CAPACITY); spawn_tasks_with_shutdown( [BuilderTask::new( task_args, event_sender, - LocalBuilderBuilder::new(1024), + LocalBuilderBuilder::new(req_sender, req_receiver), pool, ) .boxed()], diff --git a/bin/rundler/src/cli/node/mod.rs b/bin/rundler/src/cli/node/mod.rs index 8b755bfcc..1367c891b 100644 --- a/bin/rundler/src/cli/node/mod.rs +++ b/bin/rundler/src/cli/node/mod.rs @@ -4,7 +4,7 @@ use rundler_pool::{LocalPoolBuilder, PoolEvent, PoolTask}; use rundler_rpc::RpcTask; use rundler_task::spawn_tasks_with_shutdown; use rundler_utils::emit::{self, WithEntryPoint, EVENT_CHANNEL_CAPACITY}; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use self::events::Event; use crate::cli::{ @@ -27,6 +27,9 @@ pub struct NodeCliArgs { rpc: RpcArgs, } +const REQUEST_CAPACITY: usize = 1024; +const BLOCK_CAPACITY: usize = 1024; + pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::Result<()> { let NodeCliArgs { pool: pool_args, @@ -66,10 +69,14 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow:: } }); - let pool_builder = LocalPoolBuilder::new(1024, 1024); + let (req_sender, req_receiver) = mpsc::channel(REQUEST_CAPACITY); + let (block_sender, _) = broadcast::channel(BLOCK_CAPACITY); + + let pool_builder = LocalPoolBuilder::new(req_sender, req_receiver, block_sender); let pool_handle = pool_builder.get_handle(); - let builder_builder = LocalBuilderBuilder::new(1024); + let (req_sender, req_receiver) = mpsc::channel(REQUEST_CAPACITY); + let builder_builder = LocalBuilderBuilder::new(req_sender, req_receiver); let builder_handle = builder_builder.get_handle(); spawn_tasks_with_shutdown( diff --git a/bin/rundler/src/cli/pool.rs b/bin/rundler/src/cli/pool.rs index 50dd88734..aba3d8652 100644 --- a/bin/rundler/src/cli/pool.rs +++ b/bin/rundler/src/cli/pool.rs @@ -7,7 +7,7 @@ use rundler_pool::{LocalPoolBuilder, PoolConfig, PoolTask, PoolTaskArgs}; use rundler_sim::MempoolConfig; use rundler_task::spawn_tasks_with_shutdown; use rundler_utils::emit::{self, EVENT_CHANNEL_CAPACITY}; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use super::CommonArgs; use crate::cli::json::get_json_config; @@ -171,6 +171,9 @@ pub struct PoolCliArgs { pool: PoolArgs, } +const REQUEST_CAPACITY: usize = 1024; +const BLOCK_CAPACITY: usize = 1024; + pub async fn run(pool_args: PoolCliArgs, common_args: CommonArgs) -> anyhow::Result<()> { let PoolCliArgs { pool: pool_args } = pool_args; let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY); @@ -183,8 +186,16 @@ pub async fn run(pool_args: PoolCliArgs, common_args: CommonArgs) -> anyhow::Res emit::receive_and_log_events_with_filter(event_rx, |_| true); + let (req_sender, req_receiver) = mpsc::channel(REQUEST_CAPACITY); + let (block_sender, _) = broadcast::channel(BLOCK_CAPACITY); + spawn_tasks_with_shutdown( - [PoolTask::new(task_args, event_sender, LocalPoolBuilder::new(1024, 1024)).boxed()], + [PoolTask::new( + task_args, + event_sender, + LocalPoolBuilder::new(req_sender, req_receiver, block_sender), + ) + .boxed()], tokio::signal::ctrl_c(), ) .await; diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index b803d0be0..006eed16b 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -106,7 +106,7 @@ where }; // The new_heads stream can buffer up multiple blocks, but we only want to consume the latest one. - // This task is used to consume the new heads and place them onto a channel that can be syncronously + // This task is used to consume the new heads and place them onto a channel that can be synchronously // consumed until the latest block is reached. let (tx, mut rx) = mpsc::unbounded_channel(); tokio::spawn(async move { diff --git a/crates/builder/src/server/local.rs b/crates/builder/src/server/local.rs index a75ab92bd..a81d7f350 100644 --- a/crates/builder/src/server/local.rs +++ b/crates/builder/src/server/local.rs @@ -26,8 +26,10 @@ pub struct LocalBuilderBuilder { impl LocalBuilderBuilder { /// Create a new local builder server builder - pub fn new(request_capcity: usize) -> Self { - let (req_sender, req_receiver) = mpsc::channel(request_capcity); + pub fn new( + req_sender: mpsc::Sender, + req_receiver: mpsc::Receiver, + ) -> Self { Self { req_sender, req_receiver, @@ -221,7 +223,7 @@ enum ServerRequestKind { } #[derive(Debug)] -struct ServerRequest { +pub struct ServerRequest { request: ServerRequestKind, response: oneshot::Sender>, } diff --git a/crates/pool/src/server/local.rs b/crates/pool/src/server/local.rs index 71d653e6a..f5a9c0a71 100644 --- a/crates/pool/src/server/local.rs +++ b/crates/pool/src/server/local.rs @@ -30,9 +30,11 @@ pub struct LocalPoolBuilder { impl LocalPoolBuilder { /// Create a new local pool server builder - pub fn new(request_capacity: usize, block_capacity: usize) -> Self { - let (req_sender, req_receiver) = mpsc::channel(request_capacity); - let (block_sender, _) = broadcast::channel(block_capacity); + pub fn new( + req_sender: mpsc::Sender, + req_receiver: mpsc::Receiver, + block_sender: broadcast::Sender, + ) -> Self { Self { req_sender, req_receiver, @@ -434,7 +436,7 @@ where } #[derive(Debug)] -struct ServerRequest { +pub struct ServerRequest { request: ServerRequestKind, response: oneshot::Sender>, } @@ -614,7 +616,9 @@ mod tests { } fn setup(pools: HashMap>) -> State { - let builder = LocalPoolBuilder::new(10, 10); + let (req_sender, req_receiver) = mpsc::channel(1024); + let (block_sender, _) = broadcast::channel(1024); + let builder = LocalPoolBuilder::new(req_sender, req_receiver, block_sender); let handle = builder.get_handle(); let (tx, rx) = broadcast::channel(10); let run_handle = builder.run(pools, rx, CancellationToken::new()); diff --git a/crates/pool/src/task.rs b/crates/pool/src/task.rs index 8319d9d56..723fd1c34 100644 --- a/crates/pool/src/task.rs +++ b/crates/pool/src/task.rs @@ -23,6 +23,8 @@ use crate::{ server::{spawn_remote_mempool_server, LocalPoolBuilder}, }; +const CHAIN_UPDATE_CHANNEL_CAPACITY: usize = 1000; + /// Arguments for the pool task. #[derive(Debug)] pub struct Args { @@ -69,7 +71,7 @@ impl Task for PoolTask { }; let provider = eth::new_provider(&self.args.http_url, self.args.http_poll_interval)?; let chain = Chain::new(provider, chain_settings); - let (update_sender, _) = broadcast::channel(1000); + let (update_sender, _) = broadcast::channel(CHAIN_UPDATE_CHANNEL_CAPACITY); let chain_handle = chain.spawn_watcher(update_sender.clone(), shutdown_token.clone()); let parsed_url = Url::parse(&self.args.http_url).context("Invalid RPC URL")?;