diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index b803d0be0..8ec436bcf 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -30,7 +30,7 @@ use crate::{ #[async_trait] pub(crate) trait BundleSender: Send + Sync + 'static { - async fn send_bundles_in_loop(&mut self); + async fn send_bundles_in_loop(self) -> anyhow::Result<()>; } #[derive(Debug)] @@ -99,10 +99,10 @@ where /// Loops forever, attempting to form and send a bundle on each new block, /// then waiting for one bundle to be mined or dropped before forming the /// next one. - async fn send_bundles_in_loop(&mut self) { + async fn send_bundles_in_loop(mut self) -> anyhow::Result<()> { let Ok(mut new_heads) = self.pool.subscribe_new_heads().await else { error!("Failed to subscribe to new blocks"); - return; + bail!("failed to subscribe to new blocks"); }; // The new_heads stream can buffer up multiple blocks, but we only want to consume the latest one. @@ -147,7 +147,7 @@ where Some(b) => b, None => { error!("Block stream closed"); - return; + bail!("Block stream closed"); } }; // Consume any other blocks that may have been buffered up @@ -161,7 +161,7 @@ where } Err(mpsc::error::TryRecvError::Disconnected) => { error!("Block stream closed"); - return; + bail!("Block stream closed"); } } } diff --git a/crates/builder/src/task.rs b/crates/builder/src/task.rs index 71d83583b..948aba52d 100644 --- a/crates/builder/src/task.rs +++ b/crates/builder/src/task.rs @@ -12,20 +12,19 @@ use ethers::{ types::{Address, H256}, }; use ethers_signers::Signer; +use futures::future; +use futures_util::TryFutureExt; use rundler_pool::PoolServer; use rundler_sim::{ MempoolConfig, PriorityFeeMode, SimulateValidationTracerImpl, SimulationSettings, SimulatorImpl, }; use rundler_task::Task; use rundler_types::contracts::i_entry_point::IEntryPoint; -use rundler_utils::{ - emit::WithEntryPoint, - eth, - handle::{self, SpawnGuard}, -}; +use rundler_utils::{emit::WithEntryPoint, eth, handle}; use rusoto_core::Region; use tokio::{ sync::{broadcast, mpsc}, + task::JoinHandle, time, try_join, }; use tokio_util::sync::CancellationToken; @@ -123,7 +122,7 @@ where let provider = eth::new_provider(&self.args.rpc_url, self.args.eth_poll_interval)?; let manual_bundling_mode = Arc::new(AtomicBool::new(false)); - let mut spawn_guards = vec![]; + let mut sender_handles = vec![]; let mut send_bundle_txs = vec![]; for i in 0..self.args.num_bundle_builders { let (spawn_guard, send_bundle_tx) = self @@ -133,9 +132,15 @@ where Arc::clone(&provider), ) .await?; - spawn_guards.push(spawn_guard); + sender_handles.push(spawn_guard); send_bundle_txs.push(send_bundle_tx); } + // flatten the senders handles to one handle, short-circuit on errors + let sender_handle = tokio::spawn( + future::try_join_all(sender_handles) + .map_ok(|_| ()) + .map_err(|e| anyhow::anyhow!(e)), + ); let builder_handle = self.builder_builder.get_handle(); let builder_runnder_handle = self.builder_builder.run( @@ -161,6 +166,7 @@ where info!("Started bundle builder"); match try_join!( + handle::flatten_handle(sender_handle), handle::flatten_handle(builder_runnder_handle), handle::flatten_handle(remote_handle), ) { @@ -205,7 +211,10 @@ where index: u64, manual_bundling_mode: Arc, provider: Arc>, - ) -> anyhow::Result<(SpawnGuard, mpsc::Sender)> { + ) -> anyhow::Result<( + JoinHandle>, + mpsc::Sender, + )> { let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1); let signer = if let Some(pk) = &self.args.private_key { @@ -297,7 +306,7 @@ where proposer_settings, self.event_sender.clone(), ); - let mut builder = BundleSenderImpl::new( + let builder = BundleSenderImpl::new( index, manual_bundling_mode.clone(), send_bundle_rx, @@ -312,9 +321,7 @@ where self.event_sender.clone(), ); - Ok(( - SpawnGuard::spawn_with_guard(async move { builder.send_bundles_in_loop().await }), - send_bundle_tx, - )) + // Spawn each sender as its own independent task + Ok((tokio::spawn(builder.send_bundles_in_loop()), send_bundle_tx)) } }