Skip to content

Commit

Permalink
refactor: clean up channel creation
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-miao committed Sep 26, 2023
1 parent f7e6f1c commit aca20b2
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 6 deletions.
4 changes: 3 additions & 1 deletion bin/rundler/src/cli/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use tokio::sync::broadcast;

use super::{json::get_json_config, CommonArgs};

const REQUEST_CHANNEL_CAPACITY: usize = 1024;

/// CLI options for the builder
#[derive(Args, Debug)]
#[command(next_help_heading = "BUILDER")]
Expand Down Expand Up @@ -262,7 +264,7 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho
[BuilderTask::new(
task_args,
event_sender,
LocalBuilderBuilder::new(1024),
LocalBuilderBuilder::new(REQUEST_CHANNEL_CAPACITY),
pool,
)
.boxed()],
Expand Down
7 changes: 5 additions & 2 deletions bin/rundler/src/cli/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ use crate::cli::{
};
mod events;

const REQUEST_CHANNEL_CAPACITY: usize = 1024;
const BLOCK_CHANNEL_CAPACITY: usize = 1024;

#[derive(Debug, Args)]
pub struct NodeCliArgs {
#[command(flatten)]
Expand Down Expand Up @@ -66,10 +69,10 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::
}
});

let pool_builder = LocalPoolBuilder::new(1024, 1024);
let pool_builder = LocalPoolBuilder::new(REQUEST_CHANNEL_CAPACITY, BLOCK_CHANNEL_CAPACITY);
let pool_handle = pool_builder.get_handle();

let builder_builder = LocalBuilderBuilder::new(1024);
let builder_builder = LocalBuilderBuilder::new(REQUEST_CHANNEL_CAPACITY);
let builder_handle = builder_builder.get_handle();

spawn_tasks_with_shutdown(
Expand Down
19 changes: 18 additions & 1 deletion bin/rundler/src/cli/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ use tokio::sync::broadcast;

use super::CommonArgs;
use crate::cli::json::get_json_config;

const REQUEST_CHANNEL_CAPACITY: usize = 1024;
const BLOCK_CHANNEL_CAPACITY: usize = 1024;

/// CLI options for the OP Pool
#[derive(Args, Debug)]
#[command(next_help_heading = "POOL")]
Expand Down Expand Up @@ -77,6 +81,13 @@ pub struct PoolArgs {
env = "POOL_CHAIN_HISTORY_SIZE"
)]
pub chain_history_size: Option<u64>,

#[arg(
long = "pool.chain_update_channel_capacity",
name = "pool.chain_update_channel_capacity",
env = "POOL_CHAIN_UPDATE_CHANNEL_CAPACITY"
)]
pub chain_update_channel_capacity: Option<usize>,
}

impl PoolArgs {
Expand Down Expand Up @@ -141,6 +152,7 @@ impl PoolArgs {
http_poll_interval: Duration::from_millis(common.eth_poll_interval_millis),
pool_configs,
remote_address,
chain_update_channel_capacity: self.chain_update_channel_capacity.unwrap_or(1024),
})
}
}
Expand Down Expand Up @@ -184,7 +196,12 @@ pub async fn run(pool_args: PoolCliArgs, common_args: CommonArgs) -> anyhow::Res
emit::receive_and_log_events_with_filter(event_rx, |_| true);

spawn_tasks_with_shutdown(
[PoolTask::new(task_args, event_sender, LocalPoolBuilder::new(1024, 1024)).boxed()],
[PoolTask::new(
task_args,
event_sender,
LocalPoolBuilder::new(REQUEST_CHANNEL_CAPACITY, BLOCK_CHANNEL_CAPACITY),
)
.boxed()],
tokio::signal::ctrl_c(),
)
.await;
Expand Down
2 changes: 1 addition & 1 deletion crates/builder/src/bundle_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ where
};

// The new_heads stream can buffer up multiple blocks, but we only want to consume the latest one.
// This task is used to consume the new heads and place them onto a channel that can be syncronously
// This task is used to consume the new heads and place them onto a channel that can be synchronously
// consumed until the latest block is reached.
let (tx, mut rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
Expand Down
4 changes: 3 additions & 1 deletion crates/pool/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ pub struct Args {
/// Address to bind the remote mempool server to, if any.
/// If not provided, a server will not be started.
pub remote_address: Option<SocketAddr>,
/// Channel capacity for the chain update channel.
pub chain_update_channel_capacity: usize,
}

/// Mempool task.
Expand Down Expand Up @@ -69,7 +71,7 @@ impl Task for PoolTask {
};
let provider = eth::new_provider(&self.args.http_url, self.args.http_poll_interval)?;
let chain = Chain::new(provider, chain_settings);
let (update_sender, _) = broadcast::channel(1000);
let (update_sender, _) = broadcast::channel(self.args.chain_update_channel_capacity);
let chain_handle = chain.spawn_watcher(update_sender.clone(), shutdown_token.clone());

let parsed_url = Url::parse(&self.args.http_url).context("Invalid RPC URL")?;
Expand Down

0 comments on commit aca20b2

Please sign in to comment.