Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set X-CURRENT-BLOCK-HASH header on every request #20

Merged
merged 4 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 115 additions & 150 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ bigdecimal = { version = "0.3", features = ["serde"] }
chrono = { version = "0.4.38", features = ["serde"], default-features = false }
clap = { version = "4", features = ["derive", "env"] }
ethereum-types = "0.14"
futures = "0.3"
futures = "0.3.30"
hex = "0.4"
humantime = "2.1.0"
humantime-serde = "1.1.1"
Expand All @@ -40,12 +40,12 @@ tower-http = { version = "0.4", features = ["trace"] }
tracing = "0.1"
web3 = "0.19"

contracts = { git = "https://github.com/cowprotocol/services.git", tag = "v2.253.0", package = "contracts" }
ethrpc = { git = "https://github.com/cowprotocol/services.git", tag = "v2.253.0", package = "ethrpc" }
observe = { git = "https://github.com/cowprotocol/services.git", tag = "v2.253.0", package = "observe" }
shared = { git = "https://github.com/cowprotocol/services.git", tag = "v2.253.0", package = "shared" }
dto = { git = "https://github.com/cowprotocol/services.git", tag = "v2.255.1-temp-solvers", package = "solvers-dto" }
rate-limit = { git = "https://github.com/cowprotocol/services.git", tag = "v2.253.0", package = "rate-limit" }
contracts = { git = "https://github.com/cowprotocol/services.git", tag = "v2.258.0", package = "contracts" }
ethrpc = { git = "https://github.com/cowprotocol/services.git", tag = "v2.258.0", package = "ethrpc" }
observe = { git = "https://github.com/cowprotocol/services.git", tag = "v2.258.0", package = "observe" }
shared = { git = "https://github.com/cowprotocol/services.git", tag = "v2.258.0", package = "shared" }
dto = { git = "https://github.com/cowprotocol/services.git", tag = "v2.255.0", package = "solvers-dto" }
rate-limit = { git = "https://github.com/cowprotocol/services.git", tag = "v2.258.0", package = "rate-limit" }

[dev-dependencies]
glob = "0.3"
Expand Down
1 change: 1 addition & 0 deletions src/infra/config/dex/balancer/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub async fn load(path: &Path) -> super::Config {
.map(eth::ContractAddress)
.unwrap_or(contracts.balancer_vault),
settlement: base.contracts.settlement,
block_stream: base.block_stream.clone(),
},
base,
}
Expand Down
17 changes: 17 additions & 0 deletions src/infra/config/dex/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ struct Config {
#[serde(default = "default_gas_offset")]
#[serde_as(as = "serialize::U256")]
gas_offset: eth::U256,

/// How often the solver should poll the current block. If this value
/// is set each request will also have the `X-CURRENT-BLOCK-HASH` header set
/// updated based on the configured polling interval.
/// This is useful for caching requests on an egress proxy.
#[serde(with = "humantime_serde", default)]
current_block_poll_interval: Option<Duration>,
}

fn default_relative_slippage() -> BigDecimal {
Expand Down Expand Up @@ -130,6 +137,15 @@ pub async fn load<T: DeserializeOwned>(path: &Path) -> (super::Config, T) {
(contracts.settlement, contracts.authenticator)
};

let block_stream = match config.current_block_poll_interval {
Some(interval) => Some(
ethrpc::current_block::current_block_stream(config.node_url.clone(), interval)
.await
.unwrap(),
),
None => None,
};

let config = super::Config {
node_url: config.node_url,
contracts: super::Contracts {
Expand All @@ -150,6 +166,7 @@ pub async fn load<T: DeserializeOwned>(path: &Path) -> (super::Config, T) {
)
.unwrap(),
gas_offset: eth::Gas(config.gas_offset),
block_stream,
};
(config, dex)
}
2 changes: 2 additions & 0 deletions src/infra/config/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod zeroex;

use {
crate::domain::{dex::slippage, eth},
ethrpc::current_block::CurrentBlockStream,
std::num::NonZeroUsize,
};

Expand All @@ -24,4 +25,5 @@ pub struct Config {
pub smallest_partial_fill: eth::Ether,
pub rate_limiting_strategy: rate_limit::Strategy,
pub gas_offset: eth::Gas,
pub block_stream: Option<CurrentBlockStream>,
}
1 change: 1 addition & 0 deletions src/infra/config/dex/oneinch/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub async fn load(path: &Path) -> super::Config {
main_route_parts: config.main_route_parts,
connector_tokens: config.connector_tokens,
complexity_level: config.complexity_level,
block_stream: base.block_stream.clone(),
},
base,
}
Expand Down
1 change: 1 addition & 0 deletions src/infra/config/dex/paraswap/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub async fn load(path: &Path) -> super::Config {
exclude_dexs: config.exclude_dexs,
address: config.address,
partner: config.partner,
block_stream: base.block_stream.clone(),
},
base,
}
Expand Down
1 change: 1 addition & 0 deletions src/infra/config/dex/zeroex/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub async fn load(path: &Path) -> super::Config {
settlement,
enable_rfqt: config.enable_rfqt,
enable_slippage_protection: config.enable_slippage_protection,
block_stream: base.block_stream.clone(),
},
base,
}
Expand Down
10 changes: 7 additions & 3 deletions src/infra/dex/balancer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
},
contracts::ethcontract::I256,
ethereum_types::U256,
ethrpc::current_block::CurrentBlockStream,
std::sync::atomic::{self, AtomicU64},
tracing::Instrument,
};
Expand All @@ -14,13 +15,16 @@ mod vault;

