diff --git a/README.md b/README.md index dd63647..fd5f27b 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ ## reflux -Backend of solver which helps in seamless cross-chain asset consolidation. It aggregates user balances, automates routing, and suggests optimal transactions. +Backend of solver which helps in seamless cross-chain asset consolidation. It aggregates user balances, automates +routing, and suggests optimal transactions. #### Installation @@ -18,4 +19,4 @@ Once build is copleted, just run the server and test with the endpoints ### Dependencies graph -![image](./graph.png) +![image](./assets/dependency-graph.png) diff --git a/graph.png b/assets/dependency-graph.png similarity index 100% rename from graph.png rename to assets/dependency-graph.png diff --git a/crates/account-aggregation/src/service.rs b/crates/account-aggregation/src/service.rs index 1aa1369..80cc215 100644 --- a/crates/account-aggregation/src/service.rs +++ b/crates/account-aggregation/src/service.rs @@ -1,16 +1,16 @@ use std::sync::Arc; -use thiserror::Error; use derive_more::Display; use mongodb::bson; use reqwest::Client as ReqwestClient; +use thiserror::Error; use uuid::Uuid; -use storage::mongodb_client::{DBError, MongoDBClient}; use storage::DBProvider; +use storage::mongodb_client::{DBError, MongoDBClient}; use crate::types::{ - Account, AddAccountPayload, ApiResponse, Balance, RegisterAccountPayload, User, + Account, AddAccountPayload, ApiResponse, RegisterAccountPayload, TokenWithBalance, User, UserAccountMapping, UserAccountMappingQuery, UserQuery, }; @@ -217,7 +217,7 @@ impl AccountAggregationService { pub async fn get_user_accounts_balance( &self, account: &String, - ) -> Result, AccountAggregationError> { + ) -> Result, AccountAggregationError> { let mut accounts: Vec = Vec::new(); let user_id = self.get_user_id(account).await.unwrap_or(None); if let Some(user_id) = user_id { @@ -251,7 +251,7 @@ impl AccountAggregationService { /// Extract balance data from the API response fn extract_balance_data( api_response: ApiResponse, -) -> Result, AccountAggregationError> { +) -> Result, AccountAggregationError> { let chain_id = api_response.data.chain_id.to_string(); let results = api_response .data @@ -273,7 +273,7 @@ fn extract_balance_data( } else { let balance = balance_raw / 10f64.powf(item.contract_decimals.unwrap() as f64); - Some(Balance { + Some(TokenWithBalance { token: token.clone(), token_address: item.contract_ticker_symbol.clone().unwrap(), chain_id: chain_id.clone().parse::().unwrap(), diff --git a/crates/account-aggregation/src/service_trait.rs b/crates/account-aggregation/src/service_trait.rs index e84e3f7..7073b65 100644 --- a/crates/account-aggregation/src/service_trait.rs +++ b/crates/account-aggregation/src/service_trait.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use storage::mongodb_client::MongoDBClient; -use crate::types::{Account, AddAccountPayload, Balance, RegisterAccountPayload}; +use crate::types::{Account, AddAccountPayload, RegisterAccountPayload, TokenWithBalance}; #[async_trait] pub trait AccountAggregationServiceTrait { @@ -21,5 +21,5 @@ pub trait AccountAggregationServiceTrait { account_payload: RegisterAccountPayload, ) -> Result<(), Box>; fn add_account(&self, account_payload: AddAccountPayload) -> Result<(), Box>; - fn get_user_accounts_balance(&self, account: &String) -> Vec; + fn get_user_accounts_balance(&self, account: &String) -> Vec; } diff --git a/crates/account-aggregation/src/types.rs b/crates/account-aggregation/src/types.rs index a88481d..341803f 100644 --- a/crates/account-aggregation/src/types.rs +++ b/crates/account-aggregation/src/types.rs @@ -20,7 +20,7 @@ pub struct TokenData { } #[derive(Deserialize, Serialize, Debug)] -pub struct Balance { +pub struct TokenWithBalance { pub token: String, pub token_address: String, pub chain_id: u32, diff --git a/crates/routing-engine/src/engine.rs b/crates/routing-engine/src/engine.rs index 0d67972..8acf803 100644 --- a/crates/routing-engine/src/engine.rs +++ b/crates/routing-engine/src/engine.rs @@ -7,7 +7,7 @@ use thiserror::Error; use tokio::sync::RwLock; use account_aggregation::service::AccountAggregationService; -use account_aggregation::types::Balance; +use account_aggregation::types::TokenWithBalance; use config::{ChainConfig, config::BucketConfig, SolverConfig, TokenConfig}; use storage::{KeyValueStore, RedisClient, RedisClientError}; @@ -102,46 +102,85 @@ impl RoutingEngine { account, to_chain, to_token, to_value ); let user_balances = self.get_user_balance_from_agg_service(&account).await?; - // debug!("User balances: {:?}", user_balances); + debug!("User balances: {:?}", user_balances); // todo: for account aggregation, transfer same chain same asset first - let direct_assets: Vec<_> = - user_balances.iter().filter(|balance| balance.token == to_token).collect(); + let (direct_assets, non_direct_assets): (Vec<_>, _) = + user_balances.into_iter().partition(|balance| balance.token == to_token); debug!("Direct assets: {:?}", direct_assets); - // Sort direct assets by A^x / C^y, here x=2 and y=1 + let (mut selected_routes, total_amount_needed, mut total_cost) = + self.generate_optimal_routes(direct_assets, to_chain, to_token, to_value).await?; + + // Handle swap/bridge for remaining amount if needed (non-direct assets) + if total_amount_needed > 0.0 { + let (swap_routes, _, swap_total_cost) = self + .generate_optimal_routes(non_direct_assets, to_chain, to_token, total_amount_needed) + .await?; + + selected_routes.extend(swap_routes); + total_cost += swap_total_cost; + } + + debug!("Selected assets: {:?}", selected_routes); + info!( + "Total cost for user: {} on chain {} to token {} is {}", + account, to_chain, to_token, total_cost + ); + + Ok(selected_routes) + } + + async fn generate_optimal_routes( + &self, + assets: Vec, + to_chain: u32, + to_token: &str, + to_value_usd: f64, + ) -> Result<(Vec, f64, f64), RoutingEngineError> { + // Sort direct assets by Balance^x / Fee_Cost^y, here x=2 and y=1 let x = self.estimates.x_value; let y = self.estimates.y_value; - let mut sorted_assets: Vec<(&Balance, f64)> = stream::iter(direct_assets.into_iter()) - .then(|balance| async move { - let fee_cost = self - .get_cached_data( - balance.amount_in_usd, - PathQuery( - balance.chain_id, - to_chain, - balance.token.to_string(), - to_token.to_string(), - ), - ) - .await - .unwrap_or_default(); - (balance, fee_cost) - }) - .collect() - .await; + let mut assets_sorted_by_bridging_cost: Vec<(TokenWithBalance, f64)> = + stream::iter(assets.into_iter()) + .then(|balance| async move { + let fee_cost = self + .estimate_bridging_cost( + balance.amount_in_usd, + PathQuery( + balance.chain_id, + to_chain, + balance.token.to_string(), + to_token.to_string(), + ), + ) + .await; + (balance, fee_cost) + }) + .collect::>() + .await + .into_iter() + .filter_map(|(balance, cost)| match cost { + Ok(cost) => Some((balance, cost)), + Err(e) => { + error!("Failed to estimate bridging cost for balance {:?}: {}", balance, e); + None + } + }) + .collect(); - sorted_assets.sort_by(|a, b| { + // Greedily select bridging routes that + assets_sorted_by_bridging_cost.sort_by(|a, b| { let cost_a = (a.0.amount.powf(x)) / (a.1.powf(y)); let cost_b = (b.0.amount.powf(x)) / (b.1.powf(y)); cost_a.partial_cmp(&cost_b).unwrap() }); let mut total_cost = 0.0; - let mut total_amount_needed = to_value; - let mut selected_assets: Vec = Vec::new(); + let mut total_amount_needed = to_value_usd; + let mut selected_routes: Vec = Vec::new(); - for (balance, fee) in sorted_assets { + for (balance, fee) in assets_sorted_by_bridging_cost { if total_amount_needed <= 0.0 { break; } @@ -153,149 +192,38 @@ impl RoutingEngine { total_amount_needed -= amount_to_take; total_cost += fee; - let from_chain = self.chain_configs.get(&balance.chain_id).ok_or_else(|| { - RoutingEngineError::CacheError(format!( - "Chain config not found for ID {}", - balance.chain_id - )) - })?; - let to_chain = self.chain_configs.get(&to_chain).ok_or_else(|| { - RoutingEngineError::CacheError(format!( - "Chain config not found for ID {}", - to_chain - )) - })?; - let from_token = self.token_configs.get(&balance.token).ok_or_else(|| { - RoutingEngineError::CacheError(format!( - "Token config not found for {}", - balance.token - )) - })?; - let to_token = self.token_configs.get(to_token).ok_or_else(|| { - RoutingEngineError::CacheError(format!("Token config not found for {}", to_token)) - })?; - - selected_assets.push(Route { - from_chain, + selected_routes.push(self.build_bridging_route( + balance.chain_id, to_chain, - from_token, + &balance.token, to_token, - amount_in_usd: amount_to_take, - is_smart_contract_deposit: false, - }); - } - - // Handle swap/bridge for remaining amount if needed (non direct assets) - if total_amount_needed > 0.0 { - let swap_assets: Vec<&Balance> = - user_balances.iter().filter(|balance| balance.token != to_token).collect(); - let mut sorted_assets: Vec<(&Balance, f64)> = stream::iter(swap_assets.into_iter()) - .then(|balance| async move { - let fee_cost = self - .get_cached_data( - balance.amount_in_usd, - PathQuery( - balance.chain_id, - to_chain, - balance.token.clone(), - to_token.to_string(), - ), - ) - .await - .unwrap_or_default(); - (balance, fee_cost) - }) - .collect() - .await; - - sorted_assets.sort_by(|a, b| { - let cost_a = (a.0.amount.powf(x)) / (a.1.powf(y)); - let cost_b = (b.0.amount.powf(x)) / (b.1.powf(y)); - cost_a.partial_cmp(&cost_b).unwrap() - }); - - for (balance, fee_cost) in sorted_assets { - if total_amount_needed <= 0.0 { - break; - } - - let amount_to_take = if balance.amount_in_usd >= total_amount_needed { - total_amount_needed - } else { - balance.amount_in_usd - }; - - total_amount_needed -= amount_to_take; - total_cost += fee_cost; - - let from_chain = self.chain_configs.get(&balance.chain_id).ok_or_else(|| { - RoutingEngineError::CacheError(format!( - "Chain config not found for ID {}", - balance.chain_id - )) - })?; - let to_chain = self.chain_configs.get(&to_chain).ok_or_else(|| { - RoutingEngineError::CacheError(format!( - "Chain config not found for ID {}", - to_chain - )) - })?; - let from_token = self.token_configs.get(&balance.token).ok_or_else(|| { - RoutingEngineError::CacheError(format!( - "Token config not found for {}", - balance.token - )) - })?; - let to_token = self.token_configs.get(to_token).ok_or_else(|| { - RoutingEngineError::CacheError(format!( - "Token config not found for {}", - to_token - )) - })?; - - selected_assets.push(Route { - from_chain, - to_chain, - from_token, - to_token, - amount_in_usd: amount_to_take, - is_smart_contract_deposit: false, - }); - } + amount_to_take, + false, + )?); } - debug!("Selected assets: {:?}", selected_assets); - info!( - "Total cost for user: {} on chain {} to token {} is {}", - account, to_chain, to_token, total_cost - ); - - Ok(selected_assets) + Ok((selected_routes, total_amount_needed, total_cost)) } - async fn get_cached_data( + async fn estimate_bridging_cost( &self, - target_amount: f64, + target_amount_in_usd: f64, path: PathQuery, ) -> Result { - let mut buckets_array: Vec = self + // TODO: Maintain sorted list cache in cache, binary search + let bucket = self .buckets - .clone() - .into_iter() - .filter(|bucket| { - bucket.from_chain_id == path.0 + .iter() + .find(|&bucket| { + let matches_path = bucket.from_chain_id == path.0 && bucket.to_chain_id == path.1 && bucket.from_token == path.2 - && bucket.to_token == path.3 - }) - .collect(); - buckets_array.sort(); + && bucket.to_token == path.3; - let bucket = buckets_array - .iter() - .find(|window| { - target_amount >= window.token_amount_from_usd - && target_amount <= window.token_amount_to_usd + let matches_amount = target_amount_in_usd >= bucket.token_amount_from_usd + && target_amount_in_usd <= bucket.token_amount_to_usd; + + matches_path && matches_amount }) .ok_or_else(|| { RoutingEngineError::CacheError("No matching bucket found".to_string()) @@ -309,14 +237,14 @@ impl RoutingEngine { .ok_or_else(|| RoutingEngineError::CacheError("No cached value found".to_string()))?; let estimator: LinearRegressionEstimator = serde_json::from_str(value)?; - Ok(estimator.estimate(target_amount)) + Ok(estimator.estimate(target_amount_in_usd)) } /// Get user balance from account aggregation service async fn get_user_balance_from_agg_service( &self, account: &str, - ) -> Result, RoutingEngineError> { + ) -> Result, RoutingEngineError> { let balance = self .aas_client .get_user_accounts_balance(&account.to_string()) @@ -334,6 +262,41 @@ impl RoutingEngine { debug!("User balance: {:?}", balance); Ok(balance) } + + fn build_bridging_route( + &self, + from_chain_id: u32, + to_chain_id: u32, + from_token_id: &str, + to_token_id: &str, + token_amount_in_usd: f64, + is_smart_contract_deposit: bool, + ) -> Result { + let from_chain = self.chain_configs.get(&from_chain_id).ok_or_else(|| { + RoutingEngineError::CacheError(format!( + "Chain config not found for ID {}", + from_chain_id + )) + })?; + let to_chain = self.chain_configs.get(&to_chain_id).ok_or_else(|| { + RoutingEngineError::CacheError(format!("Chain config not found for ID {}", to_chain_id)) + })?; + let from_token = self.token_configs.get(from_token_id).ok_or_else(|| { + RoutingEngineError::CacheError(format!("Token config not found for {}", from_token_id)) + })?; + let to_token = self.token_configs.get(to_token_id).ok_or_else(|| { + RoutingEngineError::CacheError(format!("Token config not found for {}", to_token_id)) + })?; + + Ok(Route { + from_chain, + to_chain, + from_token, + to_token, + amount_in_usd: token_amount_in_usd, + is_smart_contract_deposit, + }) + } } #[cfg(test)] @@ -430,7 +393,7 @@ mod tests { let path_query = PathQuery(1, 2, "USDC".to_string(), "ETH".to_string()); // Call get_cached_data and assert the result - let result = routing_engine.get_cached_data(target_amount, path_query).await?; + let result = routing_engine.estimate_bridging_cost(target_amount, path_query).await?; assert!(result > 0.0); assert_eq!(result, dummy_estimator.estimate(target_amount)); Ok(())