Skip to content

Commit

Permalink
feat(pbs): stateless pbs module (#212)
Browse files Browse the repository at this point in the history
  • Loading branch information
ltitanb authored Dec 18, 2024
1 parent c2a0512 commit 895aa20
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 118 deletions.
7 changes: 2 additions & 5 deletions crates/common/src/config/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,7 @@ async fn fetch_lido_registry_keys(
chain: Chain,
node_operator_id: U256,
) -> eyre::Result<Vec<BlsPublicKey>> {
debug!(
"loading operator keys from Lido registry: chain={:?}, node_operator_id={}",
chain, node_operator_id
);
debug!(?chain, %node_operator_id, "loading operator keys from Lido registry");

let provider = ProviderBuilder::new().on_http(rpc_url);
let registry_address = lido_registry_address(chain)?;
Expand Down Expand Up @@ -263,7 +260,7 @@ async fn fetch_lido_registry_keys(
}

ensure!(keys.len() == total_keys as usize, "expected {total_keys} keys, got {}", keys.len());
let unique: Vec<_> = keys.iter().collect::<HashSet<_>>().into_iter().collect();
let unique = keys.iter().collect::<HashSet<_>>();
ensure!(unique.len() == keys.len(), "found duplicate keys in registry");

Ok(keys)
Expand Down
3 changes: 2 additions & 1 deletion crates/common/src/pbs/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ pub const SUBMIT_BLOCK_PATH: &str = "/blinded_blocks";

// https://ethereum.github.io/builder-specs/#/Builder

pub const HEADER_SLOT_UUID_KEY: &str = "X-MEVBoost-SlotID";
// Currently unused to enable a stateless default PBS module
// const HEADER_SLOT_UUID_KEY: &str = "X-MEVBoost-SlotID";
pub const HEADER_VERSION_KEY: &str = "X-CommitBoost-Version";
pub const HEADER_VERSION_VALUE: &str = COMMIT_BOOST_VERSION;
pub const HEADER_START_TIME_UNIX_MS: &str = "X-MEVBoost-StartTimeUnixMS";
Expand Down
3 changes: 0 additions & 3 deletions crates/common/src/pbs/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ pub enum BuilderEvent {
MissedPayload {
/// Hash for the block for which no payload was received
block_hash: B256,
/// Relays which delivered the header but for which no payload was
/// received
missing_relays: String,
},
RegisterValidatorRequest(Vec<ValidatorRegistration>),
RegisterValidatorResponse,
Expand Down
9 changes: 4 additions & 5 deletions crates/pbs/src/mev_boost/get_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use cb_common::{
pbs::{
error::{PbsError, ValidationError},
GetHeaderParams, GetHeaderResponse, RelayClient, SignedExecutionPayloadHeader,
EMPTY_TX_ROOT_HASH, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS,
EMPTY_TX_ROOT_HASH, HEADER_START_TIME_UNIX_MS,
},
signature::verify_signed_message,
types::Chain,
Expand Down Expand Up @@ -73,11 +73,8 @@ pub async fn get_header<S: BuilderApiState>(
return Ok(None);
}

let (_, slot_uuid) = state.get_slot_and_uuid();

// prepare headers, except for start time which is set in `send_one_get_header`
let mut send_headers = HeaderMap::new();
send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?);
send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?);

let mut handles = Vec::with_capacity(relays.len());
Expand Down Expand Up @@ -118,7 +115,9 @@ pub async fn get_header<S: BuilderApiState>(
}
}

Ok(state.add_bids(params.slot, relay_bids))
let max_bid = relay_bids.into_iter().max_by_key(|bid| bid.value());

Ok(max_bid)
}