/// Bindings to the Balancer Smart Order Router (SOR) API.
pub struct Sor {
client: reqwest::Client,
client: super::Client,
endpoint: reqwest::Url,
vault: vault::Vault,
settlement: eth::ContractAddress,
}

pub struct Config {
/// Stream that yields every new block.
pub block_stream: Option<CurrentBlockStream>,

/// The URL for the Balancer SOR API.
pub endpoint: reqwest::Url,

Expand All @@ -40,7 +44,7 @@ impl Sor {

pub fn new(config: Config) -> Self {
Self {
client: reqwest::Client::new(),
client: super::Client::new(Default::default(), config.block_stream),
endpoint: config.endpoint,
vault: vault::Vault::new(config.vault),
settlement: config.settlement,
Expand Down Expand Up @@ -151,7 +155,7 @@ impl Sor {
let quote = util::http::roundtrip!(
<dto::Quote, util::serialize::Never>;
self.client
.post(self.endpoint.clone())
.request(reqwest::Method::POST, self.endpoint.clone())
.json(query)
)
.await?;
Expand Down
36 changes: 35 additions & 1 deletion src/infra/dex/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::domain::{auction, dex};
use {
crate::domain::{auction, dex},
ethrpc::current_block::CurrentBlockStream,
reqwest::RequestBuilder,
};

pub mod balancer;
pub mod oneinch;
Expand Down Expand Up @@ -52,6 +56,36 @@ pub enum Error {
Other(Box<dyn std::error::Error + Send + Sync>),
}

/// A wrapper around [`reqwest::Client`] to pre-set commonly used headers
/// and other properties on each request.
struct Client {
/// Client to send requests.
client: reqwest::Client,

/// Block stream to read the current block.
block_stream: Option<CurrentBlockStream>,
}

impl Client {
pub fn new(client: reqwest::Client, block_stream: Option<CurrentBlockStream>) -> Self {
Self {
client,
block_stream,
}
}

/// Prepares a request builder which already has additional headers set.
pub fn request(&self, method: reqwest::Method, url: reqwest::Url) -> RequestBuilder {
let request = self.client.request(method, url);
if let Some(stream) = &self.block_stream {
// Set this header to easily support caching in an egress proxy.
request.header("X-CURRENT-BLOCK-HASH", stream.borrow().hash.to_string())
} else {
request
}
}
}

impl Error {
/// for instrumentization purposes
pub fn format_variant(&self) -> &'static str {
Expand Down
14 changes: 9 additions & 5 deletions src/infra/dex/oneinch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
util,
},
ethereum_types::H160,
ethrpc::current_block::CurrentBlockStream,
std::sync::atomic::{self, AtomicU64},
tracing::Instrument,
};
Expand All @@ -12,7 +13,7 @@ mod dto;

/// Bindings to the 1Inch swap API.
pub struct OneInch {
client: reqwest::Client,
client: super::Client,
endpoint: reqwest::Url,
defaults: dto::Query,
spender: eth::ContractAddress,
Expand All @@ -39,6 +40,9 @@ pub struct Config {
pub main_route_parts: Option<u32>,
pub connector_tokens: Option<u32>,
pub complexity_level: Option<u32>,

/// Stream that yields every new block.
pub block_stream: Option<CurrentBlockStream>,
}

pub enum Liquidity {
Expand All @@ -51,7 +55,7 @@ pub const DEFAULT_URL: &str = "https://api.1inch.io/v5.0/1/";

impl OneInch {
pub async fn new(config: Config) -> Result<Self, Error> {
let client = reqwest::Client::new();
let client = super::Client::new(Default::default(), config.block_stream);
let endpoint = config
.endpoint
.unwrap_or_else(|| DEFAULT_URL.parse().unwrap());
Expand All @@ -62,7 +66,7 @@ impl OneInch {
Liquidity::Exclude(excluded) => {
let liquidity = util::http::roundtrip!(
<dto::Liquidity, dto::Error>;
client.get(util::url::join(&endpoint, "liquidity-sources"))
client.request(reqwest::Method::GET, util::url::join(&endpoint, "liquidity-sources"))
)
.await?;

Expand All @@ -89,7 +93,7 @@ impl OneInch {
let spender = eth::ContractAddress(
util::http::roundtrip!(
<dto::Spender, dto::Error>;
client.get(util::url::join(&endpoint, "approve/spender"))
client.request(reqwest::Method::GET, util::url::join(&endpoint, "approve/spender"))
)
.await?
.address,
Expand Down Expand Up @@ -150,7 +154,7 @@ impl OneInch {
let swap = util::http::roundtrip!(
<dto::Swap, dto::Error>;
self.client
.get(util::url::join(&self.endpoint, "swap"))
.request(reqwest::Method::GET, util::url::join(&self.endpoint, "swap"))
.query(query)
)
.await?;
Expand Down
13 changes: 8 additions & 5 deletions src/infra/dex/paraswap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
util,
},
ethereum_types::Address,
ethrpc::current_block::CurrentBlockStream,
};

mod dto;
Expand All @@ -12,7 +13,7 @@ pub const DEFAULT_URL: &str = "https://apiv5.paraswap.io";

/// Bindings to the ParaSwap API.
pub struct ParaSwap {
client: reqwest::Client,
client: super::Client,
config: Config,
}

Expand All @@ -29,12 +30,15 @@ pub struct Config {

/// Our partner name.
pub partner: String,

/// A stream that yields every new block.
pub block_stream: Option<CurrentBlockStream>,
}

impl ParaSwap {
pub fn new(config: Config) -> Self {
Self {
client: reqwest::Client::new(),
client: super::Client::new(Default::default(), config.block_stream.clone()),
config,
}
}
Expand Down Expand Up @@ -76,8 +80,7 @@ impl ParaSwap {
) -> Result<dto::Price, Error> {
let price = util::http::roundtrip!(
<dto::Price, dto::Error>;
self.client
.get(util::url::join(&self.config.endpoint, "prices"))
self.client.request(reqwest::Method::GET, util::url::join(&self.config.endpoint, "prices"))
.query(&dto::PriceQuery::new(&self.config, order, tokens)?)
)
.await?;
Expand All @@ -96,7 +99,7 @@ impl ParaSwap {
let transaction = util::http::roundtrip!(
<dto::Transaction, dto::Error>;
self.client
.post(util::url::join(
.request(reqwest::Method::POST, util::url::join(
&self.config.endpoint,
"transactions/1?ignoreChecks=true",
))
Expand Down
13 changes: 9 additions & 4 deletions src/infra/dex/zeroex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
util,
},
ethereum_types::H160,
ethrpc::current_block::CurrentBlockStream,
std::sync::atomic::{self, AtomicU64},
tracing::Instrument,
};
Expand All @@ -12,7 +13,7 @@ mod dto;

/// Bindings to the 0x swap API.
pub struct ZeroEx {
client: reqwest::Client,
client: super::Client,
endpoint: reqwest::Url,
defaults: dto::Query,
}
Expand Down Expand Up @@ -42,6 +43,9 @@ pub struct Config {

/// Whether or not to enable slippage protection.
pub enable_slippage_protection: bool,

/// The stream that yields every new block.
pub block_stream: Option<CurrentBlockStream>,
}

impl ZeroEx {
Expand All @@ -53,9 +57,10 @@ impl ZeroEx {
let mut headers = reqwest::header::HeaderMap::new();
headers.insert("0x-api-key", key);

reqwest::Client::builder()
let client = reqwest::Client::builder()
.default_headers(headers)
.build()?
.build()?;
super::Client::new(client, config.block_stream)
};
let defaults = dto::Query {
taker_address: Some(config.settlement.0),
Expand Down Expand Up @@ -128,7 +133,7 @@ impl ZeroEx {
let quote = util::http::roundtrip!(
<dto::Quote, dto::Error>;
self.client
.get(util::url::join(&self.endpoint, "quote"))
.request(reqwest::Method::GET, util::url::join(&self.endpoint, "quote"))
.query(query)
)
.await?;
Expand Down
Loading