From 2b70edfb04251b8e3fcb3619b9b17629f03bb7d0 Mon Sep 17 00:00:00 2001 From: Ankur Dubey Date: Thu, 11 Jul 2024 12:50:54 +0400 Subject: [PATCH] fix: Future is not send --- crates/routing-engine/src/indexer.rs | 7 +++ .../routing-engine/src/settlement_engine.rs | 51 ++++++++-------- .../routing-engine/src/source/bungee/mod.rs | 2 + crates/routing-engine/src/source/mod.rs | 16 ++--- .../src/token_price/coingecko.rs | 4 ++ crates/routing-engine/src/token_price/mod.rs | 8 +-- .../routing-engine/src/token_price/utils.rs | 2 + crates/storage/src/lib.rs | 59 ++++++------------- crates/storage/src/mongodb_client.rs | 2 + crates/storage/src/redis_client.rs | 3 + 10 files changed, 79 insertions(+), 75 deletions(-) diff --git a/crates/routing-engine/src/indexer.rs b/crates/routing-engine/src/indexer.rs index 567f0ac..57277bd 100644 --- a/crates/routing-engine/src/indexer.rs +++ b/crates/routing-engine/src/indexer.rs @@ -308,6 +308,7 @@ mod tests { use std::sync::Arc; use std::time::Duration; + use async_trait::async_trait; use derive_more::Display; use thiserror::Error; @@ -324,6 +325,8 @@ mod tests { #[derive(Debug)] struct ModelStoreStub; + + #[async_trait] impl KeyValueStore for ModelStoreStub { type Error = Err; @@ -354,6 +357,8 @@ mod tests { #[derive(Debug)] struct ProducerStub; + + #[async_trait] impl MessageQueue for ProducerStub { type Error = Err; @@ -372,6 +377,8 @@ mod tests { #[derive(Debug)] struct TokenPriceProviderStub; + + #[async_trait] impl TokenPriceProvider for TokenPriceProviderStub { type Error = Error; diff --git a/crates/routing-engine/src/settlement_engine.rs b/crates/routing-engine/src/settlement_engine.rs index ce66126..88eb17f 100644 --- a/crates/routing-engine/src/settlement_engine.rs +++ b/crates/routing-engine/src/settlement_engine.rs @@ -7,7 +7,6 @@ use alloy::transports::http::Http; use futures::StreamExt; use log::{error, info}; use reqwest::{Client, Url}; -use ruint::aliases::U256; use ruint::Uint; use serde::Serialize; use thiserror::Error; @@ -72,31 +71,28 @@ impl .map(|route| async move { info!("Generating transactions for route: {:?}", route.route); - // let token_amount = get_token_amount_from_value_in_usd( - // &self.config, - // &self.price_provider, - // &route.route.from_token.symbol, - // route.route.from_chain.id, - // &route.source_amount_in_usd, - // ) - // .await - // .map_err(|err| SettlementEngineErrors::GetTokenAmountFromValueInUsdError(err))?; - let token_amount: U256 = Uint::from(100); + let token_amount = get_token_amount_from_value_in_usd( + &self.config, + &self.price_provider, + &route.route.from_token.symbol, + route.route.from_chain.id, + &route.source_amount_in_usd, + ) + .await + .map_err(|err| SettlementEngineErrors::GetTokenAmountFromValueInUsdError(err))?; info!("Token amount: {:?} for route {:?}", token_amount, route); - // let (ethereum_transactions, required_approval_details) = self - // .source - // .generate_route_transactions( - // &route.route, - // &token_amount, - // &route.from_address, - // &route.to_address, - // ) - // .await - // .map_err(|err| SettlementEngineErrors::GenerateTransactionsError(err))?; - let ethereum_transactions = Vec::new(); - let required_approval_details = Vec::new(); + let (ethereum_transactions, required_approval_details) = self + .source + .generate_route_transactions( + &route.route, + &token_amount, + &route.from_address, + &route.to_address, + ) + .await + .map_err(|err| SettlementEngineErrors::GenerateTransactionsError(err))?; info!("Generated transactions: {:?} for route {:?}", ethereum_transactions, route); @@ -392,6 +388,7 @@ mod tests { use std::time::Duration; use alloy::primitives::U256; + use async_trait::async_trait; use derive_more::Display; use thiserror::Error; @@ -412,6 +409,7 @@ mod tests { map: Mutex>, } + #[async_trait] impl KeyValueStore for KVStore { type Error = Err; @@ -750,5 +748,12 @@ mod tests { assert_is_send::(); assert_is_send::(); assert_is_send::>>(); + assert_is_send::(); + assert_is_send::< + Result< + (Vec, Vec), + SettlementEngineErrors>, + >, + >() } } diff --git a/crates/routing-engine/src/source/bungee/mod.rs b/crates/routing-engine/src/source/bungee/mod.rs index 2ef5e49..0b73a20 100644 --- a/crates/routing-engine/src/source/bungee/mod.rs +++ b/crates/routing-engine/src/source/bungee/mod.rs @@ -1,5 +1,6 @@ use std::str::FromStr; +use async_trait::async_trait; use log::{error, info}; use reqwest; use reqwest::header; @@ -106,6 +107,7 @@ pub enum GenerateRouteTransactionsError { InvalidU256Error(String), } +#[async_trait] impl RouteSource for BungeeClient { type FetchRouteCostError = BungeeFetchRouteCostError; diff --git a/crates/routing-engine/src/source/mod.rs b/crates/routing-engine/src/source/mod.rs index 734113f..2bfeead 100644 --- a/crates/routing-engine/src/source/mod.rs +++ b/crates/routing-engine/src/source/mod.rs @@ -1,6 +1,7 @@ use std::error::Error; use std::fmt::Debug; +use async_trait::async_trait; use ruint::aliases::U256; use serde::Serialize; @@ -25,30 +26,29 @@ pub struct RequiredApprovalDetails { pub amount: U256, } +#[async_trait] pub trait RouteSource: Debug + Send + Sync { type FetchRouteCostError: Debug + Error + Send + Sync; type GenerateRouteTransactionsError: Debug + Error + Send + Sync; type BaseRouteType: Debug + Send + Sync; - fn fetch_least_cost_route_and_cost_in_usd( + async fn fetch_least_cost_route_and_cost_in_usd( &self, route: &Route, from_token_amount: &U256, sender_address: Option<&String>, recipient_address: Option<&String>, estimation_type: &CostType, - ) -> impl futures::Future>; + ) -> Result<(Self::BaseRouteType, f64), Self::FetchRouteCostError>; - fn generate_route_transactions( + async fn generate_route_transactions( &self, route: &Route, amount: &U256, sender_address: &String, recipient_address: &String, - ) -> impl futures::Future< - Output = Result< - (Vec, Vec), - Self::GenerateRouteTransactionsError, - >, + ) -> Result< + (Vec, Vec), + Self::GenerateRouteTransactionsError, >; } diff --git a/crates/routing-engine/src/token_price/coingecko.rs b/crates/routing-engine/src/token_price/coingecko.rs index 3669107..896c93b 100644 --- a/crates/routing-engine/src/token_price/coingecko.rs +++ b/crates/routing-engine/src/token_price/coingecko.rs @@ -2,6 +2,7 @@ use std::fmt::Debug; use std::num::ParseFloatError; use std::time::Duration; +use async_trait::async_trait; use derive_more::Display; use log::{error, info}; use reqwest::{header, StatusCode}; @@ -70,6 +71,7 @@ impl CoingeckoClient { } } +#[async_trait] impl TokenPriceProvider for CoingeckoClient { type Error = CoingeckoClientError; @@ -140,6 +142,7 @@ mod tests { use std::sync::Mutex; use std::time::Duration; + use async_trait::async_trait; use derive_more::Display; use thiserror::Error; @@ -157,6 +160,7 @@ mod tests { map: Mutex>, } + #[async_trait] impl KeyValueStore for KVStore { type Error = Err; diff --git a/crates/routing-engine/src/token_price/mod.rs b/crates/routing-engine/src/token_price/mod.rs index 6c305dd..8cc774c 100644 --- a/crates/routing-engine/src/token_price/mod.rs +++ b/crates/routing-engine/src/token_price/mod.rs @@ -1,16 +1,16 @@ use std::error::Error; use std::fmt::Debug; +use async_trait::async_trait; + pub use coingecko::CoingeckoClient; mod coingecko; pub mod utils; +#[async_trait] pub trait TokenPriceProvider: Debug + Send + Sync { type Error: Error + Debug + Send + Sync; - fn get_token_price( - &self, - token_symbol: &String, - ) -> impl futures::Future>; + async fn get_token_price(&self, token_symbol: &String) -> Result; } diff --git a/crates/routing-engine/src/token_price/utils.rs b/crates/routing-engine/src/token_price/utils.rs index 2feb6ac..03e9131 100644 --- a/crates/routing-engine/src/token_price/utils.rs +++ b/crates/routing-engine/src/token_price/utils.rs @@ -71,6 +71,7 @@ pub enum Errors { mod tests { use std::fmt::Error; + use async_trait::async_trait; use ruint::Uint; use config::{Config, get_sample_config}; @@ -84,6 +85,7 @@ mod tests { #[derive(Debug)] struct TokenPriceProviderStub; + #[async_trait] impl TokenPriceProvider for TokenPriceProviderStub { type Error = Error; diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 4261540..0aea2b7 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -1,10 +1,10 @@ use std::collections::HashMap; use std::error::Error; use std::fmt::Debug; -use std::future; use std::time::Duration; pub use ::redis::{ControlFlow, Msg}; +use async_trait::async_trait; use mongodb::bson::Document; pub use redis_client::{RedisClient, RedisClientError}; @@ -13,43 +13,28 @@ pub mod mongodb_client; mod redis_client; +#[async_trait] pub trait KeyValueStore: Debug + Send + Sync { type Error: Error + Debug + Send + Sync; - fn get(&self, k: &String) -> impl future::Future>; + async fn get(&self, k: &String) -> Result; - fn get_multiple( - &self, - k: &Vec, - ) -> impl future::Future, Self::Error>>; + async fn get_multiple(&self, k: &Vec) -> Result, Self::Error>; - fn set( - &self, - k: &String, - v: &String, - expiry: Duration, - ) -> impl future::Future>; + async fn set(&self, k: &String, v: &String, expiry: Duration) -> Result<(), Self::Error>; - fn set_multiple( - &self, - kv: &Vec<(String, String)>, - ) -> impl future::Future>; + async fn set_multiple(&self, kv: &Vec<(String, String)>) -> Result<(), Self::Error>; - fn get_all_keys(&self) -> impl future::Future, RedisClientError>>; + async fn get_all_keys(&self) -> Result, RedisClientError>; - fn get_all_key_values( - &self, - ) -> impl future::Future, RedisClientError>>; + async fn get_all_key_values(&self) -> Result, RedisClientError>; } -pub trait MessageQueue: Debug { - type Error: Error + Debug; +#[async_trait] +pub trait MessageQueue: Debug + Send + Sync { + type Error: Error + Debug + Send + Sync; - fn publish( - &self, - topic: &str, - message: &str, - ) -> impl future::Future>; + async fn publish(&self, topic: &str, message: &str) -> Result<(), Self::Error>; fn subscribe( &self, @@ -58,21 +43,15 @@ pub trait MessageQueue: Debug { ) -> Result<(), Self::Error>; } -pub trait DBProvider: Debug { - type Error: Error + Debug; +#[async_trait] +pub trait DBProvider: Debug + Send + Sync { + type Error: Error + Debug + Send + Sync; - fn create(&self, item: &Document) -> impl future::Future>; + async fn create(&self, item: &Document) -> Result<(), Self::Error>; - fn read( - &self, - query: &Document, - ) -> impl future::Future, Self::Error>>; + async fn read(&self, query: &Document) -> Result, Self::Error>; - fn update( - &self, - query: &Document, - update: &Document, - ) -> impl future::Future>; + async fn update(&self, query: &Document, update: &Document) -> Result<(), Self::Error>; - fn delete(&self, query: &Document) -> impl future::Future>; + async fn delete(&self, query: &Document) -> Result<(), Self::Error>; } diff --git a/crates/storage/src/mongodb_client.rs b/crates/storage/src/mongodb_client.rs index 1b7bcbe..0d03768 100644 --- a/crates/storage/src/mongodb_client.rs +++ b/crates/storage/src/mongodb_client.rs @@ -1,3 +1,4 @@ +use async_trait::async_trait; use derive_more::Display; use mongodb::{ bson::{self, doc, Document}, @@ -65,6 +66,7 @@ impl MongoDBClient { } } +#[async_trait] impl DBProvider for MongoDBClient { type Error = DBError; diff --git a/crates/storage/src/redis_client.rs b/crates/storage/src/redis_client.rs index 055e75c..4b77c48 100644 --- a/crates/storage/src/redis_client.rs +++ b/crates/storage/src/redis_client.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::time::Duration; +use async_trait::async_trait; use log::info; use redis::{self, aio, AsyncCommands, ControlFlow, Msg, PubSubCommands}; use redis::RedisError; @@ -22,6 +23,7 @@ impl RedisClient { } } +#[async_trait] impl KeyValueStore for RedisClient { type Error = RedisClientError; @@ -65,6 +67,7 @@ impl KeyValueStore for RedisClient { } } +#[async_trait] impl MessageQueue for RedisClient { type Error = RedisClientError;