/// Fetch the parent block from the RPC URL for extra validation of the header.
Expand Down
5 changes: 1 addition & 4 deletions crates/pbs/src/mev_boost/submit_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use axum::http::{HeaderMap, HeaderValue};
use cb_common::{
pbs::{
error::{PbsError, ValidationError},
RelayClient, SignedBlindedBeaconBlock, SubmitBlindedBlockResponse, HEADER_SLOT_UUID_KEY,
RelayClient, SignedBlindedBeaconBlock, SubmitBlindedBlockResponse,
HEADER_START_TIME_UNIX_MS,
},
utils::{get_user_agent_with_version, utcnow_ms},
Expand All @@ -27,11 +27,8 @@ pub async fn submit_block<S: BuilderApiState>(
req_headers: HeaderMap,
state: PbsState<S>,
) -> eyre::Result<SubmitBlindedBlockResponse> {
let (_, slot_uuid) = state.get_slot_and_uuid();

// prepare headers
let mut send_headers = HeaderMap::new();
send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?);
send_headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(utcnow_ms()));
send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?);

Expand Down
1 change: 0 additions & 1 deletion crates/pbs/src/routes/get_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub async fn handle_get_header<S: BuilderApiState, A: BuilderApi<S>>(
Path(params): Path<GetHeaderParams>,
) -> Result<impl IntoResponse, PbsClientError> {
state.publish_event(BuilderEvent::GetHeaderRequest(params));
state.get_or_update_slot_uuid(params.slot);

let ua = get_user_agent(&req_headers);
let ms_into_slot = ms_into_slot(params.slot, state.config.chain);
Expand Down
29 changes: 4 additions & 25 deletions crates/pbs/src/routes/submit_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use cb_common::{
utils::{get_user_agent, timestamp_of_slot_start_millis, utcnow_ms},
};
use reqwest::StatusCode;
use tracing::{error, info, trace, warn};
use tracing::{error, info, trace};
use uuid::Uuid;

use crate::{
Expand All @@ -29,13 +29,8 @@ pub async fn handle_submit_block<S: BuilderApiState, A: BuilderApi<S>>(
let block_hash = signed_blinded_block.message.body.execution_payload_header.block_hash;
let slot_start_ms = timestamp_of_slot_start_millis(slot, state.config.chain);
let ua = get_user_agent(&req_headers);
let (curr_slot, slot_uuid) = state.get_slot_and_uuid();

info!(ua, %slot_uuid, ms_into_slot=now.saturating_sub(slot_start_ms), %block_hash);

if curr_slot != signed_blinded_block.message.slot {
warn!(expected = curr_slot, got = slot, "blinded beacon slot mismatch")
}
info!(ua, ms_into_slot=now.saturating_sub(slot_start_ms), %block_hash);

match A::submit_block(signed_blinded_block, req_headers, state.clone()).await {
Ok(res) => {
Expand All @@ -48,24 +43,8 @@ pub async fn handle_submit_block<S: BuilderApiState, A: BuilderApi<S>>(
}

Err(err) => {
if let Some(fault_pubkeys) = state.get_relays_by_block_hash(slot, block_hash) {
let missing_relays = state
.relays()
.iter()
.filter(|relay| fault_pubkeys.contains(&relay.pubkey()))
.map(|relay| &**relay.id)
.collect::<Vec<_>>()
.join(",");

error!(%err, %block_hash, missing_relays, "CRITICAL: no payload received from relays");
state.publish_event(BuilderEvent::MissedPayload { block_hash, missing_relays });
} else {
error!(%err, %block_hash, "CRITICAL: no payload delivered and no relay for block hash. Was getHeader even called?");
state.publish_event(BuilderEvent::MissedPayload {
block_hash,
missing_relays: String::default(),
});
};
error!(%err, %block_hash, "CRITICAL: no payload received from relays. Check previous logs or use the Relay Data API");
state.publish_event(BuilderEvent::MissedPayload { block_hash });

let err = PbsClientError::NoPayload;
BEACON_NODE_STATUS
Expand Down
81 changes: 7 additions & 74 deletions crates/pbs/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,30 @@
use std::{
collections::HashSet,
sync::{Arc, Mutex},
};

use alloy::{primitives::B256, rpc::types::beacon::BlsPublicKey};
use alloy::rpc::types::beacon::BlsPublicKey;
use cb_common::{
config::{PbsConfig, PbsModuleConfig},
pbs::{BuilderEvent, GetHeaderResponse, RelayClient},
pbs::{BuilderEvent, RelayClient},
};
use dashmap::DashMap;
use uuid::Uuid;

pub trait BuilderApiState: Clone + Sync + Send + 'static {}
impl BuilderApiState for () {}

/// State for the Pbs module. It can be extended by adding extra data to the
/// state
/// Config for the Pbs module. It can be extended by adding extra data to the
/// state for modules that need it
// TODO: consider remove state from the PBS module altogether
#[derive(Clone)]
pub struct PbsState<S: BuilderApiState = ()> {
/// Config data for the Pbs service
pub config: PbsModuleConfig,
/// Opaque extra data for library use
pub data: S,
/// Info about the latest slot and its uuid
current_slot_info: Arc<Mutex<(u64, Uuid)>>,
/// Keeps track of which relays delivered which block for which slot
bid_cache: Arc<DashMap<u64, Vec<GetHeaderResponse>>>,
}

impl PbsState<()> {
pub fn new(config: PbsModuleConfig) -> Self {
Self {
config,
data: (),
current_slot_info: Arc::new(Mutex::new((0, Uuid::new_v4()))),
bid_cache: Arc::new(DashMap::new()),
}
Self { config, data: () }
}

pub fn with_data<S: BuilderApiState>(self, data: S) -> PbsState<S> {
PbsState {
data,
config: self.config,
current_slot_info: self.current_slot_info,
bid_cache: self.bid_cache,
}
PbsState { data, config: self.config }
}
}

Expand All @@ -58,22 +38,6 @@ where
}
}

pub fn get_or_update_slot_uuid(&self, last_slot: u64) -> Uuid {
let mut guard = self.current_slot_info.lock().expect("poisoned");
if guard.0 < last_slot {
// new slot
guard.0 = last_slot;
guard.1 = Uuid::new_v4();
self.clear(last_slot);
}
guard.1
}

pub fn get_slot_and_uuid(&self) -> (u64, Uuid) {
let guard = self.current_slot_info.lock().expect("poisoned");
*guard
}

// Getters
pub fn pbs_config(&self) -> &PbsConfig {
&self.config.pbs_config
Expand Down Expand Up @@ -102,35 +66,4 @@ where
pub fn extra_validation_enabled(&self) -> bool {
self.config.pbs_config.extra_validation_enabled
}

/// Add some bids to the cache, the bids are all assumed to be for the
/// provided slot Returns the bid with the max value
pub fn add_bids(&self, slot: u64, bids: Vec<GetHeaderResponse>) -> Option<GetHeaderResponse> {
let mut slot_entry = self.bid_cache.entry(slot).or_default();
slot_entry.extend(bids);
slot_entry.iter().max_by_key(|bid| bid.value()).cloned()
}

/// Retrieves a list of relays pubkeys that delivered a given block hash
/// Returns None if we dont have bids for the slot or for the block hash
pub fn get_relays_by_block_hash(
&self,
slot: u64,
block_hash: B256,
) -> Option<HashSet<BlsPublicKey>> {
self.bid_cache.get(&slot).and_then(|bids| {
let filtered: HashSet<_> = bids
.iter()
.filter(|&bid| (bid.block_hash() == block_hash))
.map(|bid| bid.pubkey())
.collect();

(!filtered.is_empty()).then_some(filtered)
})
}

/// Clear bids which are more than ~3 minutes old
fn clear(&self, last_slot: u64) {
self.bid_cache.retain(|slot, _| last_slot.saturating_sub(*slot) < 15)
}
}

0 comments on commit 895aa20

Please sign in to comment.