Skip to content

Commit

Permalink
Set X-CURRENT-BLOCK-HASH header on every request (#20)
Browse files Browse the repository at this point in the history
Introduces a new optional argument `current-block-poll-interval`. When
that is set we create a `CurrentBlockStream` and use the current block
hash to populate the `X-CURRENT-BLOCK-HASH` header on every http
request.
I only made it optional to not have to introduce a bunch of mocking
logic in tests for a minor feature.

Also updates dependencies to the backend crates with the exception of
`solvers-dto`. Upgrading that requires a bunch of changes in unit tests
which are already covered in #19.
  • Loading branch information
MartinquaXD authored May 17, 2024
1 parent c1de273 commit 0441d8f
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ bigdecimal = { version = "0.3", features = ["serde"] }
chrono = { version = "0.4.38", features = ["serde"], default-features = false }
clap = { version = "4", features = ["derive", "env"] }
ethereum-types = "0.14"
futures = "0.3"
futures = "0.3.30"
hex = "0.4"
humantime = "2.1.0"
humantime-serde = "1.1.1"
Expand Down
1 change: 1 addition & 0 deletions src/infra/config/dex/balancer/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub async fn load(path: &Path) -> super::Config {
.map(eth::ContractAddress)
.unwrap_or(contracts.balancer_vault),
settlement: base.contracts.settlement,
block_stream: base.block_stream.clone(),
},
base,
}
Expand Down
17 changes: 17 additions & 0 deletions src/infra/config/dex/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ struct Config {
#[serde(default = "default_gas_offset")]
#[serde_as(as = "serialize::U256")]
gas_offset: eth::U256,

/// How often the solver should poll the current block. If this value
/// is set each request will also have the `X-CURRENT-BLOCK-HASH` header set
/// updated based on the configured polling interval.
/// This is useful for caching requests on an egress proxy.
#[serde(with = "humantime_serde", default)]
current_block_poll_interval: Option<Duration>,
}

fn default_relative_slippage() -> BigDecimal {
Expand Down Expand Up @@ -130,6 +137,15 @@ pub async fn load<T: DeserializeOwned>(path: &Path) -> (super::Config, T) {
(contracts.settlement, contracts.authenticator)
};

let block_stream = match config.current_block_poll_interval {
Some(interval) => Some(
ethrpc::current_block::current_block_stream(config.node_url.clone(), interval)
.await
.unwrap(),
),
None => None,
};

let config = super::Config {
node_url: config.node_url,
contracts: super::Contracts {
Expand All @@ -150,6 +166,7 @@ pub async fn load<T: DeserializeOwned>(path: &Path) -> (super::Config, T) {
)
.unwrap(),
gas_offset: eth::Gas(config.gas_offset),
block_stream,
};
(config, dex)
}
2 changes: 2 additions & 0 deletions src/infra/config/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod zeroex;

use {
crate::domain::{dex::slippage, eth},
ethrpc::current_block::CurrentBlockStream,
std::num::NonZeroUsize,
};

Expand All @@ -24,4 +25,5 @@ pub struct Config {
pub smallest_partial_fill: eth::Ether,
pub rate_limiting_strategy: rate_limit::Strategy,
pub gas_offset: eth::Gas,
pub block_stream: Option<CurrentBlockStream>,
}
1 change: 1 addition & 0 deletions src/infra/config/dex/oneinch/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub async fn load(path: &Path) -> super::Config {
main_route_parts: config.main_route_parts,
connector_tokens: config.connector_tokens,
complexity_level: config.complexity_level,
block_stream: base.block_stream.clone(),
},
base,
}
Expand Down
1 change: 1 addition & 0 deletions src/infra/config/dex/paraswap/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub async fn load(path: &Path) -> super::Config {
address: config.address,
partner: config.partner,
chain_id: ChainId::new(config.chain_id.into()).unwrap(),
block_stream: base.block_stream.clone(),
},
base,
}
Expand Down
1 change: 1 addition & 0 deletions src/infra/config/dex/zeroex/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub async fn load(path: &Path) -> super::Config {
settlement,
enable_rfqt: config.enable_rfqt,
enable_slippage_protection: config.enable_slippage_protection,
block_stream: base.block_stream.clone(),
},
base,
}
Expand Down
10 changes: 7 additions & 3 deletions src/infra/dex/balancer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
},
contracts::ethcontract::I256,
ethereum_types::U256,
ethrpc::current_block::CurrentBlockStream,
std::sync::atomic::{self, AtomicU64},
tracing::Instrument,
};
Expand All @@ -14,13 +15,16 @@ mod vault;

