Skip to content

Commit

Permalink
Set X-CURRENT-BLOCK-HASH header on every request
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinquaXD committed May 13, 2024
1 parent 35c0142 commit f06ca58
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 187 deletions.
278 changes: 116 additions & 162 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ bigdecimal = { version = "0.3", features = ["serde"] }
chrono = { version = "0.4.38", features = ["serde"], default-features = false }
clap = { version = "4", features = ["derive", "env"] }
ethereum-types = "0.14"
futures = "0.3"
futures = "0.3.30"
hex = "0.4"
humantime = "2.1.0"
humantime-serde = "1.1.1"
Expand All @@ -40,12 +40,12 @@ tower-http = { version = "0.4", features = ["trace"] }
tracing = "0.1"
web3 = "0.19"

contracts = { git = "https://github.com/cowprotocol/services.git", tag = "v2.253.0", package = "contracts" }
ethrpc = { git = "https://github.com/cowprotocol/services.git", tag = "v2.253.0", package = "ethrpc" }
observe = { git = "https://github.com/cowprotocol/services.git", tag = "v2.253.0", package = "observe" }
shared = { git = "https://github.com/cowprotocol/services.git", tag = "v2.253.0", package = "shared" }
dto = { git = "https://github.com/cowprotocol/services.git", tag = "v2.255.1-temp-solvers", package = "solvers-dto" }
rate-limit = { git = "https://github.com/cowprotocol/services.git", tag = "v2.253.0", package = "rate-limit" }
contracts = { git = "https://github.com/cowprotocol/services.git", tag = "v2.258.0", package = "contracts" }
ethrpc = { git = "https://github.com/cowprotocol/services.git", tag = "v2.258.0", package = "ethrpc" }
observe = { git = "https://github.com/cowprotocol/services.git", tag = "v2.258.0", package = "observe" }
shared = { git = "https://github.com/cowprotocol/services.git", tag = "v2.258.0", package = "shared" }
dto = { git = "https://github.com/cowprotocol/services.git", tag = "v2.258.0", package = "solvers-dto" }
rate-limit = { git = "https://github.com/cowprotocol/services.git", tag = "v2.258.0", package = "rate-limit" }

[dev-dependencies]
glob = "0.3"
Expand Down
1 change: 1 addition & 0 deletions src/infra/config/dex/balancer/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub async fn load(path: &Path) -> super::Config {
.map(eth::ContractAddress)
.unwrap_or(contracts.balancer_vault),
settlement: base.contracts.settlement,
block_stream: base.block_stream.clone(),
},
base,
}
Expand Down
16 changes: 16 additions & 0 deletions src/infra/config/dex/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ struct Config {
#[serde(default = "default_gas_offset")]
#[serde_as(as = "serialize::U256")]
gas_offset: eth::U256,

/// How often the solver should poll the current block.
#[serde(with = "humantime_serde", default = "default_block_poll_interval")]
current_block_poll_interval: Duration,
}

fn default_relative_slippage() -> BigDecimal {
Expand All @@ -90,6 +94,10 @@ fn default_max_back_off() -> Duration {
Duration::from_secs(8)
}

fn default_block_poll_interval() -> Duration {
Duration::from_secs(1)
}

fn default_gas_offset() -> eth::U256 {
// Rough estimation of the gas overhead of settling a single
// trade via the settlement contract.
Expand Down Expand Up @@ -130,6 +138,13 @@ pub async fn load<T: DeserializeOwned>(path: &Path) -> (super::Config, T) {
(contracts.settlement, contracts.authenticator)
};

let block_stream = ethrpc::current_block::current_block_stream(
config.node_url.clone(),
config.current_block_poll_interval,
)
.await
.unwrap();

let config = super::Config {
node_url: config.node_url,
contracts: super::Contracts {
Expand All @@ -150,6 +165,7 @@ pub async fn load<T: DeserializeOwned>(path: &Path) -> (super::Config, T) {
)
.unwrap(),
gas_offset: eth::Gas(config.gas_offset),
block_stream,
};
(config, dex)
}
2 changes: 2 additions & 0 deletions src/infra/config/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod zeroex;

use {
crate::domain::{dex::slippage, eth},
ethrpc::current_block::CurrentBlockStream,
std::num::NonZeroUsize,
};

Expand All @@ -24,4 +25,5 @@ pub struct Config {
pub smallest_partial_fill: eth::Ether,
pub rate_limiting_strategy: rate_limit::Strategy,
pub gas_offset: eth::Gas,
pub block_stream: CurrentBlockStream,
}
1 change: 1 addition & 0 deletions src/infra/config/dex/oneinch/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub async fn load(path: &Path) -> super::Config {
main_route_parts: config.main_route_parts,
connector_tokens: config.connector_tokens,
complexity_level: config.complexity_level,
block_stream: base.block_stream.clone(),
},
base,
}
Expand Down
1 change: 1 addition & 0 deletions src/infra/config/dex/paraswap/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub async fn load(path: &Path) -> super::Config {
exclude_dexs: config.exclude_dexs,
address: config.address,
partner: config.partner,
block_stream: base.block_stream.clone(),
},
base,
}
Expand Down
1 change: 1 addition & 0 deletions src/infra/config/dex/zeroex/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub async fn load(path: &Path) -> super::Config {
settlement,
enable_rfqt: config.enable_rfqt,
enable_slippage_protection: config.enable_slippage_protection,
block_stream: base.block_stream.clone(),
},
base,
}
Expand Down
10 changes: 7 additions & 3 deletions src/infra/dex/balancer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
},
contracts::ethcontract::I256,
ethereum_types::U256,
ethrpc::current_block::CurrentBlockStream,
std::sync::atomic::{self, AtomicU64},
tracing::Instrument,
};
Expand All @@ -14,13 +15,16 @@ mod vault;

