diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 79b0c3b..096e651 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -26,6 +26,8 @@ jobs: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable - uses: Swatinem/rust-cache@v2 + with: + cache-all-crates: true - run: cargo test --all-features env: BUNGEE_API_KEY: ${{ secrets.BUNGEE_API_KEY }} diff --git a/bin/reflux/src/main.rs b/bin/reflux/src/main.rs index a5f38a1..a5f8419 100644 --- a/bin/reflux/src/main.rs +++ b/bin/reflux/src/main.rs @@ -118,6 +118,15 @@ async fn run_solver(config: Arc) { let redis_client = RedisClient::build(&config.infra.redis_url) .await .expect("Failed to instantiate redis client"); + + let erc20_instance_map = generate_erc20_instance_map(&config).unwrap(); + let token_price_provider = Arc::new(Mutex::new(CoingeckoClient::new( + config.coingecko.base_url.clone(), + config.coingecko.api_key.clone(), + redis_client.clone(), + Duration::from_secs(config.coingecko.expiry_sec), + ))); + let routing_engine = Arc::new(RoutingEngine::new( account_service.clone(), buckets, @@ -125,18 +134,13 @@ async fn run_solver(config: Arc) { config.solver_config.clone(), chain_configs, token_configs, + Arc::clone(&config), + Arc::clone(&token_price_provider), )); // Initialize Settlement Engine and Dependencies - let erc20_instance_map = generate_erc20_instance_map(&config).unwrap(); let bungee_client = BungeeClient::new(&config.bungee.base_url, &config.bungee.api_key) .expect("Failed to Instantiate Bungee Client"); - let token_price_provider = Arc::new(Mutex::new(CoingeckoClient::new( - config.coingecko.base_url.clone(), - config.coingecko.api_key.clone(), - redis_client.clone(), - Duration::from_secs(config.coingecko.expiry_sec), - ))); let settlement_engine = Arc::new(SettlementEngine::new( Arc::clone(&config), bungee_client, diff --git a/crates/account-aggregation/src/types.rs b/crates/account-aggregation/src/types.rs index 81adbba..88b8c55 100644 --- a/crates/account-aggregation/src/types.rs +++ b/crates/account-aggregation/src/types.rs @@ -79,5 +79,5 @@ pub struct PathQuery { pub account: String, pub to_chain: u32, pub to_token: String, - pub to_value: f64, + pub to_amount_token: f64, } diff --git a/crates/api/src/service_controller.rs b/crates/api/src/service_controller.rs index 5372455..57bfc74 100644 --- a/crates/api/src/service_controller.rs +++ b/crates/api/src/service_controller.rs @@ -12,7 +12,7 @@ use routing_engine::token_price::TokenPriceProvider; pub struct ServiceController { account_service: Arc, - routing_engine: Arc, + routing_engine: Arc>, settlement_engine: Arc>, token_chain_map: HashMap>, chain_supported: Vec<(u32, String)>, @@ -24,7 +24,7 @@ impl { pub fn new( account_service: Arc, - routing_engine: Arc, + routing_engine: Arc>, settlement_engine: Arc>, token_chain_map: HashMap>, chain_supported: Vec<(u32, String)>, @@ -214,7 +214,7 @@ impl /// Get best cost path for asset consolidation pub async fn get_best_path( - routing_engine: Arc, + routing_engine: Arc>, settlement_engine: Arc>, token_chain_map: HashMap>, query: types::PathQuery, @@ -240,7 +240,12 @@ impl } let routes_result = routing_engine - .get_best_cost_paths(&query.account, query.to_chain, &query.to_token, query.to_value) + .get_best_cost_paths( + &query.account, + query.to_chain, + &query.to_token, + query.to_amount_token, + ) .await; if let Err(err) = routes_result { diff --git a/crates/routing-engine/src/routing_engine.rs b/crates/routing-engine/src/routing_engine.rs index 8f4fa00..0c87e5f 100644 --- a/crates/routing-engine/src/routing_engine.rs +++ b/crates/routing-engine/src/routing_engine.rs @@ -2,26 +2,29 @@ use std::cmp; use std::collections::HashMap; use std::sync::Arc; +use alloy::dyn_abi::abi::Token; use futures::stream::{self, StreamExt}; use log::{debug, error, info}; use thiserror::Error; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use account_aggregation::{service::AccountAggregationService, types::TokenWithBalance}; -use config::{config::BucketConfig, ChainConfig, Config, SolverConfig, TokenConfig}; +use config::{ChainConfig, Config, config::BucketConfig, SolverConfig, TokenConfig}; use storage::{KeyValueStore, RedisClient, RedisClientError}; use crate::{ - estimator::{Estimator, LinearRegressionEstimator}, - BridgeResult, BridgeResultVecWrapper, Route, + BridgeResult, + BridgeResultVecWrapper, estimator::{Estimator, LinearRegressionEstimator}, Route, }; +use crate::token_price::TokenPriceProvider; +use crate::token_price::utils::{Errors, get_token_price}; /// (from_chain, to_chain, from_token, to_token) #[derive(Debug)] struct PathQuery(u32, u32, String, String); #[derive(Error, Debug)] -pub enum RoutingEngineError { +pub enum RoutingEngineError { #[error("Redis error: {0}")] RedisError(#[from] RedisClientError), @@ -36,12 +39,15 @@ pub enum RoutingEngineError { #[error("User balance fetch error: {0}")] UserBalanceFetchError(String), + + #[error("Token price provider error: {0}")] + TokenPriceProviderError(Errors), } /// Routing Engine /// This struct is responsible for calculating the best cost path for a user #[derive(Debug)] -pub struct RoutingEngine { +pub struct RoutingEngine { buckets: Vec>, aas_client: Arc, cache: Arc>>, // (hash(bucket), hash(estimator_value) @@ -49,9 +55,11 @@ pub struct RoutingEngine { estimates: Arc, chain_configs: HashMap>, token_configs: HashMap>, + config: Arc, + price_provider: Arc>, } -impl RoutingEngine { +impl RoutingEngine { pub fn new( aas_client: Arc, buckets: Vec>, @@ -59,6 +67,8 @@ impl RoutingEngine { solver_config: Arc, chain_configs: HashMap>, token_configs: HashMap>, + config: Arc, + price_provider: Arc>, ) -> Self { let cache = Arc::new(RwLock::new(HashMap::new())); @@ -70,6 +80,8 @@ impl RoutingEngine { estimates: solver_config, chain_configs, token_configs, + config, + price_provider, } } @@ -99,11 +111,11 @@ impl RoutingEngine { account: &str, to_chain: u32, to_token: &str, - to_value: f64, - ) -> Result, RoutingEngineError> { + to_amount_token: f64, + ) -> Result, RoutingEngineError> { debug!( - "Getting best cost path for user: {}, to_chain: {}, to_token: {}, to_value: {}", - account, to_chain, to_token, to_value + "Getting best cost path for user: {}, to_chain: {}, to_token: {}, to_amount_token: {}", + account, to_chain, to_token, to_amount_token ); let user_balances = self.get_user_balance_from_agg_service(&account).await?; debug!("User balances: {:?}", user_balances); @@ -114,10 +126,15 @@ impl RoutingEngine { debug!("Direct assets: {:?}", direct_assets); debug!("Non-direct assets: {:?}", non_direct_assets); - // let to_value_usd = + let to_value_usd = + get_token_price(&self.config, &self.price_provider.lock().await, &to_token.to_string()) + .await + .map_err(RoutingEngineError::TokenPriceProviderError)? + * to_amount_token; + debug!("To value in USD: {}", to_value_usd); let (mut selected_routes, total_amount_needed, mut total_cost) = self - .generate_optimal_routes(direct_assets, to_chain, to_token, to_value, account) + .generate_optimal_routes(direct_assets, to_chain, to_token, to_value_usd, account) .await?; // Handle swap/bridge for remaining amount if needed (non-direct assets) @@ -152,7 +169,7 @@ impl RoutingEngine { to_token: &str, to_value_usd: f64, to_address: &str, - ) -> Result<(Vec, f64, f64), RoutingEngineError> { + ) -> Result<(Vec, f64, f64), RoutingEngineError> { // Sort direct assets by Balance^x / Fee_Cost^y, here x=2 and y=1 let x = self.estimates.x_value; let y = self.estimates.y_value; @@ -236,7 +253,7 @@ impl RoutingEngine { &self, target_amount_in_usd: f64, path: PathQuery, - ) -> Result { + ) -> Result> { // TODO: Maintain sorted list cache in cache, binary search let bucket = self .buckets @@ -277,7 +294,7 @@ impl RoutingEngine { async fn get_user_balance_from_agg_service( &self, account: &str, - ) -> Result, RoutingEngineError> { + ) -> Result, RoutingEngineError> { let balance = self .aas_client .get_user_accounts_balance(&account.to_string()) @@ -306,7 +323,7 @@ impl RoutingEngine { is_smart_contract_deposit: bool, from_address: &str, to_address: &str, - ) -> Result { + ) -> Result> { let from_chain = Arc::clone(self.chain_configs.get(&from_chain_id).ok_or_else(|| { RoutingEngineError::CacheError(format!( "Chain config not found for ID {}", @@ -344,12 +361,12 @@ mod tests { use config::{BucketConfig, ChainConfig, SolverConfig, TokenConfig, TokenConfigByChainConfigs}; use storage::mongodb_client::MongoDBClient; - use crate::estimator::Estimator; - use crate::routing_engine::PathQuery; use crate::{ estimator::{DataPoint, LinearRegressionEstimator}, routing_engine::{RoutingEngine, RoutingEngineError}, }; + use crate::estimator::Estimator; + use crate::routing_engine::PathQuery; #[tokio::test] async fn test_get_cached_data() -> Result<(), RoutingEngineError> {