Skip to content

Commit

Permalink
feat(provider): modify da gas provider interface for better caching/sync
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Oct 14, 2024
1 parent 51924dd commit a89bd4f
Show file tree
Hide file tree
Showing 19 changed files with 609 additions and 248 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ members = [
"crates/sim/",
"crates/task/",
"crates/types/",
"crates/utils/",
"crates/utils/"
]
default-members = ["bin/rundler"]
resolver = "2"
Expand Down
2 changes: 1 addition & 1 deletion crates/provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ anyhow.workspace = true
async-trait.workspace = true
auto_impl.workspace = true
const-hex.workspace = true
thiserror.workspace = true
futures-util.workspace = true
reqwest.workspace = true
thiserror.workspace = true
tokio.workspace = true
tower.workspace = true
tracing.workspace = true
Expand Down
38 changes: 32 additions & 6 deletions crates/provider/src/alloy/da/arbitrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use alloy_primitives::{Address, Bytes, B256};
use alloy_primitives::{Address, Bytes};
use alloy_provider::Provider as AlloyProvider;
use alloy_sol_types::sol;
use alloy_transport::Transport;
use rundler_types::da::{DAGasBlockData, DAGasUOData};
use NodeInterface::NodeInterfaceInstance;

use super::DAGasOracle;
Expand Down Expand Up @@ -63,18 +64,43 @@ where
{
async fn estimate_da_gas(
&self,
_hash: B256,
uo_data: Bytes,
to: Address,
data: Bytes,
block: BlockHashOrNumber,
_gas_price: u128,
) -> ProviderResult<u128> {
) -> ProviderResult<(u128, DAGasUOData, DAGasBlockData)> {
let ret = self
.node_interface
.gasEstimateL1Component(to, true, data)
.gasEstimateL1Component(to, true, uo_data)
.block(block.into())
.call()
.await?;
Ok(ret.gasEstimateForL1 as u128)
Ok((
ret.gasEstimateForL1 as u128,
DAGasUOData::Empty,
DAGasBlockData::Empty,
))
}

async fn block_data(&self, _block: BlockHashOrNumber) -> ProviderResult<DAGasBlockData> {
Ok(DAGasBlockData::Empty)
}

async fn uo_data(
&self,
_uo_data: Bytes,
_to: Address,
_block: BlockHashOrNumber,
) -> ProviderResult<DAGasUOData> {
Ok(DAGasUOData::Empty)
}

fn calc_da_gas_sync(
&self,
_uo_data: &DAGasUOData,
_block_data: &DAGasBlockData,
_gas_price: u128,
) -> u128 {
panic!("ArbitrumNitroDAGasOracle does not support calc_da_gas_sync")
}
}
148 changes: 75 additions & 73 deletions crates/provider/src/alloy/da/local/bedrock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use std::sync::Mutex as StdMutex;

use alloy_primitives::{Address, Bytes, B256};
use alloy_primitives::{Address, Bytes};
use alloy_provider::Provider as AlloyProvider;
use alloy_rpc_types_eth::state::{AccountOverride, StateOverride};
use alloy_transport::Transport;
use anyhow::Context;
use reth_tasks::pool::BlockingTaskPool;
use rundler_types::da::{BedrockDAGasBlockData, BedrockDAGasUOData, DAGasBlockData, DAGasUOData};
use rundler_utils::cache::LruMap;
use tokio::sync::Mutex as TokioMutex;

Expand Down Expand Up @@ -47,19 +46,10 @@ const MIN_TRANSACTION_SIZE: i128 = 100_000_000;
pub(crate) struct LocalBedrockDAGasOracle<AP, T> {
oracle: GasPriceOracleInstance<T, AP>,
multicaller: MulticallInstance<T, AP>,
block_da_data_cache: TokioMutex<LruMap<BlockHashOrNumber, BlockDAData>>,
uo_cache: StdMutex<LruMap<B256, u64>>,
block_data_cache: TokioMutex<LruMap<BlockHashOrNumber, BedrockDAGasBlockData>>,
blocking_task_pool: BlockingTaskPool,
}

#[derive(Debug, Clone, Copy)]
struct BlockDAData {
base_fee_scalar: u64,
l1_base_fee: u64,
blob_base_fee_scalar: u64,
blob_base_fee: u64,
}

