From b2b21083d3b521c09184eb9b380d9f190ea0198f Mon Sep 17 00:00:00 2001 From: dancoombs Date: Fri, 28 Jul 2023 13:39:00 -0500 Subject: [PATCH] wip --- proto/builder.proto | 33 ++++- src/builder/mod.rs | 14 +++ src/builder/server.rs | 95 --------------- src/builder/server/local/client.rs | 67 ++++++++++ src/builder/server/local/mod.rs | 2 + src/builder/server/local/server.rs | 145 ++++++++++++++++++++++ src/builder/server/mod.rs | 35 ++++++ src/builder/server/remote/client.rs | 113 +++++++++++++++++ src/builder/server/remote/error.rs | 28 +++++ src/builder/server/remote/mod.rs | 4 + src/builder/server/remote/protos.rs | 27 +++++ src/builder/server/remote/server.rs | 181 ++++++++++++++++++++++++++++ src/builder/task.rs | 72 ++++++----- src/cli/builder.rs | 23 ++-- src/cli/node.rs | 30 ++--- src/cli/rpc.rs | 14 +-- src/common/grpc/mocks.rs | 137 --------------------- src/common/grpc/mod.rs | 2 - src/common/protos.rs | 44 +------ src/common/types/mod.rs | 8 -- src/op_pool/server/local/server.rs | 9 +- src/op_pool/server/mod.rs | 2 - src/op_pool/server/remote/client.rs | 2 +- src/op_pool/server/remote/protos.rs | 4 +- src/rpc/debug.rs | 36 ++---- src/rpc/health.rs | 17 ++- src/rpc/task.rs | 96 ++++++++------- 27 files changed, 814 insertions(+), 426 deletions(-) delete mode 100644 src/builder/server.rs create mode 100644 src/builder/server/local/client.rs create mode 100644 src/builder/server/local/mod.rs create mode 100644 src/builder/server/local/server.rs create mode 100644 src/builder/server/mod.rs create mode 100644 src/builder/server/remote/client.rs create mode 100644 src/builder/server/remote/error.rs create mode 100644 src/builder/server/remote/mod.rs create mode 100644 src/builder/server/remote/protos.rs create mode 100644 src/builder/server/remote/server.rs delete mode 100644 src/common/grpc/mocks.rs diff --git a/proto/builder.proto b/proto/builder.proto index 947228260..9bcdd9eea 100644 --- a/proto/builder.proto +++ b/proto/builder.proto @@ -8,13 +8,32 @@ enum BundlingMode { } service Builder { + rpc GetSupportedEntryPoints (GetSupportedEntryPointsRequest) returns (GetSupportedEntryPointsResponse); rpc DebugSendBundleNow(DebugSendBundleNowRequest) returns (DebugSendBundleNowResponse); rpc DebugSetBundlingMode(DebugSetBundlingModeRequest) returns (DebugSetBundlingModeResponse); } +message GetSupportedEntryPointsRequest {} +message GetSupportedEntryPointsResponse { + oneof result { + GetSupportedEntryPointsSuccess success = 1; + BuilderError failure = 2; + } +} +message GetSupportedEntryPointsSuccess { + uint64 chain_id = 1; + repeated bytes entry_points = 2; +} + message DebugSendBundleNowRequest {} message DebugSendBundleNowResponse { + oneof result { + DebugSendBundleNowSuccess success = 1; + BuilderError failure = 2; + } +} +message DebugSendBundleNowSuccess { bytes transaction_hash = 1; } @@ -22,4 +41,16 @@ message DebugSetBundlingModeRequest { BundlingMode mode = 1; } -message DebugSetBundlingModeResponse {} \ No newline at end of file +message DebugSetBundlingModeResponse { + oneof result { + DebugSetBundlingModeSuccess success = 1; + BuilderError failure = 2; + } +} +message DebugSetBundlingModeSuccess {} + +message BuilderError { + oneof error { + string internal = 1; + } +} diff --git a/src/builder/mod.rs b/src/builder/mod.rs index 530089088..5b8ff32e4 100644 --- a/src/builder/mod.rs +++ b/src/builder/mod.rs @@ -6,4 +6,18 @@ mod signer; mod task; mod transaction_tracker; +use parse_display::Display; +use serde::{Deserialize, Serialize}; +pub use server::{ + connect_remote_builder_client, BuilderClient, LocalBuilderClient, LocalBuilderServerRequest, +}; +use strum::EnumIter; pub use task::*; + +#[derive(Display, Debug, Clone, Copy, Eq, PartialEq, EnumIter, Serialize, Deserialize)] +#[display(style = "lowercase")] +#[serde(rename_all = "lowercase")] +pub enum BundlingMode { + Manual, + Auto, +} diff --git a/src/builder/server.rs b/src/builder/server.rs deleted file mode 100644 index a590b8257..000000000 --- a/src/builder/server.rs +++ /dev/null @@ -1,95 +0,0 @@ -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, -}; - -use tokio::sync::{mpsc, oneshot}; -use tonic::{async_trait, Request, Response, Status}; -use tracing::debug; - -use super::bundle_sender::SendBundleRequest; -use crate::{ - builder::bundle_sender::SendBundleResult, - common::protos::builder::{ - builder_server::Builder, BundlingMode, DebugSendBundleNowRequest, - DebugSendBundleNowResponse, DebugSetBundlingModeRequest, DebugSetBundlingModeResponse, - }, -}; - -#[derive(Debug)] -pub struct BuilderImpl { - send_bundle_requester: mpsc::Sender, - manual_bundling_mode: Arc, -} - -impl BuilderImpl { - pub fn new( - manual_bundling_mode: Arc, - send_bundle_requester: mpsc::Sender, - ) -> Self { - Self { - manual_bundling_mode, - send_bundle_requester, - } - } -} - -#[async_trait] -impl Builder for BuilderImpl { - async fn debug_send_bundle_now( - &self, - _request: Request, - ) -> tonic::Result> { - debug!("Send bundle now called"); - if !self.manual_bundling_mode.load(Ordering::Relaxed) { - return Err(Status::failed_precondition( - "manual bundling mode must be enabled", - )); - } - - let (tx, rx) = oneshot::channel(); - - self.send_bundle_requester - .send(SendBundleRequest { responder: tx }) - .await - .map_err(|e| Status::internal(format!("failed to send bundle request {e}")))?; - - let result = rx - .await - .map_err(|e| Status::internal(format!("failed to receive bundle result {e}")))?; - - let tx_hash = match result { - SendBundleResult::Success { tx_hash, .. } => tx_hash, - SendBundleResult::NoOperationsInitially => { - return Err(Status::internal("no ops to send")) - } - SendBundleResult::NoOperationsAfterFeeIncreases { .. } => { - return Err(Status::internal( - "bundle initially had operations, but after increasing gas fees it was empty", - )) - } - SendBundleResult::StalledAtMaxFeeIncreases => return Err(Status::internal("")), - SendBundleResult::Error(error) => return Err(Status::internal(error.to_string())), - }; - Ok(Response::new(DebugSendBundleNowResponse { - transaction_hash: tx_hash.as_bytes().to_vec(), - })) - } - - async fn debug_set_bundling_mode( - &self, - request: Request, - ) -> tonic::Result> { - let mode = BundlingMode::from_i32(request.into_inner().mode).unwrap_or_default(); - let is_manual_bundling = match mode { - BundlingMode::Unspecified => { - return Err(Status::invalid_argument("invalid bundling mode")) - } - BundlingMode::Manual => true, - BundlingMode::Auto => false, - }; - self.manual_bundling_mode - .store(is_manual_bundling, Ordering::Relaxed); - Ok(Response::new(DebugSetBundlingModeResponse {})) - } -} diff --git a/src/builder/server/local/client.rs b/src/builder/server/local/client.rs new file mode 100644 index 000000000..dbeeb4109 --- /dev/null +++ b/src/builder/server/local/client.rs @@ -0,0 +1,67 @@ +use ethers::types::{Address, H256}; +use tokio::sync::{mpsc, oneshot}; +use tonic::async_trait; + +use super::server::{ServerRequest, ServerRequestKind, ServerResponse}; +use crate::builder::{ + server::{BuilderClient, BuilderResult, BuilderServerError}, + BundlingMode, +}; + +#[derive(Debug, Clone)] +pub struct LocalBuilderClient { + sender: mpsc::Sender, +} + +impl LocalBuilderClient { + pub fn new(sender: mpsc::Sender) -> Self { + Self { sender } + } + + async fn send(&self, request: ServerRequestKind) -> BuilderResult { + let (send, recv) = oneshot::channel(); + self.sender + .send(ServerRequest { + request, + response: send, + }) + .await + .map_err(|e| anyhow::anyhow!("LocalBuilderServer closed {e:?}"))?; + recv.await + .map_err(|e| anyhow::anyhow!("LocalBuilderServer closed {e:?}"))? + } +} + +#[async_trait] +impl BuilderClient for LocalBuilderClient { + async fn get_supported_entry_points(&self) -> BuilderResult> { + let resp = self + .send(ServerRequestKind::GetSupportedEntryPoints) + .await?; + match resp { + ServerResponse::GetSupportedEntryPoints { + entry_points, + chain_id, + } => Ok(entry_points), + _ => Err(BuilderServerError::UnexpectedResponse), + } + } + + async fn debug_send_bundle_now(&self) -> BuilderResult { + let resp = self.send(ServerRequestKind::DebugSendBundleNow).await?; + match resp { + ServerResponse::DebugSendBundleNow { hash } => Ok(hash), + _ => Err(BuilderServerError::UnexpectedResponse), + } + } + + async fn debug_set_bundling_mode(&self, mode: BundlingMode) -> BuilderResult<()> { + let resp = self + .send(ServerRequestKind::DebugSetBundlingMode { mode }) + .await?; + match resp { + ServerResponse::DebugSetBundlingMode => Ok(()), + _ => Err(BuilderServerError::UnexpectedResponse), + } + } +} diff --git a/src/builder/server/local/mod.rs b/src/builder/server/local/mod.rs new file mode 100644 index 000000000..c07f47e0f --- /dev/null +++ b/src/builder/server/local/mod.rs @@ -0,0 +1,2 @@ +pub mod client; +pub mod server; diff --git a/src/builder/server/local/server.rs b/src/builder/server/local/server.rs new file mode 100644 index 000000000..d082b5675 --- /dev/null +++ b/src/builder/server/local/server.rs @@ -0,0 +1,145 @@ +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + +use ethers::types::{Address, H256}; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinHandle, +}; +use tokio_util::sync::CancellationToken; + +use crate::builder::{ + bundle_sender::{SendBundleRequest, SendBundleResult}, + server::BuilderResult, + BundlingMode, +}; + +pub async fn spawn_local_builder_server( + req_receiver: mpsc::Receiver, + manual_bundling_mode: Arc, + send_bundle_requester: mpsc::Sender, + entry_points: Vec
, + chain_id: u64, + shutdown_token: CancellationToken, +) -> anyhow::Result>> { + let mut server = LocalBuilderServer::new( + req_receiver, + manual_bundling_mode, + send_bundle_requester, + entry_points, + chain_id, + ); + let handle = tokio::spawn(async move { server.run(shutdown_token).await }); + Ok(handle) +} + +#[derive(Debug)] +pub struct ServerRequest { + pub request: ServerRequestKind, + pub response: oneshot::Sender>, +} + +#[derive(Clone, Debug)] +pub enum ServerRequestKind { + GetSupportedEntryPoints, + DebugSendBundleNow, + DebugSetBundlingMode { mode: BundlingMode }, +} + +#[derive(Clone, Debug)] +pub enum ServerResponse { + GetSupportedEntryPoints { + entry_points: Vec
, + chain_id: u64, + }, + DebugSendBundleNow { + hash: H256, + }, + DebugSetBundlingMode, +} + +pub struct LocalBuilderServer { + req_receiver: mpsc::Receiver, + send_bundle_requester: mpsc::Sender, + manual_bundling_mode: Arc, + entry_points: Vec
, + chain_id: u64, +} + +impl LocalBuilderServer { + pub fn new( + req_receiver: mpsc::Receiver, + manual_bundling_mode: Arc, + send_bundle_requester: mpsc::Sender, + entry_points: Vec
, + chain_id: u64, + ) -> Self { + Self { + req_receiver, + manual_bundling_mode, + send_bundle_requester, + entry_points, + chain_id, + } + } + + pub async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { + loop { + tokio::select! { + _ = shutdown_token.cancelled() => { + return Ok(()) + } + Some(req) = self.req_receiver.recv() => { + let resp: BuilderResult = 'a: { + match req.request { + ServerRequestKind::GetSupportedEntryPoints => { + Ok(ServerResponse::GetSupportedEntryPoints { + entry_points: self.entry_points.clone(), + chain_id: self.chain_id, + }) + }, + ServerRequestKind::DebugSendBundleNow => { + let (tx, rx) = oneshot::channel(); + match self.send_bundle_requester.send(SendBundleRequest{ + responder: tx + }).await { + Ok(()) => {}, + Err(e) => break 'a Err(anyhow::anyhow!("failed to send send bundle request: {}", e.to_string()).into()) + } + + let result = match rx.await { + Ok(result) => result, + Err(e) => break 'a Err(anyhow::anyhow!("failed to receive bundle result: {e:?}").into()) + }; + + match result { + SendBundleResult::Success { tx_hash, .. } => { + Ok(ServerResponse::DebugSendBundleNow { hash: tx_hash }) + }, + SendBundleResult::NoOperationsInitially => { + Err(anyhow::anyhow!("no ops to send").into()) + }, + SendBundleResult::NoOperationsAfterFeeIncreases { .. } => { + Err(anyhow::anyhow!("bundle initially had operations, but after increasing gas fees it was empty").into()) + }, + SendBundleResult::StalledAtMaxFeeIncreases => Err(anyhow::anyhow!("stalled at max fee increases").into()), + SendBundleResult::Error(e) => Err(anyhow::anyhow!("send bundle error: {e:?}").into()), + } + }, + ServerRequestKind::DebugSetBundlingMode { mode } => { + self.manual_bundling_mode.store(mode == BundlingMode::Manual, Ordering::Relaxed); + Ok(ServerResponse::DebugSetBundlingMode) + }, + } + }; + + if let Err(e) = req.response.send(resp) { + tracing::error!("failed to send response: {:?}", e); + } + } + } + } + } +} diff --git a/src/builder/server/mod.rs b/src/builder/server/mod.rs new file mode 100644 index 000000000..1c868b773 --- /dev/null +++ b/src/builder/server/mod.rs @@ -0,0 +1,35 @@ +mod local; +mod remote; + +use ethers::types::{Address, H256}; +pub use local::{ + client::LocalBuilderClient, + server::{spawn_local_builder_server, ServerRequest as LocalBuilderServerRequest}, +}; +#[cfg(test)] +use mockall::automock; +pub use remote::{client::connect_remote_builder_client, server::spawn_remote_builder_server}; +use tonic::async_trait; + +use super::BundlingMode; + +#[derive(Debug, thiserror::Error)] +pub enum BuilderServerError { + #[error("Unexpected response from BuilderServer")] + UnexpectedResponse, + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +pub type Error = BuilderServerError; +pub type BuilderResult = std::result::Result; + +#[cfg_attr(test, automock)] +#[async_trait] +pub trait BuilderClient: Send + Sync + 'static { + async fn get_supported_entry_points(&self) -> BuilderResult>; + + async fn debug_send_bundle_now(&self) -> BuilderResult; + + async fn debug_set_bundling_mode(&self, mode: BundlingMode) -> BuilderResult<()>; +} diff --git a/src/builder/server/remote/client.rs b/src/builder/server/remote/client.rs new file mode 100644 index 000000000..7109cde26 --- /dev/null +++ b/src/builder/server/remote/client.rs @@ -0,0 +1,113 @@ +use std::sync::Arc; + +use anyhow::bail; +use ethers::types::{Address, H256}; +use tokio_util::sync::CancellationToken; +use tonic::{async_trait, transport::Channel}; + +use super::protos::{ + builder_client::BuilderClient as GrpcBuilderClient, debug_send_bundle_now_response, + debug_set_bundling_mode_response, get_supported_entry_points_response, + BundlingMode as ProtoBundlingMode, DebugSendBundleNowRequest, DebugSetBundlingModeRequest, + GetSupportedEntryPointsRequest, +}; +use crate::{ + builder::{ + server::{BuilderClient, BuilderResult, BuilderServerError}, + BundlingMode, + }, + common::{ + protos::{from_bytes, ConversionError}, + server::connect_with_retries, + }, +}; + +pub struct RemoteBuilderClient { + grpc_client: GrpcBuilderClient, +} + +impl RemoteBuilderClient { + pub fn new(grpc_client: GrpcBuilderClient) -> Self { + Self { grpc_client } + } +} + +#[async_trait] +impl BuilderClient for Arc { + async fn get_supported_entry_points(&self) -> BuilderResult> { + let res = self + .grpc_client + .clone() + .get_supported_entry_points(GetSupportedEntryPointsRequest {}) + .await? + .into_inner() + .result; + + match res { + Some(get_supported_entry_points_response::Result::Success(s)) => Ok(s + .entry_points + .into_iter() + .map(|ep| from_bytes(ep.as_slice())) + .collect::>()?), + Some(get_supported_entry_points_response::Result::Failure(f)) => Err(f.try_into()?), + None => Err(BuilderServerError::Other(anyhow::anyhow!( + "should have received result from builder" + )))?, + } + } + + async fn debug_send_bundle_now(&self) -> BuilderResult { + let res = self + .grpc_client + .clone() + .debug_send_bundle_now(DebugSendBundleNowRequest {}) + .await? + .into_inner() + .result; + + match res { + Some(debug_send_bundle_now_response::Result::Success(s)) => { + Ok(H256::from_slice(&s.transaction_hash)) + } + Some(debug_send_bundle_now_response::Result::Failure(f)) => Err(f.try_into()?), + None => Err(BuilderServerError::Other(anyhow::anyhow!( + "should have received result from builder" + )))?, + } + } + + async fn debug_set_bundling_mode(&self, mode: BundlingMode) -> BuilderResult<()> { + let res = self + .grpc_client + .clone() + .debug_set_bundling_mode(DebugSetBundlingModeRequest { + mode: ProtoBundlingMode::from(mode) as i32, + }) + .await? + .into_inner() + .result; + + match res { + Some(debug_set_bundling_mode_response::Result::Success(_)) => Ok(()), + Some(debug_set_bundling_mode_response::Result::Failure(f)) => Err(f.try_into()?), + None => Err(BuilderServerError::Other(anyhow::anyhow!( + "should have received result from builder" + )))?, + } + } +} + +pub async fn connect_remote_builder_client( + builder_url: &str, + shutdown_token: CancellationToken, +) -> anyhow::Result> { + tokio::select! { + _ = shutdown_token.cancelled() => { + tracing::error!("bailing from connecting client, server shutting down"); + bail!("Server shutting down") + } + res = connect_with_retries("builder", builder_url, GrpcBuilderClient::connect) => { + Ok(Arc::new(RemoteBuilderClient::new(res?))) + } + } +} diff --git a/src/builder/server/remote/error.rs b/src/builder/server/remote/error.rs new file mode 100644 index 000000000..1bec26cbf --- /dev/null +++ b/src/builder/server/remote/error.rs @@ -0,0 +1,28 @@ +use super::protos::BuilderError as ProtoBuilderError; +use crate::{builder::server::BuilderServerError, common::protos::ConversionError}; + +impl From for BuilderServerError { + fn from(value: tonic::Status) -> Self { + BuilderServerError::Other(anyhow::anyhow!(value.to_string())) + } +} + +impl From for BuilderServerError { + fn from(value: ConversionError) -> Self { + BuilderServerError::Other(anyhow::anyhow!(value.to_string())) + } +} + +impl TryFrom for BuilderServerError { + type Error = anyhow::Error; + + fn try_from(value: ProtoBuilderError) -> Result { + todo!() + } +} + +impl From for ProtoBuilderError { + fn from(value: BuilderServerError) -> Self { + todo!() + } +} diff --git a/src/builder/server/remote/mod.rs b/src/builder/server/remote/mod.rs new file mode 100644 index 000000000..6fbf830f3 --- /dev/null +++ b/src/builder/server/remote/mod.rs @@ -0,0 +1,4 @@ +pub mod client; +mod error; +pub mod protos; +pub mod server; diff --git a/src/builder/server/remote/protos.rs b/src/builder/server/remote/protos.rs new file mode 100644 index 000000000..5736e21e3 --- /dev/null +++ b/src/builder/server/remote/protos.rs @@ -0,0 +1,27 @@ +use crate::{builder::BundlingMode as RpcBundlingMode, common::protos::ConversionError}; + +tonic::include_proto!("builder"); + +pub const BUILDER_FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("builder_descriptor"); + +impl From for BundlingMode { + fn from(mode: RpcBundlingMode) -> Self { + match mode { + RpcBundlingMode::Auto => BundlingMode::Auto, + RpcBundlingMode::Manual => BundlingMode::Manual, + } + } +} + +impl TryFrom for RpcBundlingMode { + type Error = ConversionError; + + fn try_from(value: BundlingMode) -> Result { + match value { + BundlingMode::Auto => Ok(RpcBundlingMode::Auto), + BundlingMode::Manual => Ok(RpcBundlingMode::Manual), + _ => Err(ConversionError::InvalidEnumValue(value as i32)), + } + } +} diff --git a/src/builder/server/remote/server.rs b/src/builder/server/remote/server.rs new file mode 100644 index 000000000..e9cca8ad1 --- /dev/null +++ b/src/builder/server/remote/server.rs @@ -0,0 +1,181 @@ +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +use ethers::types::Address; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinHandle, +}; +use tokio_util::sync::CancellationToken; +use tonic::{async_trait, transport::Server, Request, Response, Status}; +use tracing::debug; + +use super::protos::{ + builder_server::{Builder, BuilderServer}, + debug_set_bundling_mode_response, get_supported_entry_points_response, BundlingMode, + DebugSendBundleNowRequest, DebugSendBundleNowResponse, DebugSetBundlingModeRequest, + DebugSetBundlingModeResponse, DebugSetBundlingModeSuccess, GetSupportedEntryPointsRequest, + GetSupportedEntryPointsResponse, GetSupportedEntryPointsSuccess, BUILDER_FILE_DESCRIPTOR_SET, +}; +use crate::builder::{ + bundle_sender::{SendBundleRequest, SendBundleResult}, + server::remote::protos::{debug_send_bundle_now_response, DebugSendBundleNowSuccess}, +}; + +pub async fn spawn_remote_builder_server( + addr: SocketAddr, + manual_bundling_mode: Arc, + send_bundle_requester: mpsc::Sender, + entry_points: Vec
, + chain_id: u64, + shutdown_token: CancellationToken, +) -> anyhow::Result>> { + // gRPC server + let builder_server = BuilderImpl::new( + manual_bundling_mode, + send_bundle_requester, + entry_points, + chain_id, + ); + let builder_server = BuilderServer::new(builder_server); + + let reflection_service = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(BUILDER_FILE_DESCRIPTOR_SET) + .build()?; + + // health service + let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); + health_reporter + .set_serving::>() + .await; + + Ok(tokio::spawn(async move { + Server::builder() + .add_service(builder_server) + .add_service(reflection_service) + .add_service(health_service) + .serve_with_shutdown(addr, async move { shutdown_token.cancelled().await }) + .await + .map_err(|e| anyhow::anyhow!(format!("builder server failed: {e:?}"))) + })) +} + +#[derive(Debug)] +pub struct BuilderImpl { + send_bundle_requester: mpsc::Sender, + manual_bundling_mode: Arc, + entry_points: Vec
, + chain_id: u64, +} + +impl BuilderImpl { + pub fn new( + manual_bundling_mode: Arc, + send_bundle_requester: mpsc::Sender, + entry_points: Vec
, + chain_id: u64, + ) -> Self { + Self { + manual_bundling_mode, + send_bundle_requester, + entry_points, + chain_id, + } + } +} + +#[async_trait] +impl Builder for BuilderImpl { + async fn get_supported_entry_points( + &self, + _request: Request, + ) -> tonic::Result> { + Ok(Response::new(GetSupportedEntryPointsResponse { + result: Some(get_supported_entry_points_response::Result::Success( + GetSupportedEntryPointsSuccess { + entry_points: self + .entry_points + .clone() + .into_iter() + .map(|a| a.as_bytes().to_vec()) + .collect(), + chain_id: self.chain_id, + }, + )), + })) + } + + async fn debug_send_bundle_now( + &self, + _request: Request, + ) -> tonic::Result> { + // TODO(danc): Errors + + debug!("Send bundle now called"); + if !self.manual_bundling_mode.load(Ordering::Relaxed) { + return Err(Status::failed_precondition( + "manual bundling mode must be enabled", + )); + } + + let (tx, rx) = oneshot::channel(); + + self.send_bundle_requester + .send(SendBundleRequest { responder: tx }) + .await + .map_err(|e| Status::internal(format!("failed to send bundle request {e}")))?; + + let result = rx + .await + .map_err(|e| Status::internal(format!("failed to receive bundle result {e}")))?; + + let tx_hash = match result { + SendBundleResult::Success { tx_hash, .. } => tx_hash, + SendBundleResult::NoOperationsInitially => { + return Err(Status::internal("no ops to send")) + } + SendBundleResult::NoOperationsAfterFeeIncreases { .. } => { + return Err(Status::internal( + "bundle initially had operations, but after increasing gas fees it was empty", + )) + } + SendBundleResult::StalledAtMaxFeeIncreases => { + return Err(Status::internal("stalled at max fee increases")) + } + SendBundleResult::Error(error) => return Err(Status::internal(error.to_string())), + }; + Ok(Response::new(DebugSendBundleNowResponse { + result: Some(debug_send_bundle_now_response::Result::Success( + DebugSendBundleNowSuccess { + transaction_hash: tx_hash.as_bytes().to_vec(), + }, + )), + })) + } + + async fn debug_set_bundling_mode( + &self, + request: Request, + ) -> tonic::Result> { + let mode = BundlingMode::from_i32(request.into_inner().mode).unwrap_or_default(); + let is_manual_bundling = match mode { + BundlingMode::Unspecified => { + return Err(Status::invalid_argument("invalid bundling mode")) + } + BundlingMode::Manual => true, + BundlingMode::Auto => false, + }; + self.manual_bundling_mode + .store(is_manual_bundling, Ordering::Relaxed); + Ok(Response::new(DebugSetBundlingModeResponse { + result: Some(debug_set_bundling_mode_response::Result::Success( + DebugSetBundlingModeSuccess {}, + )), + })) + } +} diff --git a/src/builder/task.rs b/src/builder/task.rs index 5d28b33a3..4969b67a0 100644 --- a/src/builder/task.rs +++ b/src/builder/task.rs @@ -1,5 +1,6 @@ use std::{ collections::HashMap, + net::SocketAddr, sync::{atomic::AtomicBool, Arc}, time::Duration, }; @@ -13,7 +14,7 @@ use ethers_signers::Signer; use rusoto_core::Region; use tokio::{sync::mpsc, time}; use tokio_util::sync::CancellationToken; -use tonic::{async_trait, transport::Server}; +use tonic::async_trait; use tracing::info; use url::Url; @@ -22,26 +23,33 @@ use crate::{ bundle_proposer::{self, BundleProposerImpl}, bundle_sender::{self, BundleSender, BundleSenderImpl}, sender::get_sender, - server::BuilderImpl, + server::{spawn_local_builder_server, spawn_remote_builder_server}, signer::{BundlerSigner, KmsSigner, LocalSigner}, transaction_tracker::{self, TransactionTrackerImpl}, + LocalBuilderServerRequest, }, common::{ contracts::i_entry_point::IEntryPoint, gas::PriorityFeeMode, handle::{SpawnGuard, Task}, mempool::MempoolConfig, - protos::builder::{builder_server::BuilderServer, BUILDER_FILE_DESCRIPTOR_SET}, - server::format_socket_addr, simulation::{self, SimulatorImpl}, }, op_pool::{connect_remote_pool_client, LocalPoolClient, PoolClientMode}, }; +#[derive(Debug)] +pub enum BuilderServerMode { + Local { + req_receiver: Option>, + }, + Remote { + addr: SocketAddr, + }, +} + #[derive(Debug)] pub struct Args { - pub port: u16, - pub host: String, pub rpc_url: String, pub entry_point_address: Address, pub private_key: Option, @@ -63,6 +71,7 @@ pub struct Args { pub replacement_fee_percent_increase: u64, pub max_fee_increases: u64, pub pool_client_mode: PoolClientMode, + pub server_mode: BuilderServerMode, } #[derive(Debug)] @@ -73,8 +82,6 @@ pub struct BuilderTask { #[async_trait] impl Task for BuilderTask { async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { - let addr = format_socket_addr(&self.args.host, self.args.port).parse()?; - info!("Starting builder server on {}", addr); tracing::info!("Mempool config: {:?}", self.args.mempool_configs); let provider = new_provider(&self.args.rpc_url, self.args.eth_poll_interval)?; @@ -205,26 +212,35 @@ impl Task for BuilderTask { let _builder_loop_guard = { SpawnGuard::spawn_with_guard(async move { builder.send_bundles_in_loop().await }) }; - // gRPC server - let builder_server = BuilderImpl::new(manual_bundling_mode, send_bundle_tx); - let builder_server = BuilderServer::new(builder_server); - - let reflection_service = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set(BUILDER_FILE_DESCRIPTOR_SET) - .build()?; - - // health service - let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); - health_reporter - .set_serving::>() - .await; - - let server_handle = Server::builder() - .add_service(builder_server) - .add_service(reflection_service) - .add_service(health_service) - .serve_with_shutdown(addr, async move { shutdown_token.cancelled().await }); - + let server_handle = match &mut self.args.server_mode { + BuilderServerMode::Local { + ref mut req_receiver, + } => { + let req_receiver = req_receiver + .take() + .context("should have local server message receiver")?; + spawn_local_builder_server( + req_receiver, + manual_bundling_mode, + send_bundle_tx, + vec![self.args.entry_point_address.clone()], + self.args.chain_id, + shutdown_token.clone(), + ) + .await? + } + BuilderServerMode::Remote { addr } => { + spawn_remote_builder_server( + *addr, + manual_bundling_mode, + send_bundle_tx, + vec![self.args.entry_point_address.clone()], + self.args.chain_id, + shutdown_token.clone(), + ) + .await? + } + }; info!("Started bundle builder"); match server_handle.await { diff --git a/src/cli/builder.rs b/src/cli/builder.rs index 568804f88..8ec15e4ce 100644 --- a/src/cli/builder.rs +++ b/src/cli/builder.rs @@ -6,11 +6,8 @@ use ethers::types::H256; use super::{json::get_json_config, CommonArgs}; use crate::{ - builder::{self, BuilderTask}, - common::{ - gas::PriorityFeeMode, handle::spawn_tasks_with_shutdown, mempool::MempoolConfig, - server::format_server_addr, - }, + builder::{self, BuilderServerMode, BuilderTask}, + common::{gas::PriorityFeeMode, handle::spawn_tasks_with_shutdown, mempool::MempoolConfig}, op_pool::PoolClientMode, }; @@ -150,6 +147,7 @@ impl BuilderArgs { &self, common: &CommonArgs, pool_client_mode: PoolClientMode, + server_mode: BuilderServerMode, ) -> anyhow::Result { let priority_fee_mode = PriorityFeeMode::try_from( common.priority_fee_mode_kind.as_str(), @@ -170,8 +168,6 @@ impl BuilderArgs { }; Ok(builder::Args { - port: self.port, - host: self.host.clone(), rpc_url, entry_point_address: common .entry_points @@ -201,12 +197,9 @@ impl BuilderArgs { replacement_fee_percent_increase: self.replacement_fee_percent_increase, max_fee_increases: self.max_fee_increases, pool_client_mode, + server_mode, }) } - - pub fn url(&self, secure: bool) -> String { - format_server_addr(&self.host, self.port, secure) - } } /// CLI options for the Builder server standalone @@ -231,7 +224,13 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho pool_url, } = builder_args; let task_args = builder_args - .to_args(&common_args, PoolClientMode::Remote { url: pool_url }) + .to_args( + &common_args, + PoolClientMode::Remote { url: pool_url }, + BuilderServerMode::Remote { + addr: format!("{}:{}", builder_args.host, builder_args.port).parse()?, + }, + ) .await?; spawn_tasks_with_shutdown( diff --git a/src/cli/node.rs b/src/cli/node.rs index d896351a3..211bf86db 100644 --- a/src/cli/node.rs +++ b/src/cli/node.rs @@ -3,10 +3,10 @@ use tokio::sync::{broadcast, mpsc}; use super::{builder::BuilderArgs, pool::PoolArgs, rpc::RpcArgs, CommonArgs}; use crate::{ - builder::BuilderTask, + builder::{BuilderServerMode, BuilderTask}, common::handle, op_pool::{PoolClientMode, PoolServerMode, PoolTask}, - rpc::RpcTask, + rpc::{ClientMode, RpcTask}, }; #[derive(Debug, Args)] @@ -28,17 +28,16 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow:: rpc: rpc_args, } = bundler_args; - let builder_url = builder_args.url(false); - - let (tx, rx) = mpsc::channel(1024); - let (block_sender, block_receiver) = broadcast::channel(1024); + let (pool_sender, pool_rx) = mpsc::channel(1024); + let (pool_block_sender, pool_block_receiver) = broadcast::channel(1024); + let (builder_sender, builder_rx) = mpsc::channel(1024); let pool_task_args = pool_args .to_args( &common_args, PoolServerMode::Local { - req_receiver: Some(rx), - block_sender: Some(block_sender), + req_receiver: Some(pool_rx), + block_sender: Some(pool_block_sender), }, ) .await?; @@ -46,19 +45,22 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow:: .to_args( &common_args, PoolClientMode::Local { - sender: tx.clone(), - block_receiver: block_receiver.resubscribe(), + sender: pool_sender.clone(), + block_receiver: pool_block_receiver.resubscribe(), + }, + BuilderServerMode::Local { + req_receiver: Some(builder_rx), }, ) .await?; let rpc_task_args = rpc_args .to_args( &common_args, - builder_url, (&common_args).try_into()?, - PoolClientMode::Local { - sender: tx.clone(), - block_receiver, + ClientMode::Local { + pool_sender, + pool_block_receiver, + builder_sender, }, ) .await?; diff --git a/src/cli/rpc.rs b/src/cli/rpc.rs index ff2ceb3fd..c2b9313dc 100644 --- a/src/cli/rpc.rs +++ b/src/cli/rpc.rs @@ -7,7 +7,7 @@ use super::CommonArgs; use crate::{ common::handle::spawn_tasks_with_shutdown, op_pool::PoolClientMode, - rpc::{self, estimation, RpcTask}, + rpc::{self, estimation, ClientMode, RpcTask}, }; /// CLI options for the RPC server @@ -59,9 +59,8 @@ impl RpcArgs { pub async fn to_args( &self, common: &CommonArgs, - builder_url: String, estimation_settings: estimation::Settings, - pool_client_mode: PoolClientMode, + client_mode: ClientMode, ) -> anyhow::Result { let apis = self .api @@ -72,7 +71,6 @@ impl RpcArgs { Ok(rpc::Args { port: self.port, host: self.host.clone(), - builder_url, entry_points: common .entry_points .iter() @@ -87,7 +85,7 @@ impl RpcArgs { api_namespaces: apis, estimation_settings, rpc_timeout: Duration::from_secs(self.timeout_seconds.parse()?), - pool_client_mode, + client_mode, }) } } @@ -127,9 +125,11 @@ pub async fn run(rpc_args: RpcCliArgs, common_args: CommonArgs) -> anyhow::Resul let task_args = rpc_args .to_args( &common_args, - builder_url, (&common_args).try_into()?, - PoolClientMode::Remote { url: pool_url }, + ClientMode::Remote { + pool_url, + builder_url, + }, ) .await?; diff --git a/src/common/grpc/mocks.rs b/src/common/grpc/mocks.rs deleted file mode 100644 index 82f8dcf68..000000000 --- a/src/common/grpc/mocks.rs +++ /dev/null @@ -1,137 +0,0 @@ -use std::{future::Future, time::Duration}; - -use mockall::mock; -use tokio::{net::TcpListener, task::AbortHandle, time}; -use tokio_stream::wrappers::TcpListenerStream; -use tonic::{ - async_trait, - transport::{self, Channel, Server}, - Request, Response, -}; - -use crate::common::protos::builder::{ - builder_client::BuilderClient, - builder_server::{Builder, BuilderServer}, - DebugSendBundleNowRequest, DebugSendBundleNowResponse, DebugSetBundlingModeRequest, - DebugSetBundlingModeResponse, -}; - -/// Maximum number of incrementing ports to try when looking for an open port -/// for a mock server. -const MAX_PORT_ATTEMPTS: u16 = 32; - -mock! { - #[derive(Debug)] - pub Builder {} - - #[async_trait] - impl Builder for Builder { - async fn debug_send_bundle_now( - &self, - _request: Request, - ) -> tonic::Result>; - - async fn debug_set_bundling_mode( - &self, - _request: Request, - ) -> tonic::Result>; - } -} - -/// A gRPC client packaged with context that when dropped will cause the -/// corresponding server to shut down. -#[derive(Debug)] -pub struct ClientHandle { - pub client: T, - abort_handle: AbortHandle, -} - -impl Drop for ClientHandle { - fn drop(&mut self) { - self.abort_handle.abort(); - } -} - -pub type BuilderHandle = ClientHandle>; - -/// Creates a `BuilderClient` connected to a local gRPC server which uses the -/// provided `mock` to respond to requests. Returns a handle which exposes the -/// client and shuts down the server when dropped. -pub async fn mock_builder_client(mock: impl Builder) -> BuilderHandle { - mock_builder_client_with_port(mock, 52845).await -} - -/// Like `mock_builder_client`, but accepts a custom port to avoid conflicts. -pub async fn mock_builder_client_with_port(mock: impl Builder, port: u16) -> BuilderHandle { - mock_client_with_port( - |listener_stream| { - Server::builder() - .add_service(BuilderServer::new(mock)) - .serve_with_incoming(listener_stream) - }, - BuilderClient::connect, - port, - ) - .await -} - -async fn mock_client_with_port( - new_server: FnS, - new_client: FnC, - port: u16, -) -> ClientHandle -where - FnS: FnOnce(TcpListenerStream) -> FutS, - FutS: Future> + Send + 'static, - FnC: FnOnce(String) -> FutC, - FutC: Future>, -{ - let (port, listener_stream) = find_open_port(port).await; - let client_addr = format!("http://[::1]:{port}"); - let abort_handle = tokio::spawn(new_server(listener_stream)).abort_handle(); - // Sleeping any amount of time is enough for the server to become ready. - time::sleep(Duration::from_millis(1)).await; - let client = new_client(client_addr) - .await - .expect("should connect to mock gRPC server"); - ClientHandle { - client, - abort_handle, - } -} - -async fn find_open_port(starting_port: u16) -> (u16, TcpListenerStream) { - for i in 0..MAX_PORT_ATTEMPTS { - let port = starting_port + i; - if let Ok(listener) = TcpListener::bind(format!("[::1]:{port}")).await { - return (port, TcpListenerStream::new(listener)); - } - } - panic!( - "couldn't find an open port in {MAX_PORT_ATTEMPTS} attempts starting from {starting_port}" - ); -} - -#[cfg(test)] -mod test { - use super::*; - - #[tokio::test] - async fn test_builder_mock() { - let mut builder = MockBuilder::new(); - builder.expect_debug_send_bundle_now().returning(|_| { - Ok(Response::new(DebugSendBundleNowResponse { - transaction_hash: vec![1, 2, 3], - })) - }); - let mut handle = mock_builder_client(builder).await; - let transaction_hash = handle - .client - .debug_send_bundle_now(DebugSendBundleNowRequest {}) - .await - .expect("should get response from mock") - .into_inner() - .transaction_hash; - assert_eq!(transaction_hash, vec![1, 2, 3]); - } -} diff --git a/src/common/grpc/mod.rs b/src/common/grpc/mod.rs index 68d9e16c1..e14488328 100644 --- a/src/common/grpc/mod.rs +++ b/src/common/grpc/mod.rs @@ -1,3 +1 @@ pub mod metrics; -#[cfg(test)] -pub mod mocks; diff --git a/src/common/protos.rs b/src/common/protos.rs index 3c4a406ed..2806a0a79 100644 --- a/src/common/protos.rs +++ b/src/common/protos.rs @@ -1,43 +1,5 @@ use ethers::types::{Address, H256, U256}; -use crate::common::types::BundlingMode as RpcBundlingMode; - -pub mod builder { - use super::*; - - tonic::include_proto!("builder"); - - pub const BUILDER_FILE_DESCRIPTOR_SET: &[u8] = - tonic::include_file_descriptor_set!("builder_descriptor"); - - impl From for BundlingMode { - fn from(mode: RpcBundlingMode) -> Self { - match mode { - RpcBundlingMode::Auto => BundlingMode::Auto, - RpcBundlingMode::Manual => BundlingMode::Manual, - } - } - } - - impl TryFrom for RpcBundlingMode { - type Error = ConversionError; - - fn try_from(value: BundlingMode) -> Result { - match value { - BundlingMode::Auto => Ok(RpcBundlingMode::Auto), - BundlingMode::Manual => Ok(RpcBundlingMode::Manual), - _ => Err(ConversionError::InvalidEnumValue(value as i32)), - } - } - } -} - -pub fn to_le_bytes(n: U256) -> Vec { - let mut vec = vec![0_u8; 32]; - n.to_little_endian(&mut vec); - vec -} - /// Error type for conversions from protobuf types to Ethers/local types. #[derive(Debug, thiserror::Error)] pub enum ConversionError { @@ -49,6 +11,12 @@ pub enum ConversionError { InvalidEnumValue(i32), } +pub fn to_le_bytes(n: U256) -> Vec { + let mut vec = vec![0_u8; 32]; + n.to_little_endian(&mut vec); + vec +} + pub fn from_bytes(bytes: &[u8]) -> Result { T::from_proto_bytes(bytes) } diff --git a/src/common/types/mod.rs b/src/common/types/mod.rs index 285c69d2d..a0dfa16c5 100644 --- a/src/common/types/mod.rs +++ b/src/common/types/mod.rs @@ -194,14 +194,6 @@ impl Serialize for Entity { } } -#[derive(Display, Debug, Clone, Copy, Eq, PartialEq, EnumIter, Serialize, Deserialize)] -#[display(style = "lowercase")] -#[serde(rename_all = "lowercase")] -pub enum BundlingMode { - Manual, - Auto, -} - #[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct ExpectedStorage(BTreeMap>); diff --git a/src/op_pool/server/local/server.rs b/src/op_pool/server/local/server.rs index 16a0c095f..e73884cdc 100644 --- a/src/op_pool/server/local/server.rs +++ b/src/op_pool/server/local/server.rs @@ -1,9 +1,6 @@ use std::sync::Arc; -use ethers::{ - prelude::k256::elliptic_curve::rand_core::block, - types::{Address, H256}, -}; +use ethers::types::{Address, H256}; use tokio::{ sync::{broadcast, mpsc, oneshot}, task::JoinHandle, @@ -58,7 +55,7 @@ where loop { tokio::select! { _ = shutdown_token.cancelled() => { - break; + return Ok(()); } new_block = new_blocks.recv() => { if let Ok(new_block) = new_block { @@ -130,8 +127,6 @@ where } } } - - Ok(()) } } diff --git a/src/op_pool/server/mod.rs b/src/op_pool/server/mod.rs index eb029fe6f..1057ebb86 100644 --- a/src/op_pool/server/mod.rs +++ b/src/op_pool/server/mod.rs @@ -1,5 +1,3 @@ -#![allow(dead_code)] -#![allow(unused_imports)] mod error; mod local; mod remote; diff --git a/src/op_pool/server/remote/client.rs b/src/op_pool/server/remote/client.rs index 5f754e6ae..8d3eec006 100644 --- a/src/op_pool/server/remote/client.rs +++ b/src/op_pool/server/remote/client.rs @@ -279,7 +279,7 @@ pub async fn connect_remote_pool_client( tracing::error!("bailing from connecting client, server shutting down"); bail!("Server shutting down") } - res = connect_with_retries("op pool from builder", op_pool_url, OpPoolClient::connect) => { + res = connect_with_retries("op pool", op_pool_url, OpPoolClient::connect) => { Ok(Arc::new(RemotePoolClient::new(res?))) } } diff --git a/src/op_pool/server/remote/protos.rs b/src/op_pool/server/remote/protos.rs index 76a84230e..ec7a15ed7 100644 --- a/src/op_pool/server/remote/protos.rs +++ b/src/op_pool/server/remote/protos.rs @@ -5,8 +5,8 @@ use crate::{ common::{ protos::{from_bytes, to_le_bytes, ConversionError}, types::{ - BundlingMode as RpcBundlingMode, Entity as CommonEntity, - EntityType as CommonEntityType, UserOperation as RpcUserOperation, ValidTimeRange, + Entity as CommonEntity, EntityType as CommonEntityType, + UserOperation as RpcUserOperation, ValidTimeRange, }, }, op_pool::{ diff --git a/src/rpc/debug.rs b/src/rpc/debug.rs index 615cf2f96..45d8c89ea 100644 --- a/src/rpc/debug.rs +++ b/src/rpc/debug.rs @@ -3,17 +3,11 @@ use jsonrpsee::{ core::{Error as RpcError, RpcResult}, proc_macros::rpc, }; -use tonic::{async_trait, transport::Channel}; +use tonic::async_trait; use super::{RpcReputation, RpcUserOperation}; use crate::{ - common::{ - protos::builder::{ - builder_client, BundlingMode as ProtoBundlingMode, DebugSendBundleNowRequest, - DebugSetBundlingModeRequest, - }, - types::BundlingMode, - }, + builder::{BuilderClient, BundlingMode}, op_pool::PoolClient, }; @@ -43,13 +37,13 @@ pub trait DebugApi { async fn bundler_dump_reputation(&self, entry_point: Address) -> RpcResult>; } -pub struct DebugApi

{ +pub struct DebugApi { op_pool_client: P, - builder_client: builder_client::BuilderClient, + builder_client: B, } -impl

DebugApi

{ - pub fn new(op_pool_client: P, builder_client: builder_client::BuilderClient) -> Self { +impl DebugApi { + pub fn new(op_pool_client: P, builder_client: B) -> Self { Self { op_pool_client, builder_client, @@ -58,9 +52,10 @@ impl

DebugApi

{ } #[async_trait] -impl

DebugApiServer for DebugApi

+impl DebugApiServer for DebugApi where P: PoolClient, + B: BuilderClient, { async fn bundler_clear_state(&self) -> RpcResult { let _ = self @@ -84,22 +79,15 @@ where } async fn bundler_send_bundle_now(&self) -> RpcResult { - let response = self - .builder_client - .clone() - .debug_send_bundle_now(DebugSendBundleNowRequest {}) + self.builder_client + .debug_send_bundle_now() .await - .map_err(|e| RpcError::Custom(e.to_string()))?; - - Ok(H256::from_slice(&response.into_inner().transaction_hash)) + .map_err(|e| RpcError::Custom(e.to_string())) } async fn bundler_set_bundling_mode(&self, mode: BundlingMode) -> RpcResult { - let mode: ProtoBundlingMode = mode.into(); - self.builder_client - .clone() - .debug_set_bundling_mode(DebugSetBundlingModeRequest { mode: mode.into() }) + .debug_set_bundling_mode(mode) .await .map_err(|e| RpcError::Custom(e.to_string()))?; diff --git a/src/rpc/health.rs b/src/rpc/health.rs index 86be3a946..663ebb410 100644 --- a/src/rpc/health.rs +++ b/src/rpc/health.rs @@ -3,7 +3,10 @@ use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use tonic::{async_trait, transport::Channel}; use tonic_health::pb::{health_client::HealthClient, HealthCheckRequest}; -use crate::op_pool::{LocalPoolClient, PoolClient}; +use crate::{ + builder::{BuilderClient, LocalBuilderClient}, + op_pool::{LocalPoolClient, PoolClient}, +}; #[rpc(server, namespace = "system")] pub trait SystemApi { @@ -49,11 +52,15 @@ impl SystemApiServer for RemoteHealthCheck { pub struct LocalHealthCheck { pool_client: LocalPoolClient, + builder_client: LocalBuilderClient, } impl LocalHealthCheck { - pub fn new(pool_client: LocalPoolClient) -> Self { - Self { pool_client } + pub fn new(pool_client: LocalPoolClient, builder_client: LocalBuilderClient) -> Self { + Self { + pool_client, + builder_client, + } } } @@ -64,6 +71,10 @@ impl SystemApiServer for LocalHealthCheck { .get_supported_entry_points() .await .context("Op pool server should be live")?; + self.builder_client + .get_supported_entry_points() + .await + .context("Builder server should be live")?; Ok("ok".to_string()) } diff --git a/src/rpc/task.rs b/src/rpc/task.rs index c28d50983..40576be67 100644 --- a/src/rpc/task.rs +++ b/src/rpc/task.rs @@ -9,7 +9,7 @@ use jsonrpsee::{ server::{middleware::proxy_get_request::ProxyGetRequestLayer, ServerBuilder}, RpcModule, }; -use tokio::select; +use tokio::sync::{broadcast, mpsc}; use tokio_util::sync::CancellationToken; use tonic::{ async_trait, @@ -21,12 +21,13 @@ use url::Url; use super::ApiNamespace; use crate::{ - common::{ - handle::Task, - protos::builder::builder_client::BuilderClient, - server::{self, format_socket_addr}, + builder::{ + connect_remote_builder_client, BuilderClient, LocalBuilderClient, LocalBuilderServerRequest, + }, + common::{handle::Task, server::format_socket_addr}, + op_pool::{ + connect_remote_pool_client, LocalPoolClient, LocalPoolServerRequest, NewBlock, PoolClient, }, - op_pool::{connect_remote_pool_client, LocalPoolClient, PoolClient, PoolClientMode}, rpc::{ debug::{DebugApi, DebugApiServer}, eth::{estimation, EthApi, EthApiServer}, @@ -39,14 +40,26 @@ use crate::{ pub struct Args { pub port: u16, pub host: String, - pub builder_url: String, pub entry_points: Vec

, pub chain_id: u64, pub api_namespaces: Vec, pub rpc_url: String, pub estimation_settings: estimation::Settings, pub rpc_timeout: Duration, - pub pool_client_mode: PoolClientMode, + pub client_mode: ClientMode, +} + +#[derive(Debug)] +pub enum ClientMode { + Local { + pool_sender: mpsc::Sender, + pool_block_receiver: broadcast::Receiver, + builder_sender: mpsc::Sender, + }, + Remote { + pool_url: String, + builder_url: String, + }, } #[derive(Debug)] @@ -78,33 +91,41 @@ impl Task for RpcTask { let provider = Arc::new(Provider::new(client)); - // TODO(danc) local builder client - let builder_client = - Self::connect_remote_builder_client(&self.args.builder_url, shutdown_token.clone()) - .await?; - info!("Connected to builder service at {}", self.args.builder_url); - let mut module = RpcModule::new(()); - match &self.args.pool_client_mode { - PoolClientMode::Local { - sender, - block_receiver, + match &self.args.client_mode { + ClientMode::Local { + pool_sender, + pool_block_receiver, + builder_sender, } => { let pool_client = - LocalPoolClient::new(sender.clone(), block_receiver.resubscribe()); - self.attach_namespaces(provider, pool_client.clone(), builder_client, &mut module)?; + LocalPoolClient::new(pool_sender.clone(), pool_block_receiver.resubscribe()); + let builder_client = LocalBuilderClient::new(builder_sender.clone()); + + self.attach_namespaces( + provider, + pool_client.clone(), + builder_client.clone(), + &mut module, + )?; - module.merge(LocalHealthCheck::new(pool_client).into_rpc())?; + module.merge(LocalHealthCheck::new(pool_client, builder_client).into_rpc())?; } - PoolClientMode::Remote { url } => { - let pool_client = connect_remote_pool_client(url, shutdown_token.clone()).await?; - info!("Connected to op_pool service at {}", url); + ClientMode::Remote { + pool_url, + builder_url, + } => { + let pool_client = + connect_remote_pool_client(pool_url, shutdown_token.clone()).await?; + let builder_client = + connect_remote_builder_client(builder_url, shutdown_token.clone()).await?; + self.attach_namespaces(provider, pool_client, builder_client, &mut module)?; - let builder_uri = Uri::from_str(&self.args.builder_url) - .context("should be a valid URI for op_pool")?; + let builder_uri = + Uri::from_str(builder_url).context("should be a valid URI for builder")?; let op_pool_uri = - Uri::from_str(url).context("should be a valid URI for op_pool")?; + Uri::from_str(pool_url).context("should be a valid URI for op_pool")?; let op_pool_health_client = HealthClient::new( Channel::builder(op_pool_uri) @@ -162,26 +183,11 @@ impl RpcTask { Box::new(self) } - async fn connect_remote_builder_client( - url: &str, - shutdown_token: CancellationToken, - ) -> anyhow::Result> { - select! { - _ = shutdown_token.cancelled() => { - tracing::error!("bailing from conneting client, server shutting down"); - bail!("Server shutting down") - } - res = server::connect_with_retries("builder from common", url, BuilderClient::connect) => { - res.context("should connect to builder") - } - } - } - - fn attach_namespaces( + fn attach_namespaces( &self, provider: Arc>>, - pool_client: C, - builder_client: BuilderClient, + pool_client: P, + builder_client: B, module: &mut RpcModule<()>, ) -> anyhow::Result<()> { for api in &self.args.api_namespaces {