/// Bindings to the Balancer Smart Order Router (SOR) API.
pub struct Sor {
client: reqwest::Client,
client: super::Client,
endpoint: reqwest::Url,
vault: vault::Vault,
settlement: eth::ContractAddress,
}

pub struct Config {
/// Stream that yields every new block.
pub block_stream: CurrentBlockStream,

/// The URL for the Balancer SOR API.
pub endpoint: reqwest::Url,

Expand All @@ -40,7 +44,7 @@ impl Sor {

pub fn new(config: Config) -> Self {
Self {
client: reqwest::Client::new(),
client: super::Client::new(Default::default(), config.block_stream),
endpoint: config.endpoint,
vault: vault::Vault::new(config.vault),
settlement: config.settlement,
Expand Down Expand Up @@ -151,7 +155,7 @@ impl Sor {
let quote = util::http::roundtrip!(
<dto::Quote, util::serialize::Never>;
self.client
.post(self.endpoint.clone())
.request(reqwest::Method::POST, self.endpoint.clone())
.json(query)
)
.await?;
Expand Down
32 changes: 31 additions & 1 deletion src/infra/dex/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::domain::{auction, dex};
use {
crate::domain::{auction, dex},
ethrpc::current_block::CurrentBlockStream,
reqwest::RequestBuilder,
};

pub mod balancer;
pub mod oneinch;
Expand Down Expand Up @@ -52,6 +56,32 @@ pub enum Error {
Other(Box<dyn std::error::Error + Send + Sync>),
}

/// A wrapper around [`reqwest::Client`] to pre-set commonly used headers
/// and other properties on each request.
struct Client {
/// Client to send requests.
client: reqwest::Client,

/// Block stream to read the current block.
block_stream: CurrentBlockStream,
}

impl Client {
pub fn new(client: reqwest::Client, block_stream: CurrentBlockStream) -> Self {
Self {
client,
block_stream,
}
}

/// Prepares a request build which already has additional headers set.
pub fn request(&self, method: reqwest::Method, url: reqwest::Url) -> RequestBuilder {
self.client.request(method, url)
// Set this header to easily support caching in an egress proxy.
.header("X-CURRENT-BLOCK-HASH", self.block_stream.borrow().hash.to_string())
}
}

impl Error {
/// for instrumentization purposes
pub fn format_variant(&self) -> &'static str {
Expand Down
14 changes: 9 additions & 5 deletions src/infra/dex/oneinch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
util,
},
ethereum_types::H160,
ethrpc::current_block::CurrentBlockStream,
std::sync::atomic::{self, AtomicU64},
tracing::Instrument,
};
Expand All @@ -12,7 +13,7 @@ mod dto;

/// Bindings to the 1Inch swap API.
pub struct OneInch {
client: reqwest::Client,
client: super::Client,
endpoint: reqwest::Url,
defaults: dto::Query,
spender: eth::ContractAddress,
Expand All @@ -39,6 +40,9 @@ pub struct Config {
pub main_route_parts: Option<u32>,
pub connector_tokens: Option<u32>,
pub complexity_level: Option<u32>,

/// Stream that yields every new block.
pub block_stream: CurrentBlockStream,
}

pub enum Liquidity {
Expand All @@ -51,7 +55,7 @@ pub const DEFAULT_URL: &str = "https://api.1inch.io/v5.0/1/";

impl OneInch {
pub async fn new(config: Config) -> Result<Self, Error> {
let client = reqwest::Client::new();
let client = super::Client::new(Default::default(), config.block_stream);
let endpoint = config
.endpoint
.unwrap_or_else(|| DEFAULT_URL.parse().unwrap());
Expand All @@ -62,7 +66,7 @@ impl OneInch {
Liquidity::Exclude(excluded) => {
let liquidity = util::http::roundtrip!(
<dto::Liquidity, dto::Error>;
client.get(util::url::join(&endpoint, "liquidity-sources"))
client.request(reqwest::Method::GET, util::url::join(&endpoint, "liquidity-sources"))
)
.await?;

Expand All @@ -89,7 +93,7 @@ impl OneInch {
let spender = eth::ContractAddress(
util::http::roundtrip!(
<dto::Spender, dto::Error>;
client.get(util::url::join(&endpoint, "approve/spender"))
client.request(reqwest::Method::GET, util::url::join(&endpoint, "approve/spender"))
)
.await?
.address,
Expand Down Expand Up @@ -150,7 +154,7 @@ impl OneInch {
let swap = util::http::roundtrip!(
<dto::Swap, dto::Error>;
self.client
.get(util::url::join(&self.endpoint, "swap"))
.request(reqwest::Method::GET, util::url::join(&self.endpoint, "swap"))
.query(query)
)
.await?;
Expand Down
13 changes: 8 additions & 5 deletions src/infra/dex/paraswap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
util,
},
ethereum_types::Address,
ethrpc::current_block::CurrentBlockStream,
};

mod dto;
Expand All @@ -12,7 +13,7 @@ pub const DEFAULT_URL: &str = "https://apiv5.paraswap.io";

/// Bindings to the ParaSwap API.
pub struct ParaSwap {
client: reqwest::Client,
client: super::Client,
config: Config,
}

Expand All @@ -29,12 +30,15 @@ pub struct Config {

/// Our partner name.
pub partner: String,

/// A stream that yields every new block.
pub block_stream: CurrentBlockStream,
}

impl ParaSwap {
pub fn new(config: Config) -> Self {
Self {
client: reqwest::Client::new(),
client: super::Client::new(Default::default(), config.block_stream.clone()),
config,
}
}
Expand Down Expand Up @@ -76,8 +80,7 @@ impl ParaSwap {
) -> Result<dto::Price, Error> {
let price = util::http::roundtrip!(
<dto::Price, dto::Error>;
self.client
.get(util::url::join(&self.config.endpoint, "prices"))
self.client.request(reqwest::Method::GET, util::url::join(&self.config.endpoint, "prices"))
.query(&dto::PriceQuery::new(&self.config, order, tokens)?)
)
.await?;
Expand All @@ -96,7 +99,7 @@ impl ParaSwap {
let transaction = util::http::roundtrip!(
<dto::Transaction, dto::Error>;
self.client
.post(util::url::join(
.request(reqwest::Method::POST, util::url::join(
&self.config.endpoint,
"transactions/1?ignoreChecks=true",
))
Expand Down
13 changes: 9 additions & 4 deletions src/infra/dex/zeroex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
util,
},
ethereum_types::H160,
ethrpc::current_block::CurrentBlockStream,
std::sync::atomic::{self, AtomicU64},
tracing::Instrument,
};
Expand All @@ -12,12 +13,15 @@ mod dto;

/// Bindings to the 0x swap API.
pub struct ZeroEx {
client: reqwest::Client,
client: super::Client,
endpoint: reqwest::Url,
defaults: dto::Query,
}

pub struct Config {
/// The stream that yields every new block.
pub block_stream: CurrentBlockStream,

/// The base URL for the 0x swap API.
pub endpoint: reqwest::Url,

Expand Down Expand Up @@ -53,9 +57,10 @@ impl ZeroEx {
let mut headers = reqwest::header::HeaderMap::new();
headers.insert("0x-api-key", key);

reqwest::Client::builder()
let client = reqwest::Client::builder()
.default_headers(headers)
.build()?
.build()?;
super::Client::new(client, config.block_stream)
};
let defaults = dto::Query {
taker_address: Some(config.settlement.0),
Expand Down Expand Up @@ -128,7 +133,7 @@ impl ZeroEx {
let quote = util::http::roundtrip!(
<dto::Quote, dto::Error>;
self.client
.get(util::url::join(&self.endpoint, "quote"))
.request(reqwest::Method::GET, util::url::join(&self.endpoint, "quote"))
.query(query)
)
.await?;
Expand Down

0 comments on commit f06ca58

Please sign in to comment.