impl<AP, T> LocalBedrockDAGasOracle<AP, T>
where
AP: AlloyProvider<T> + Clone,
Expand All @@ -71,8 +61,7 @@ where
Self {
oracle,
multicaller,
block_da_data_cache: TokioMutex::new(LruMap::new(100)),
uo_cache: StdMutex::new(LruMap::new(10000)),
block_data_cache: TokioMutex::new(LruMap::new(100)),
blocking_task_pool: BlockingTaskPool::build()
.expect("failed to build blocking task pool"),
}
Expand All @@ -87,27 +76,59 @@ where
{
async fn estimate_da_gas(
&self,
hash: B256,
_to: Address,
data: Bytes,
to: Address,
block: BlockHashOrNumber,
gas_price: u128,
) -> ProviderResult<u128> {
let block_da_data = {
let mut cache = self.block_da_data_cache.lock().await;

match cache.get(&block) {
Some(block_da_data) => *block_da_data,
None => {
let block_da_data = self.get_block_da_data(block).await?;
cache.insert(block, block_da_data);
block_da_data
}
) -> ProviderResult<(u128, DAGasUOData, DAGasBlockData)> {
let block_data = self.block_data(block).await?;
let uo_data = self.uo_data(data, to, block).await?;
let da_gas = self.calc_da_gas_sync(&uo_data, &block_data, gas_price);
Ok((da_gas, uo_data, block_data))
}

async fn block_data(&self, block: BlockHashOrNumber) -> ProviderResult<DAGasBlockData> {
let mut cache = self.block_data_cache.lock().await;
match cache.get(&block) {
Some(block_data) => Ok(DAGasBlockData::Bedrock(block_data.clone())),
None => {
let block_data = self.get_block_data(block).await?;
cache.insert(block, block_data.clone());
Ok(DAGasBlockData::Bedrock(block_data))
}
}
}

async fn uo_data(
&self,
uo_data: Bytes,
_to: Address,
_block: BlockHashOrNumber,
) -> ProviderResult<DAGasUOData> {
let uo_data = self.get_uo_data(uo_data).await?;
Ok(DAGasUOData::Bedrock(uo_data))
}

fn calc_da_gas_sync(
&self,
uo_data: &DAGasUOData,
block_data: &DAGasBlockData,
gas_price: u128,
) -> u128 {
let block_da_data = match block_data {
DAGasBlockData::Bedrock(block_da_data) => block_da_data,
_ => panic!("LocalBedrockDAGasOracle only supports Bedrock block data"),
};
let uo_data = match uo_data {
DAGasUOData::Bedrock(uo_data) => uo_data,
_ => panic!("LocalBedrockDAGasOracle only supports Bedrock user operation data"),
};

let l1_fee = self.fjord_l1_fee(hash, data, &block_da_data).await?;
Ok(l1_fee.checked_div(gas_price).unwrap_or(u128::MAX))
let fee_scaled = (block_da_data.base_fee_scalar * 16 * block_da_data.l1_base_fee
+ block_da_data.blob_base_fee_scalar * block_da_data.blob_base_fee)
as u128;
let l1_fee = (uo_data.uo_units as u128 * fee_scaled) / DECIMAL_SCALAR;
l1_fee.checked_div(gas_price).unwrap_or(u128::MAX)
}
}

Expand All @@ -125,7 +146,10 @@ where
.unwrap_or(false)
}

async fn get_block_da_data(&self, block: BlockHashOrNumber) -> ProviderResult<BlockDAData> {
async fn get_block_data(
&self,
block: BlockHashOrNumber,
) -> ProviderResult<BedrockDAGasBlockData> {
assert!(self.is_fjord().await);

let calls = vec![
Expand Down Expand Up @@ -187,57 +211,35 @@ where
.try_into()
.context("blob_base_fee too large for u64")?;

Ok(BlockDAData {
Ok(BedrockDAGasBlockData {
base_fee_scalar,
l1_base_fee,
blob_base_fee_scalar,
blob_base_fee,
})
}

async fn fjord_l1_fee(
&self,
hash: B256,
data: Bytes,
block_da_data: &BlockDAData,
) -> ProviderResult<u128> {
let maybe_uo_units = self.uo_cache.lock().unwrap().get(&hash).cloned();
let uo_units = match maybe_uo_units {
Some(uo_units) => uo_units,
None => {
// Blocking call compressing potentially a lot of data.
// Generally takes more than 100µs so should be spawned on blocking threadpool.
// https://ryhl.io/blog/async-what-is-blocking/
// https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code
let compressed_len = self
.blocking_task_pool
.spawn(move || {
let mut buf = vec![0; data.len() * 2];
rundler_bindings_fastlz::compress(&data, &mut buf).len() as u64;
})
.await
.map_err(|e| anyhow::anyhow!("failed to compress data: {:?}", e))?;

let uo_units = compressed_len + 68;
self.uo_cache.lock().unwrap().insert(hash, uo_units);
uo_units
}
};
async fn get_uo_data(&self, data: Bytes) -> ProviderResult<BedrockDAGasUOData> {
// Blocking call compressing potentially a lot of data.
// Generally takes more than 100µs so should be spawned on blocking threadpool.
// https://ryhl.io/blog/async-what-is-blocking/
// https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code
let compressed_len = self
.blocking_task_pool
.spawn(move || {
let mut buf = vec![0; data.len() * 2];
rundler_bindings_fastlz::compress(&data, &mut buf).len() as u64
})
.await
.map_err(|e| anyhow::anyhow!("failed to compress data: {:?}", e))?;

Ok(Self::fjord_l1_cost(uo_units, block_da_data))
}
let compressed_with_buffer = compressed_len + 68;

fn fjord_l1_cost(fast_lz_size: u64, block_da_data: &BlockDAData) -> u128 {
let estimated_size = Self::fjord_linear_regression(fast_lz_size) as u128;
let fee_scaled = (block_da_data.base_fee_scalar * 16 * block_da_data.l1_base_fee
+ block_da_data.blob_base_fee_scalar * block_da_data.blob_base_fee)
as u128;
(estimated_size * fee_scaled) / DECIMAL_SCALAR
}
let estimated_size = COST_INTERCEPT + COST_FASTLZ_COEF * compressed_with_buffer as i128;
let uo_units = estimated_size.clamp(MIN_TRANSACTION_SIZE, u64::MAX as i128);

fn fjord_linear_regression(fast_lz_size: u64) -> u64 {
let estimated_size = COST_INTERCEPT + COST_FASTLZ_COEF * fast_lz_size as i128;
let ret = estimated_size.clamp(MIN_TRANSACTION_SIZE, u64::MAX as i128);
ret as u64
Ok(BedrockDAGasUOData {
uo_units: uo_units as u64,
})
}
}
Loading

0 comments on commit a89bd4f

Please sign in to comment.