/// Bindings to the Balancer Smart Order Router (SOR) API.
pub struct Sor {
client: reqwest::Client,
client: super::Client,
endpoint: reqwest::Url,
vault: vault::Vault,
settlement: eth::ContractAddress,
}

pub struct Config {
/// Stream that yields every new block.
pub block_stream: Option<CurrentBlockStream>,

/// The URL for the Balancer SOR API.
pub endpoint: reqwest::Url,

Expand All @@ -40,7 +44,7 @@ impl Sor {

pub fn new(config: Config) -> Self {
Self {
client: reqwest::Client::new(),
client: super::Client::new(Default::default(), config.block_stream),
endpoint: config.endpoint,
vault: vault::Vault::new(config.vault),
settlement: config.settlement,
Expand Down Expand Up @@ -151,7 +155,7 @@ impl Sor {
let quote = util::http::roundtrip!(
<dto::Quote, util::serialize::Never>;
self.client
.post(self.endpoint.clone())
.request(reqwest::Method::POST, self.endpoint.clone())
.json(query)
)
.await?;
Expand Down
36 changes: 35 additions & 1 deletion src/infra/dex/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::domain::{auction, dex};
use {
crate::domain::{auction, dex},
ethrpc::current_block::CurrentBlockStream,
reqwest::RequestBuilder,
};

pub mod balancer;
pub mod oneinch;
Expand Down Expand Up @@ -52,6 +56,36 @@ pub enum Error {
Other(Box<dyn std::error::Error + Send + Sync>),
}

/// A wrapper around [`reqwest::Client`] to pre-set commonly used headers
/// and other properties on each request.
struct Client {
/// Client to send requests.
client: reqwest::Client,

/// Block stream to read the current block.
block_stream: Option<CurrentBlockStream>,
}

impl Client {
pub fn new(client: reqwest::Client, block_stream: Option<CurrentBlockStream>) -> Self {
Self {
client,
block_stream,
}
}

/// Prepares a request builder which already has additional headers set.
pub fn request(&self, method: reqwest::Method, url: reqwest::Url) -> RequestBuilder {
let request = self.client.request(method, url);
if let Some(stream) = &self.block_stream {
// Set this header to easily support caching in an egress proxy.
request.header("X-CURRENT-BLOCK-HASH", stream.borrow().hash.to_string())
} else {
request
}
}
}

impl Error {
/// for instrumentization purposes
pub fn format_variant(&self) -> &'static str {
Expand Down
14 changes: 9 additions & 5 deletions src/infra/dex/oneinch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
util,
},
ethereum_types::H160,
ethrpc::current_block::CurrentBlockStream,
std::sync::atomic::{self, AtomicU64},
tracing::Instrument,
};
Expand All @@ -12,7 +13,7 @@ mod dto;

/// Bindings to the 1Inch swap API.
pub struct OneInch {
client: reqwest::Client,
client: super::Client,
endpoint: reqwest::Url,
defaults: dto::Query,
spender: eth::ContractAddress,
Expand All @@ -39,6 +40,9 @@ pub struct Config {
pub main_route_parts: Option<u32>,
pub connector_tokens: Option<u32>,
pub complexity_level: Option<u32>,

/// Stream that yields every new block.
pub block_stream: Option<CurrentBlockStream>,
}

pub enum Liquidity {
Expand All @@ -51,7 +55,7 @@ pub const DEFAULT_URL: &str = "https://api.1inch.io/v5.0/1/";

impl OneInch {
pub async fn new(config: Config) -> Result<Self, Error> {
let client = reqwest::Client::new();
let client = super::Client::new(Default::default(), config.block_stream);
let endpoint = config
.endpoint
.unwrap_or_else(|| DEFAULT_URL.parse().unwrap());
Expand All @@ -62,7 +66,7 @@ impl OneInch {
Liquidity::Exclude(excluded) => {
let liquidity = util::http::roundtrip!(
<dto::Liquidity, dto::Error>;
client.get(util::url::join(&endpoint, "liquidity-sources"))
client.request(reqwest::Method::GET, util::url::join(&endpoint, "liquidity-sources"))
)
.await?;

Expand All @@ -89,7 +93,7 @@ impl OneInch {
let spender = eth::ContractAddress(
util::http::roundtrip!(
<dto::Spender, dto::Error>;
client.get(util::url::join(&endpoint, "approve/spender"))
client.request(reqwest::Method::GET, util::url::join(&endpoint, "approve/spender"))
)
.await?
.address,
Expand Down Expand Up @@ -150,7 +154,7 @@ impl OneInch {
let swap = util::http::roundtrip!(
<dto::Swap, dto::Error>;
self.client
.get(util::url::join(&self.endpoint, "swap"))
.request(reqwest::Method::GET, util::url::join(&self.endpoint, "swap"))
.query(query)
)
.await?;
Expand Down
13 changes: 8 additions & 5 deletions src/infra/dex/paraswap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
util,
},
ethereum_types::Address,
ethrpc::current_block::CurrentBlockStream,
};

mod dto;
Expand All @@ -12,7 +13,7 @@ pub const DEFAULT_URL: &str = "https://apiv5.paraswap.io";

/// Bindings to the ParaSwap API.
pub struct ParaSwap {
client: reqwest::Client,
client: super::Client,
config: Config,
}

Expand All @@ -32,12 +33,15 @@ pub struct Config {

/// For which chain the solver is configured.
pub chain_id: eth::ChainId,

/// A stream that yields every new block.
pub block_stream: Option<CurrentBlockStream>,
}

impl ParaSwap {
pub fn new(config: Config) -> Self {
Self {
client: reqwest::Client::new(),
client: super::Client::new(Default::default(), config.block_stream.clone()),
config,
}
}
Expand Down Expand Up @@ -79,8 +83,7 @@ impl ParaSwap {
) -> Result<dto::Price, Error> {
let price = util::http::roundtrip!(
<dto::Price, dto::Error>;
self.client
.get(util::url::join(&self.config.endpoint, "prices"))
self.client.request(reqwest::Method::GET, util::url::join(&self.config.endpoint, "prices"))
.query(&dto::PriceQuery::new(&self.config, order, tokens)?)
)
.await?;
Expand All @@ -99,7 +102,7 @@ impl ParaSwap {
let transaction = util::http::roundtrip!(
<dto::Transaction, dto::Error>;
self.client
.post(util::url::join(
.request(reqwest::Method::POST, util::url::join(
&self.config.endpoint,
&format!("transactions/{}?ignoreChecks=true", self.config.chain_id.network_id())
))
Expand Down
13 changes: 9 additions & 4 deletions src/infra/dex/zeroex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
util,
},
ethereum_types::H160,
ethrpc::current_block::CurrentBlockStream,
std::sync::atomic::{self, AtomicU64},
tracing::Instrument,
};
Expand All @@ -12,7 +13,7 @@ mod dto;

/// Bindings to the 0x swap API.
pub struct ZeroEx {
client: reqwest::Client,
client: super::Client,
endpoint: reqwest::Url,
defaults: dto::Query,
}
Expand Down Expand Up @@ -42,6 +43,9 @@ pub struct Config {

/// Whether or not to enable slippage protection.
pub enable_slippage_protection: bool,

/// The stream that yields every new block.
pub block_stream: Option<CurrentBlockStream>,
}

impl ZeroEx {
Expand All @@ -53,9 +57,10 @@ impl ZeroEx {
let mut headers = reqwest::header::HeaderMap::new();
headers.insert("0x-api-key", key);

reqwest::Client::builder()
let client = reqwest::Client::builder()
.default_headers(headers)
.build()?
.build()?;
super::Client::new(client, config.block_stream)
};
let defaults = dto::Query {
taker_address: Some(config.settlement.0),
Expand Down Expand Up @@ -128,7 +133,7 @@ impl ZeroEx {
let quote = util::http::roundtrip!(
<dto::Quote, dto::Error>;
self.client
.get(util::url::join(&self.endpoint, "quote"))
.request(reqwest::Method::GET, util::url::join(&self.endpoint, "quote"))
.query(query)
)
.await?;
Expand Down

0 comments on commit 0441d8f

Please sign in to comment.