From d6c93a6ab3262a7cda4b73783b154fbd245308af Mon Sep 17 00:00:00 2001 From: Ankur Dubey Date: Tue, 25 Jun 2024 16:45:45 +0400 Subject: [PATCH] Indexer Integration (#5) * feat: Implement a configuration parser * test: Add Tests for Uniqueness * Create pull_request_template.md * chore: Add Sample config.yaml * feat: init bin and api crate * feat: LinearRegressionEstimator * feat: storage and api updates * chore: Refactor mongodb and user apis * feat: RouteSource Implementation for BungeeClient * test: BungeeClient * feat: Token Price Provider and Utils * fix: Replace BigInt by Ruint * test: Fix Bungee Tests * feat: Build Estimators in Routes Indexer * fix: Add BUNGEE_API_KEY in github action * feat: refactor engine * fix: add display in all structs * feat: added chainning instead of options * Update test.yml * Update test.yml * feat: Estimator Flow * feat: Redis Model Store * chore: refactor of storage and AAS * fix: conflict resolve * temp: fix build * chore: remove code * edit readme * feat: Redis Publish Subscribe * fix: Migrate to thiserror * feat: Token Price Provider Integration * fix: Indexer * fix: Tests * temp: broken indexer * fix: abolish scheduler * fix: tests * chore: Enhance Logging * feat: Command Line Config Parser * feat: Dockerfile * fix: Dockerfile * fix: Github Tests * fix: Github Tests * fix: Github Tests * fix: Github Tests * fix: Make Config::test private --------- Co-authored-by: amanraj1608 Co-authored-by: Aman Raj <42104907+AmanRaj1608@users.noreply.github.com> --- .dockerignore | 3 + .github/workflows/test.yml | 18 +- .gitignore | 2 +- Dockerfile | 11 + bin/reflux/Cargo.toml | 2 + bin/reflux/src/main.rs | 113 +++++- config.yaml.example | 95 +++-- crates/config/src/config.rs | 180 ++-------- crates/routing-engine/Cargo.toml | 1 + crates/routing-engine/src/engine.rs | 13 +- .../estimator/linear_regression_estimator.rs | 28 +- crates/routing-engine/src/estimator/mod.rs | 5 +- crates/routing-engine/src/indexer.rs | 340 ++++++++++-------- crates/routing-engine/src/lib.rs | 21 +- .../routing-engine/src/source/bungee/mod.rs | 91 ++--- crates/routing-engine/src/source/mod.rs | 19 +- crates/routing-engine/src/tests.rs | 13 +- .../src/token_price/coingecko.rs | 237 ++++++++++++ crates/routing-engine/src/token_price/mod.rs | 12 +- .../routing-engine/src/token_price/utils.rs | 63 +--- crates/storage/Cargo.toml | 1 + crates/storage/src/lib.rs | 40 ++- crates/storage/src/redis.rs | 72 ++-- docker-compose.yml | 47 +++ 24 files changed, 890 insertions(+), 537 deletions(-) create mode 100644 Dockerfile create mode 100644 crates/routing-engine/src/token_price/coingecko.rs create mode 100644 docker-compose.yml diff --git a/.dockerignore b/.dockerignore index 39eccd1..92ce530 100644 --- a/.dockerignore +++ b/.dockerignore @@ -23,3 +23,6 @@ # include example files !/examples + +*.yaml +*.env diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d08ab3c..7f5ba8b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,14 +1,13 @@ name: Test Suite -on: [pull_request, push] +on: [ pull_request ] jobs: test: name: cargo test runs-on: ubuntu-latest - services: mongodb: - image: mongo:6 + image: mongo:latest ports: - 27017:27017 options: >- @@ -16,13 +15,16 @@ jobs: --health-interval=30s --health-timeout=10s --health-retries=10 - + redis: + image: redis:latest + ports: + - 6379:6379 steps: - uses: actions/checkout@v4 - - - name: Set up Rust - uses: dtolnay/rust-toolchain@stable - + - uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 + - run: cargo test --all-features env: BUNGEE_API_KEY: ${{ secrets.BUNGEE_API_KEY }} + COINGECKO_API_KEY: ${{ secrets.COINGECKO_API_KEY }} environment: Testing diff --git a/.gitignore b/.gitignore index 71c1f86..19b5e74 100644 --- a/.gitignore +++ b/.gitignore @@ -22,4 +22,4 @@ Cargo.lock *.env *.swp -config.yaml +*.yaml diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..7b3570b --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM rust:latest as builder +WORKDIR /reflux +COPY . . +RUN cargo install --path bin/reflux --profile release + +FROM debian:latest +RUN apt-get update +RUN apt-get upgrade -y +RUN apt-get install -y libssl-dev ca-certificates +COPY --from=builder /usr/local/cargo/bin/reflux /app/reflux + diff --git a/bin/reflux/Cargo.toml b/bin/reflux/Cargo.toml index 6bdab74..2856732 100644 --- a/bin/reflux/Cargo.toml +++ b/bin/reflux/Cargo.toml @@ -8,6 +8,8 @@ tokio = { version = "1.38.0", features = ["full"] } tower-http = { version = "0.5.2", features = ["cors"] } axum = "0.7.5" log = "0.4.21" +simple_logger = "5.0.0" +clap = { version = "4.5.7", features = ["derive"] } # workspace dependencies account-aggregation = { workspace = true } diff --git a/bin/reflux/src/main.rs b/bin/reflux/src/main.rs index 5b7ce8c..7de9173 100644 --- a/bin/reflux/src/main.rs +++ b/bin/reflux/src/main.rs @@ -1,28 +1,78 @@ +use std::time::Duration; + +use axum::http::Method; +use clap::Parser; +use log::{debug, error, info}; +use tokio; +use tokio::signal; +use tower_http::cors::{Any, CorsLayer}; + use account_aggregation::service::AccountAggregationService; use api::service_controller::ServiceController; -use axum::http::Method; use config::Config; -use log::info; +use routing_engine::{BungeeClient, CoingeckoClient, Indexer}; use routing_engine::engine::RoutingEngine; +use routing_engine::estimator::LinearRegressionEstimator; use storage::mongodb_provider::MongoDBProvider; -use tokio; -use tokio::signal; -use tower_http::cors::{Any, CorsLayer}; +use storage::RedisClient; + +#[derive(Parser, Debug)] +struct Args { + /// Run the Solver (default) + #[arg(short, long)] + solver: bool, + + /// Run the Indexer + #[arg(short, long)] + indexer: bool, + + /// Config file path + #[arg(short, long, default_value = "config.yaml")] + config: String, +} #[tokio::main] async fn main() { + simple_logger::SimpleLogger::new().env().init().unwrap(); + + let mut args = Args::parse(); + debug!("Args: {:?}", args); + + if args.indexer && args.solver { + panic!("Cannot run both indexer and solver at the same time"); + } + + if !args.indexer && !args.solver { + args.solver = true; + debug!("Running Solver by default"); + } + // Load configuration from yaml - let config = Config::from_file("config.yaml").expect("Failed to load config file"); - let mongodb_uri = config.infra.mongo_url; - let (app_host, app_port) = (config.server.host, config.server.port); + let config = Config::from_file(&args.config).expect("Failed to load config file"); + + if args.indexer { + run_indexer(config).await; + } else if args.solver { + run_solver(config).await; + } +} + +async fn run_solver(config: Config) { + info!("Starting Reflux Server"); + + let (app_host, app_port) = (config.server.host.clone(), config.server.port.clone()); // Instance of MongoDBProvider for users and account mappings - let user_db_provider = - MongoDBProvider::new(&mongodb_uri, "reflux".to_string(), "users".to_string(), true) - .await - .expect("Failed to create MongoDB provider for users"); + let user_db_provider = MongoDBProvider::new( + &config.infra.mongo_url, + "reflux".to_string(), + "users".to_string(), + true, + ) + .await + .expect("Failed to create MongoDB provider for users"); let account_mapping_db_provider = MongoDBProvider::new( - &mongodb_uri, + &config.infra.mongo_url, "reflux".to_string(), "account_mappings".to_string(), false, @@ -30,7 +80,8 @@ async fn main() { .await .expect("Failed to create MongoDB provider for account mappings"); - let (covalent_base_url, covalent_api_key) = (config.covalent.base_url, config.covalent.api_key); + let (covalent_base_url, covalent_api_key) = + (config.covalent.base_url.clone(), config.covalent.api_key.clone()); // Initialize account aggregation service for api let account_service = AccountAggregationService::new( @@ -65,6 +116,40 @@ async fn main() { info!("Server stopped."); } +async fn run_indexer(config: Config) { + info!("Configuring Indexer"); + + let config = config; + + let redis_provider = RedisClient::build(&config.infra.redis_url) + .await + .expect("Failed to instantiate redis client"); + + let bungee_client = BungeeClient::new(&config.bungee.base_url, &config.bungee.api_key) + .expect("Failed to Instantiate Bungee Client"); + + let token_price_provider = CoingeckoClient::new( + &config.coingecko.base_url, + &config.coingecko.api_key, + &redis_provider, + Duration::from_secs(config.coingecko.expiry_sec), + ); + + let indexer: Indexer> = + Indexer::new( + &config, + &bungee_client, + &redis_provider, + &redis_provider, + &token_price_provider, + ); + + match indexer.run::().await { + Ok(_) => info!("Indexer Job Completed"), + Err(e) => error!("Indexer Job Failed: {}", e), + }; +} + async fn shutdown_signal() { let ctrl_c = async { signal::ctrl_c().await.expect("Unable to handle ctrl+c"); diff --git a/config.yaml.example b/config.yaml.example index 3fd49b2..c17a73a 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -2,57 +2,76 @@ chains: - id: 1 name: Ethereum is_enabled: true - - id: 56 - name: Binance Smart Chain + - id: 42161 + name: Arbitrum is_enabled: true tokens: - - symbol: ETH + - symbol: USDC is_enabled: true + coingecko_symbol: usd-coin by_chain: 1: is_enabled: true - decimals: 18 - address: '0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE' - 56: + decimals: 6 + address: '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48' + 42161: is_enabled: true - decimals: 18 - address: '0x2170Ed0880ac9A755fd29B2688956BD959F933F8' - - symbol: BNB - is_enabled: true - by_chain: - 1: - is_enabled: false - decimals: 18 - address: '0xB8c77482e45F1F44dE1745F52C74426C631bDD52' - 56: - is_enabled: true - decimals: 18 - address: '0xEEeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' + decimals: 6 + address: '0xaf88d065e77c8cC2239327C5EDb3A432268e5831' buckets: - from_chain_id: 1 - to_chain_id: 56 - from_token: ETH - to_token: BNB - is_smart_contract_deposit_supported: true - token_amount_from_usd: 0.1 + to_chain_id: 42161 + from_token: USDC + to_token: USDC + is_smart_contract_deposit_supported: false + token_amount_from_usd: 1 + token_amount_to_usd: 10 + - from_chain_id: 1 + to_chain_id: 42161 + from_token: USDC + to_token: USDC + is_smart_contract_deposit_supported: false + token_amount_from_usd: 10 token_amount_to_usd: 100 + - from_chain_id: 1 + to_chain_id: 42161 + from_token: USDC + to_token: USDC + is_smart_contract_deposit_supported: false + token_amount_from_usd: 100 + token_amount_to_usd: 1000 + - from_chain_id: 1 + to_chain_id: 42161 + from_token: USDC + to_token: USDC + is_smart_contract_deposit_supported: false + token_amount_from_usd: 1000 + token_amount_to_usd: 10000 + - from_chain_id: 1 + to_chain_id: 42161 + from_token: USDC + to_token: USDC + is_smart_contract_deposit_supported: false + token_amount_from_usd: 10000 + token_amount_to_usd: 100000 bungee: - base_url: 'https://api.bungee.exchange' - api_key: 'my-api' + base_url: https://api.socket.tech/v2 + api_key: covalent: - base_url: 'https://api.bungee.exchange' - api_key: 'my-api' + base_url: '' + api_key: 'my-api' coingecko: - base_url: 'https://api.coingecko.com' - api_key: 'my-api' + base_url: https://api.coingecko.com/api/v3 + api_key: + expiry_sec: 300 infra: - redis_url: 'redis://localhost:6379' - rabbitmq_url: 'amqp://localhost:5672' - mongo_url: 'mongodb://localhost:27017' + redis_url: redis://localhost:6379 + mongo_url: mongodb://127.0.0.1:27017 server: - port: 8080 - host: 'localhost' + port: 8080 + host: localhost indexer_config: - is_indexer: true - indexer_update_topic: indexer_update - indexer_update_message: message + indexer_update_topic: indexer_update + indexer_update_message: message + points_per_bucket: 3 + diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index 6794e74..5c094ca 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -4,8 +4,8 @@ use std::ops::Deref; use derive_more::{Display, From, Into}; use serde::Deserialize; -use serde_valid::yaml::FromYamlStr; use serde_valid::{UniqueItemsError, Validate, ValidateUniqueItems}; +use serde_valid::yaml::FromYamlStr; // Config Type #[derive(Debug)] @@ -254,6 +254,14 @@ pub struct BucketConfig { pub token_amount_to_usd: f64, } +impl BucketConfig { + pub fn get_hash(&self) -> u64 { + let mut s = DefaultHasher::new(); + self.hash(&mut s); + s.finish() + } +} + // Implementation for treating a BucketConfig as a key in a k-v pair impl Hash for BucketConfig { fn hash(&self, state: &mut H) { @@ -273,13 +281,7 @@ impl Hash for BucketConfig { impl PartialEq for BucketConfig { fn eq(&self, other: &Self) -> bool { - let mut s1 = DefaultHasher::new(); - let mut s2 = DefaultHasher::new(); - - self.hash(&mut s1); - other.hash(&mut s2); - - s1.finish() == s2.finish() + self.get_hash() == other.get_hash() } } @@ -302,6 +304,9 @@ pub struct TokenConfig { // The token symbol #[validate(min_length = 1)] pub symbol: String, + // The symbol of the token in coingecko API + #[validate(min_length = 1)] + pub coingecko_symbol: String, // Whether the token across chains is supported pub is_enabled: bool, // Chain Specific Configuration @@ -362,6 +367,10 @@ pub struct CoinGeckoConfig { // API key to access the CoinGecko API #[validate(min_length = 1)] pub api_key: String, + + // The expiry time of the CoinGecko API key + #[validate(minimum = 1)] + pub expiry_sec: u64, } #[derive(Debug, Deserialize, Validate)] @@ -382,9 +391,6 @@ pub struct InfraConfig { // The URL of the Redis #[validate(pattern = r"redis://[-a-zA-Z0-9@:%._\+~#=]{1,256}")] pub redis_url: String, - // The URL of the RabbitMQ - #[validate(pattern = r"amqp://[-a-zA-Z0-9@:%._\+~#=]{1,256}")] - pub rabbitmq_url: String, // The URL of the MongoDB #[validate(pattern = r"mongodb://[-a-zA-Z0-9@:%._\+~#=]{1,256}")] pub mongo_url: String, @@ -403,144 +409,28 @@ pub struct ServerConfig { #[derive(Debug, Deserialize, Validate)] pub struct IndexerConfig { - pub is_indexer: bool, - #[validate(min_length = 1)] pub indexer_update_topic: String, #[validate(min_length = 1)] pub indexer_update_message: String, + + #[validate(minimum = 2)] + pub points_per_bucket: u64, +} + +pub fn get_sample_config() -> Config { + Config::from_file("../../config.yaml.example").unwrap() } #[cfg(test)] mod tests { use crate::config::{Config, ConfigError}; + use crate::get_sample_config; #[test] fn test_config_parsing() { - let config = r#" -chains: - - id: 1 - name: Ethereum - is_enabled: true - - id: 56 - name: Binance Smart Chain - is_enabled: true -tokens: - - symbol: ETH - is_enabled: true - by_chain: - 1: - is_enabled: true - decimals: 18 - address: '0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE' - 56: - is_enabled: true - decimals: 18 - address: '0x2170Ed0880ac9A755fd29B2688956BD959F933F8' - - symbol: BNB - is_enabled: true - by_chain: - 1: - is_enabled: false - decimals: 18 - address: '0xB8c77482e45F1F44dE1745F52C74426C631bDD52' - 56: - is_enabled: true - decimals: 18 - address: '0xEEeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' -buckets: - - from_chain_id: 1 - to_chain_id: 56 - from_token: ETH - to_token: BNB - is_smart_contract_deposit_supported: true - token_amount_from_usd: 0.1 - token_amount_to_usd: 100 -bungee: - base_url: 'https://api.bungee.exchange' - api_key: 'my-api' -covalent: - base_url: 'https://api.bungee.exchange' - api_key: 'my-api' -coingecko: - base_url: 'https://api.coingecko.com' - api_key: 'my-api' -infra: - redis_url: 'redis://localhost:6379' - rabbitmq_url: 'amqp://localhost:5672' - mongo_url: 'mongodb://localhost:27017' -server: - port: 8080 - host: 'localhost' -indexer_config: - is_indexer: true - indexer_update_topic: indexer_update - indexer_update_message: message - "#; - let config = Config::from_yaml_str(&config).unwrap(); - - assert_eq!(config.chains.len(), 2); - assert_eq!(config.chains[&1].id, 1); - assert_eq!(config.chains[&1].name, "Ethereum"); - assert_eq!(config.chains[&1].is_enabled, true); - assert_eq!(config.chains[&56].id, 56); - assert_eq!(config.chains[&56].name, "Binance Smart Chain"); - assert_eq!(config.chains[&56].is_enabled, true); - - assert_eq!(config.tokens.len(), 2); - assert_eq!(config.tokens["ETH"].symbol, "ETH"); - assert_eq!(config.tokens["ETH"].is_enabled, true); - assert_eq!(config.tokens["ETH"].by_chain.len(), 2); - assert_eq!(config.tokens["ETH"].by_chain[&1].decimals, 18); - assert_eq!( - config.tokens["ETH"].by_chain[&1].address, - "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE" - ); - assert_eq!(config.tokens["ETH"].by_chain[&1].is_enabled, true); - assert_eq!(config.tokens["ETH"].by_chain[&56].decimals, 18); - assert_eq!( - config.tokens["ETH"].by_chain[&56].address, - "0x2170Ed0880ac9A755fd29B2688956BD959F933F8" - ); - assert_eq!(config.tokens["ETH"].by_chain[&56].is_enabled, true); - assert_eq!(config.tokens["BNB"].symbol, "BNB"); - assert_eq!(config.tokens["BNB"].is_enabled, true); - assert_eq!(config.tokens["BNB"].by_chain.len(), 2); - assert_eq!(config.tokens["BNB"].by_chain[&1].decimals, 18); - assert_eq!( - config.tokens["BNB"].by_chain[&1].address, - "0xB8c77482e45F1F44dE1745F52C74426C631bDD52" - ); - assert_eq!(config.tokens["BNB"].by_chain[&1].is_enabled, false); - assert_eq!(config.tokens["BNB"].by_chain[&56].decimals, 18); - assert_eq!( - config.tokens["BNB"].by_chain[&56].address, - "0xEEeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee" - ); - assert_eq!(config.tokens["BNB"].by_chain[&56].is_enabled, true); - - assert_eq!(config.buckets.len(), 1); - assert_eq!(config.buckets[0].from_chain_id, 1); - assert_eq!(config.buckets[0].to_chain_id, 56); - assert_eq!(config.buckets[0].from_token, "ETH"); - assert_eq!(config.buckets[0].to_token, "BNB"); - assert_eq!(config.buckets[0].is_smart_contract_deposit_supported, true); - assert_eq!(config.buckets[0].token_amount_from_usd, 0.1); - assert_eq!(config.buckets[0].token_amount_to_usd, 100.0); - - assert_eq!(config.covalent.base_url, "https://api.bungee.exchange"); - assert_eq!(config.coingecko.base_url, "https://api.coingecko.com"); - assert_eq!(config.bungee.api_key, "my-api"); - assert_eq!(config.covalent.api_key, "my-api"); - assert_eq!(config.coingecko.api_key, "my-api"); - assert_eq!(config.infra.redis_url, "redis://localhost:6379"); - assert_eq!(config.infra.rabbitmq_url, "amqp://localhost:5672"); - assert_eq!(config.infra.mongo_url, "mongodb://localhost:27017"); - - assert_eq!(config.indexer_config.is_indexer, true); - assert_eq!(config.indexer_config.indexer_update_topic, "indexer_update"); - assert_eq!(config.indexer_config.indexer_update_message, "message"); + get_sample_config(); } #[test] @@ -564,9 +454,9 @@ covalent: coingecko: base_url: 'https://api.coingecko.com' api_key: 'my-api' + expiry_sec: 5 infra: redis_url: 'redis://localhost:6379' - rabbitmq_url: 'amqp://localhost:5672' mongo_url: 'mongodb://localhost:27017' server: port: 8080 @@ -575,8 +465,8 @@ indexer_config: is_indexer: true indexer_update_topic: indexer_update indexer_update_message: message - "#; - + points_per_bucket: 10 +"#; assert_eq!( if let ConfigError::SerdeError(err) = Config::from_yaml_str(&config).unwrap_err() { let err = err.as_validation_errors().unwrap().to_string(); @@ -597,6 +487,7 @@ chains: tokens: - symbol: ETH is_enabled: true + coingecko_symbol: ethereum by_chain: 1: is_enabled: true @@ -608,6 +499,7 @@ tokens: address: '0x2170Ed0880ac9A755fd29B2688956BD959F933F8' - symbol: ETH is_enabled: true + coingecko_symbol: ethereum by_chain: 1: is_enabled: true @@ -627,9 +519,9 @@ covalent: coingecko: base_url: 'https://api.coingecko.com' api_key: 'my-api' + expiry_sec: 5 infra: redis_url: 'redis://localhost:6379' - rabbitmq_url: 'amqp://localhost:5672' mongo_url: 'mongodb://localhost:27017' server: port: 8080 @@ -638,7 +530,8 @@ indexer_config: is_indexer: true indexer_update_topic: indexer_update indexer_update_message: message - "#; + points_per_bucket: 10 +"#; assert_eq!( if let ConfigError::SerdeError(err) = Config::from_yaml_str(&config).unwrap_err() { @@ -652,9 +545,4 @@ indexer_config: true ); } - - #[test] - fn test_sample_config_should_be_valid() { - assert_eq!(Config::from_file("../../config.yaml.example").is_ok(), true); - } } diff --git a/crates/routing-engine/Cargo.toml b/crates/routing-engine/Cargo.toml index e1ea863..0e7b475 100644 --- a/crates/routing-engine/Cargo.toml +++ b/crates/routing-engine/Cargo.toml @@ -14,6 +14,7 @@ reqwest = "0.12.4" ruint = "1.12.3" linreg = "0.2.0" thiserror = "1.0.61" +log = "0.4.21" # workspace dependencies account-aggregation = { workspace = true } diff --git a/crates/routing-engine/src/engine.rs b/crates/routing-engine/src/engine.rs index aa58b5d..04f08dd 100644 --- a/crates/routing-engine/src/engine.rs +++ b/crates/routing-engine/src/engine.rs @@ -1,11 +1,14 @@ -use crate::route_fee_bucket::RouteFeeBucket; -use account_aggregation::service::AccountAggregationService; -use account_aggregation::types::Balance; -use derive_more::Display; -use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; +use derive_more::Display; +use serde::{Deserialize, Serialize}; + +use account_aggregation::service::AccountAggregationService; +use account_aggregation::types::Balance; + +use crate::route_fee_bucket::RouteFeeBucket; + #[derive(Serialize, Deserialize, Debug, Display, PartialEq, Clone)] #[display( "Route: from_chain: {}, to_chain: {}, token: {}, amount: {}, path: {:?}", diff --git a/crates/routing-engine/src/estimator/linear_regression_estimator.rs b/crates/routing-engine/src/estimator/linear_regression_estimator.rs index d5b70f1..7d0a230 100644 --- a/crates/routing-engine/src/estimator/linear_regression_estimator.rs +++ b/crates/routing-engine/src/estimator/linear_regression_estimator.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use thiserror::Error; use crate::estimator::{DataPoint, Estimator}; @@ -9,12 +10,21 @@ pub struct LinearRegressionEstimator { } impl<'de> Estimator<'de, f64, f64> for LinearRegressionEstimator { - type Error = linreg::Error; + type Error = LinearRegressionEstimationError; + + fn build(data: Vec>) -> Result { + if data.len() == 0 { + return Err(LinearRegressionEstimationError::EmptyDataError); + } + + if data.len() == 1 { + return Err(LinearRegressionEstimationError::NotEnoughDataError); + } - fn build(data: Vec>) -> Result { let (x, y): (Vec, Vec) = data.into_iter().map(|DataPoint { x, y }| (x, y)).unzip(); - let (slope, intercept) = linreg::linear_regression(&x, &y)?; + let (slope, intercept) = linreg::linear_regression(&x, &y) + .map_err(|err| LinearRegressionEstimationError::LinregError(err))?; Ok(Self { slope, intercept }) } @@ -23,6 +33,18 @@ impl<'de> Estimator<'de, f64, f64> for LinearRegressionEstimator { } } +#[derive(Debug, Error)] +pub enum LinearRegressionEstimationError { + #[error("Linear regression error: {0}")] + LinregError(linreg::Error), + + #[error("Data provided is empty")] + EmptyDataError, + + #[error("Data provided is not enough")] + NotEnoughDataError, +} + #[cfg(test)] mod tests { use crate::estimator::{DataPoint, Estimator, LinearRegressionEstimator}; diff --git a/crates/routing-engine/src/estimator/mod.rs b/crates/routing-engine/src/estimator/mod.rs index 2aa2b75..f7a1eaa 100644 --- a/crates/routing-engine/src/estimator/mod.rs +++ b/crates/routing-engine/src/estimator/mod.rs @@ -1,3 +1,4 @@ +use std::error::Error; use std::fmt::Debug; use serde::{Deserialize, Serialize}; @@ -12,8 +13,8 @@ pub struct DataPoint { pub(crate) y: Output, } -pub trait Estimator<'de, Input, Output>: Serialize + Deserialize<'de> { - type Error: Debug; +pub trait Estimator<'de, Input, Output>: Serialize + Deserialize<'de> + Debug { + type Error: Error + Debug; fn build(data: Vec>) -> Result; diff --git a/crates/routing-engine/src/indexer.rs b/crates/routing-engine/src/indexer.rs index 40f6c0c..b30fe23 100644 --- a/crates/routing-engine/src/indexer.rs +++ b/crates/routing-engine/src/indexer.rs @@ -1,8 +1,9 @@ use std::collections::HashMap; use std::hash::{DefaultHasher, Hash, Hasher}; -use derive_more::Display; use futures::stream::StreamExt; +use log::{error, info}; +use thiserror::Error; use config::config::BucketConfig; @@ -14,44 +15,43 @@ const BUCKET_PROCESSING_RATE_LIMIT: usize = 5; pub struct Indexer< 'config, Source: source::RouteSource, - ModelStore: storage::RoutingModelStore, + ModelStore: storage::KeyValueStore, Producer: storage::MessageQueue, TokenPriceProvider: token_price::TokenPriceProvider, > { config: &'config config::Config, source: &'config Source, - model_store: &'config mut ModelStore, - message_producer: &'config mut Producer, + model_store: &'config ModelStore, + message_producer: &'config Producer, token_price_provider: &'config TokenPriceProvider, } -const POINTS_COUNT_PER_BUCKET: u8 = 10; - impl< 'config, RouteSource: source::RouteSource, - ModelStore: storage::RoutingModelStore, + ModelStore: storage::KeyValueStore, Producer: storage::MessageQueue, TokenPriceProvider: token_price::TokenPriceProvider, > Indexer<'config, RouteSource, ModelStore, Producer, TokenPriceProvider> { - fn new( + pub fn new( config: &'config config::Config, source: &'config RouteSource, - model_store: &'config mut ModelStore, - message_producer: &'config mut Producer, + model_store: &'config ModelStore, + message_producer: &'config Producer, token_price_provider: &'config TokenPriceProvider, ) -> Self { Indexer { config, source, model_store, message_producer, token_price_provider } } - fn generate_bucket_observation_points(bucket: &BucketConfig) -> Vec { - (0..POINTS_COUNT_PER_BUCKET) + fn generate_bucket_observation_points(&self, bucket: &BucketConfig) -> Vec { + let points_per_bucket = self.config.indexer_config.points_per_bucket; + (0..points_per_bucket) .into_iter() .map(|i| { bucket.token_amount_from_usd + (i as f64) * (bucket.token_amount_to_usd - bucket.token_amount_from_usd) - / (POINTS_COUNT_PER_BUCKET as f64) + / (points_per_bucket as f64) }) .collect() } @@ -60,13 +60,27 @@ impl< &self, bucket: &'config BucketConfig, cost_type: &CostType, - ) -> Result { + ) -> Result> { + let bucket_id = bucket.get_hash(); + info!("Building estimator for bucket: {:?} with ID: {}", bucket, bucket_id); + // Generate Data to "Train" Estimator - let observation_points = Indexer::::generate_bucket_observation_points(bucket); + let observation_points = self.generate_bucket_observation_points(bucket); + let observation_points_len = observation_points.len(); + + info!("BucketID-{}: {} Observation points generated", bucket_id, observation_points.len()); - let data_points = futures::stream::iter(observation_points) - .map(|input_value_in_usd: f64| { + let data_points = futures::stream::iter(observation_points.into_iter().enumerate()) + .map(|(idx, input_value_in_usd)| { async move { + info!( + "BucketID-{}: Building Point {} {}/{}", + bucket_id, + input_value_in_usd, + idx + 1, + observation_points_len + ); + // Convert input_value_in_usd to token_amount_in_wei let from_token_amount_in_wei = token_price::utils::get_token_amount_from_value_in_usd( @@ -74,7 +88,7 @@ impl< self.token_price_provider, &bucket.from_token, bucket.from_chain_id, - input_value_in_usd, + &input_value_in_usd, ) .await .map_err(|err| IndexerErrors::TokenPriceProviderError(err))?; @@ -88,9 +102,23 @@ impl< .await .map_err(|err| IndexerErrors::RouteSourceError(err))?; + info!( + "BucketID-{}: Point {} {}/{} Built", + bucket_id, + input_value_in_usd, + idx + 1, + observation_points_len + ); + Ok::< estimator::DataPoint, - IndexerErrors, + IndexerErrors< + TokenPriceProvider, + RouteSource, + ModelStore, + Producer, + Estimator, + >, >(estimator::DataPoint { x: input_value_in_usd, y: fee_in_usd, @@ -101,20 +129,41 @@ impl< .collect::, - IndexerErrors, + IndexerErrors, >, >>() - .await - .into_iter() - .filter(|r| r.is_ok()) - .map(|r| match r { - Result::Ok(data_point) => data_point, - _ => unreachable!(), - }) - .collect(); + .await; + + info!( + "BucketID-{}: Points successfully built: {}/{}", + bucket_id, + data_points.len(), + observation_points_len + ); + + let (data_points, failed): (Vec>, Vec>) = + data_points.into_iter().partition(|r| r.is_ok()); + + let data_points: Vec> = + data_points.into_iter().map(|r| r.unwrap()).collect(); + let failed: Vec< + IndexerErrors, + > = failed.into_iter().map(|r| r.unwrap_err()).collect(); + + if failed.len() > 0 { + error!("BucketID-{}: Failed to fetch some data points: {:?}", bucket_id, failed); + } + + if data_points.is_empty() { + error!("BucketID-{}: No data points were built", bucket_id); + return Err(BuildEstimatorError::NoDataPoints(bucket)); + } // Build the Estimator - Estimator::build(data_points) + info!("BucketID-{}:All data points fetched, building estimator...", bucket_id); + let estimator = Estimator::build(data_points) + .map_err(|e| BuildEstimatorError::EstimatorBuildError(bucket, e))?; + Ok(estimator) } async fn publish_estimators< @@ -122,9 +171,14 @@ impl< 'est_de, Estimator: estimator::Estimator<'est_de, f64, f64>, >( - &mut self, + &self, values: Vec<(&&BucketConfig, &Estimator)>, - ) -> Result<(), IndexerErrors> { + ) -> Result< + (), + IndexerErrors<'est_de, TokenPriceProvider, RouteSource, ModelStore, Producer, Estimator>, + > { + info!("Publishing {} estimators", values.len()); + let values_transformed = values .iter() .map(|(k, v)| { @@ -146,31 +200,47 @@ impl< } pub async fn run<'est_de, Estimator: estimator::Estimator<'est_de, f64, f64>>( - &mut self, + &self, ) -> Result< HashMap<&'config BucketConfig, Estimator>, - IndexerErrors, + IndexerErrors<'est_de, TokenPriceProvider, RouteSource, ModelStore, Producer, Estimator>, > { + info!("Running Indexer"); + // Build Estimators + let (estimators, failed_estimators): (Vec<_>, Vec<_>) = futures::stream::iter( + self.config.buckets.iter(), + ) + .map(|bucket: &_| async { + // Build the Estimator + let estimator = self.build_estimator(bucket, &CostType::Fee).await?; + + Ok::<(&BucketConfig, Estimator), BuildEstimatorError<'config, 'est_de, Estimator>>(( + bucket, estimator, + )) + }) + .buffer_unordered(BUCKET_PROCESSING_RATE_LIMIT) + .collect::>() + .await + .into_iter() + .partition(|r| r.is_ok()); + let estimator_map: HashMap<&BucketConfig, Estimator> = - futures::stream::iter(self.config.buckets.iter()) - .map(|bucket| async { - // Build the Estimator - let estimator: Estimator = self.build_estimator(bucket, &CostType::Fee).await?; - - Ok::<(&BucketConfig, Estimator), Estimator::Error>((bucket, estimator)) - }) - .buffer_unordered(BUCKET_PROCESSING_RATE_LIMIT) - .collect::>() - .await - .into_iter() - .filter(|r| r.is_ok()) - .map(|r| r.unwrap()) - .collect(); + estimators.into_iter().map(|r| r.unwrap()).collect(); + + if !failed_estimators.is_empty() { + error!("Failed to build some estimators: {:?}", failed_estimators); + } + + if estimator_map.is_empty() { + error!("No estimators built"); + return Err(IndexerErrors::NoEstimatorsBuilt); + } self.publish_estimators(estimator_map.iter().collect()).await?; // Broadcast a Message to other nodes to update their cache + info!("Broadcasting Indexer Update Message"); self.message_producer .publish( &self.config.indexer_config.indexer_update_topic, @@ -183,80 +253,104 @@ impl< } } -#[derive(Debug, Display)] -enum IndexerErrors< +#[derive(Debug, Error)] +pub enum IndexerErrors< + 'a, T: token_price::TokenPriceProvider, S: source::RouteSource, - R: storage::RoutingModelStore, + R: storage::KeyValueStore, U: storage::MessageQueue, + V: estimator::Estimator<'a, f64, f64>, > { - #[display("Route build error: {}", _0)] + #[error("Route build error: {}", _0)] RouteBuildError(RouteError), - #[display("Token price provider error: {}", _0)] + #[error("Token price provider error: {}", _0)] TokenPriceProviderError(token_price::utils::Errors), - #[display("Route source error: {}", _0)] + #[error("Route source error: {}", _0)] RouteSourceError(S::FetchRouteCostError), - #[display("Publish estimator error: {}", _0)] + #[error("Publish estimator error: {}", _0)] PublishEstimatorError(R::Error), - #[display("Publish estimator errors: {:?}", _0)] + #[error("Publish estimator errors: {:?}", _0)] PublishEstimatorErrors(Vec), - #[display("Indexer update message error: {}", _0)] + #[error("Indexer update message error: {}", _0)] PublishIndexerUpdateMessageError(U::Error), + + #[error("Estimator build error: {}", _0)] + EstimatorBuildError(V::Error), + + #[error("No estimators built")] + NoEstimatorsBuilt, +} + +#[derive(Debug, Error)] +pub enum BuildEstimatorError<'config, 'est_de, Estimator: estimator::Estimator<'est_de, f64, f64>> { + #[error("No data points found while building estimator for {:?}", _0)] + NoDataPoints(&'config BucketConfig), + + #[error("Estimator build error: {} for bucket {:?}", _1, _0)] + EstimatorBuildError(&'config BucketConfig, Estimator::Error), } #[cfg(test)] mod tests { use std::env; use std::fmt::Error; + use std::time::Duration; - use config::Config; - use storage::{ControlFlow, MessageQueue, Msg, RoutingModelStore}; + use derive_more::Display; + use thiserror::Error; - use crate::CostType; + use config::{Config, get_sample_config}; + use storage::{ControlFlow, KeyValueStore, MessageQueue, Msg}; + + use crate::{BungeeClient, CostType}; use crate::estimator::{Estimator, LinearRegressionEstimator}; use crate::indexer::Indexer; - use crate::source::BungeeClient; use crate::token_price::TokenPriceProvider; + #[derive(Error, Display, Debug)] + struct Err; + #[derive(Debug)] struct ModelStoreStub; - impl RoutingModelStore for ModelStoreStub { - type Error = (); + impl KeyValueStore for ModelStoreStub { + type Error = Err; - async fn get(&mut self, k: &String) -> Result { + async fn get(&self, _: &String) -> Result { Ok("Get".to_string()) } - async fn get_multiple(&mut self, k: &Vec) -> Result, Self::Error> { + async fn get_multiple(&self, k: &Vec) -> Result, Self::Error> { Ok(vec!["Get".to_string(); k.len()]) } - async fn set(&mut self, k: &String, v: &String) -> Result<(), Self::Error> { + async fn set(&self, _: &String, _: &String, _: Duration) -> Result<(), Self::Error> { Ok(()) } - async fn set_multiple(&mut self, kv: &Vec<(String, String)>) -> Result<(), Self::Error> { + async fn set_multiple(&self, _: &Vec<(String, String)>) -> Result<(), Self::Error> { Ok(()) } } + #[derive(Debug)] struct ProducerStub; impl MessageQueue for ProducerStub { - type Error = (); + type Error = Err; - async fn publish(&mut self, topic: &str, message: &str) -> Result<(), ()> { + async fn publish(&self, _: &str, _: &str) -> Result<(), Self::Error> { Ok(()) } fn subscribe( - &mut self, - topic: &str, - callback: impl FnMut(Msg) -> ControlFlow, + &self, + _: &str, + _: impl FnMut(Msg) -> ControlFlow, ) -> Result<(), Self::Error> { Ok(()) } @@ -267,83 +361,38 @@ mod tests { impl TokenPriceProvider for TokenPriceProviderStub { type Error = Error; - async fn get_token_price(&self, token_symbol: &String) -> Result { + async fn get_token_price(&self, _token_symbol: &String) -> Result { Ok(1.0) // USDC } } fn setup<'a>() -> (Config, BungeeClient, ModelStoreStub, ProducerStub, TokenPriceProviderStub) { - // let config = config::Config::from_file("../../config.yaml").unwrap(); - let mut config = config::Config::from_yaml_str( - r#" -chains: - - id: 1 - name: Ethereum - is_enabled: true - - id: 42161 - name: Arbitrum - is_enabled: true -tokens: - - symbol: USDC - is_enabled: true - by_chain: - 1: - is_enabled: true - decimals: 6 - address: '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48' - 42161: - is_enabled: true - decimals: 6 - address: '0xaf88d065e77c8cC2239327C5EDb3A432268e5831' -buckets: - - from_chain_id: 1 - to_chain_id: 42161 - from_token: USDC - to_token: USDC - is_smart_contract_deposit_supported: false - token_amount_from_usd: 1 - token_amount_to_usd: 10 - - from_chain_id: 1 - to_chain_id: 42161 - from_token: USDC - to_token: USDC - is_smart_contract_deposit_supported: false - token_amount_from_usd: 10 - token_amount_to_usd: 100 - - from_chain_id: 1 - to_chain_id: 42161 - from_token: USDC - to_token: USDC - is_smart_contract_deposit_supported: false - token_amount_from_usd: 100 - token_amount_to_usd: 1000 -bungee: - base_url: https://api.socket.tech/v2 - api_key: -covalent: - base_url: 'https://api.bungee.exchange' - api_key: 'my-api' -coingecko: - base_url: 'https://api.coingecko.com' - api_key: 'my-api' -infra: - redis_url: 'redis://localhost:6379' - rabbitmq_url: 'amqp://localhost:5672' - mongo_url: 'mongodb://localhost:27017' -server: - port: 8080 - host: 'localhost' -indexer_config: - is_indexer: true - indexer_update_topic: indexer_update - indexer_update_message: message - "#, - ) - .unwrap(); + let mut config = get_sample_config(); + config.buckets = vec![ + config::BucketConfig { + from_chain_id: 1, + to_chain_id: 42161, + from_token: "USDC".to_string(), + to_token: "USDC".to_string(), + is_smart_contract_deposit_supported: false, + token_amount_from_usd: 10.0, + token_amount_to_usd: 100.0, + }, + config::BucketConfig { + from_chain_id: 1, + to_chain_id: 42161, + from_token: "USDC".to_string(), + to_token: "USDC".to_string(), + is_smart_contract_deposit_supported: false, + token_amount_from_usd: 100.0, + token_amount_to_usd: 1000.0, + }, + ]; config.bungee.api_key = env::var("BUNGEE_API_KEY").unwrap(); - let bungee_client = BungeeClient::new(&config.bungee).unwrap(); + let bungee_client = + BungeeClient::new(&config.bungee.base_url, &config.bungee.api_key).unwrap(); let model_store = ModelStoreStub; let message_producer = ProducerStub; let token_price_provider = TokenPriceProviderStub; @@ -353,14 +402,19 @@ indexer_config: #[tokio::test] async fn test_build_estimator() { - let (config, bungee_client, mut model_store, mut message_producer, token_price_provider) = - setup(); + let ( + config, + bungee_client, + mut model_store, + mut message_producer, + mut token_price_provider, + ) = setup(); let indexer = Indexer::new( &config, &bungee_client, &mut model_store, &mut message_producer, - &token_price_provider, + &mut token_price_provider, ); let estimator = indexer.build_estimator(&config.buckets[0], &CostType::Fee).await; diff --git a/crates/routing-engine/src/lib.rs b/crates/routing-engine/src/lib.rs index e6c7094..91fe8a2 100644 --- a/crates/routing-engine/src/lib.rs +++ b/crates/routing-engine/src/lib.rs @@ -1,28 +1,29 @@ -use derive_more::{Display, From}; +use derive_more::Display; use thiserror::Error; use config::config::{BucketConfig, ChainConfig, Config, TokenConfig}; pub use indexer::Indexer; +pub use source::bungee::BungeeClient; +pub use token_price::CoingeckoClient; -// use route_fee_bucket::RouteFeeBucket; pub mod engine; pub mod route_fee_bucket; -mod traits; - #[cfg(test)] mod tests; pub mod token_price; +mod traits; pub mod estimator; pub mod indexer; mod source; #[derive(Debug, Error, Display)] -enum CostType { +pub enum CostType { Fee, - BridgingTime, + // BridgingTime, } +#[derive(Debug)] pub struct Route<'a> { from_chain: &'a ChainConfig, to_chain: &'a ChainConfig, @@ -63,11 +64,11 @@ impl<'a> Route<'a> { } } -#[derive(Debug, Display, From)] -enum RouteError { - #[display("Chain not found while building route: {}", _0)] +#[derive(Debug, Error)] +pub enum RouteError { + #[error("Chain not found while building route: {}", _0)] ChainNotFoundError(u32), - #[display("Token not found while building route: {}", _0)] + #[error("Token not found while building route: {}", _0)] TokenNotFoundError(String), } diff --git a/crates/routing-engine/src/source/bungee/mod.rs b/crates/routing-engine/src/source/bungee/mod.rs index 6690a6b..339fced 100644 --- a/crates/routing-engine/src/source/bungee/mod.rs +++ b/crates/routing-engine/src/source/bungee/mod.rs @@ -1,9 +1,10 @@ +use derive_more::Display; +use log::{error, info}; use reqwest; use reqwest::header; use ruint::aliases::U256; use thiserror::Error; -use config::config::BungeeConfig; use types::*; use crate::{CostType, Route}; @@ -18,8 +19,9 @@ pub struct BungeeClient { } impl BungeeClient { - pub(crate) fn new( - BungeeConfig { base_url, api_key }: &BungeeConfig, + pub fn new<'config>( + base_url: &'config String, + api_key: &'config String, ) -> Result { let mut headers = header::HeaderMap::new(); headers.insert("API-KEY", header::HeaderValue::from_str(api_key)?); @@ -34,6 +36,8 @@ impl BungeeClient { &self, params: GetQuoteRequest, ) -> Result, BungeeClientError> { + info!("Fetching quote from bungee for {:?}", params); + let response = self.client.get(self.base_url.to_owned() + "/quote").query(¶ms).send().await?; let raw_text = response.text().await?; @@ -63,9 +67,14 @@ pub enum BungeeFetchRouteCostError { EstimationTypeNotImplementedError(#[from] CostType), } +#[derive(Error, Debug, Display)] +pub struct GenerateRouteCalldataError; + impl RouteSource for BungeeClient { type FetchRouteCostError = BungeeFetchRouteCostError; - type GenerateRouteCalldataError = (); + + // todo + type GenerateRouteCalldataError = GenerateRouteCalldataError; async fn fetch_least_route_cost_in_usd( &self, @@ -73,9 +82,12 @@ impl RouteSource for BungeeClient { from_token_amount: U256, estimation_type: &CostType, ) -> Result { + info!("Fetching least route cost in USD for route {:?} with token amount {} and estimation type {}", route, from_token_amount, estimation_type); + // Build GetQuoteRequest let from_token = route.from_token.by_chain.get(&route.from_chain.id); if from_token.is_none() { + error!("Missing chain for token {} in config", route.from_token.symbol); return Err(BungeeFetchRouteCostError::MissingChainForTokenInConfigError( route.from_chain.id, route.from_token.symbol.clone(), @@ -86,6 +98,7 @@ impl RouteSource for BungeeClient { let to_token = route.to_token.by_chain.get(&route.to_chain.id); if let None = to_token { + error!("Missing chain for token {} in config", route.to_token.symbol); return Err(BungeeFetchRouteCostError::MissingChainForTokenInConfigError( route.to_chain.id, route.to_token.symbol.clone(), @@ -121,22 +134,24 @@ impl RouteSource for BungeeClient { route.total_gas_fees_in_usd + route.input_value_in_usd? - route.output_value_in_usd?, ), - _ => None, }) .filter(|cost| cost.is_some()) .map(|cost| cost.unwrap()) .collect(); if route_costs_in_usd.len() == 0 { + error!("No valid routes returned by Bungee API for route {:?}", route); return Err(BungeeFetchRouteCostError::NoValidRouteError()); } + info!("Route costs in USD: {:?} for route {:?}", route_costs_in_usd, route); + Ok(route_costs_in_usd.into_iter().min_by(|a, b| a.total_cmp(b)).unwrap()) } async fn generate_route_calldata( &self, - route: &Route<'_>, + _: &Route<'_>, ) -> Result { todo!() } @@ -149,69 +164,21 @@ mod tests { use ruint::Uint; use config::Config; + use config::get_sample_config; - use crate::{CostType, Route}; - use crate::source::{BungeeClient, RouteSource}; + use crate::{BungeeClient, CostType, Route}; use crate::source::bungee::types::GetQuoteRequest; + use crate::source::RouteSource; fn setup() -> (Config, BungeeClient) { - // let config = config::Config::from_file("../../config.yaml").unwrap(); - let mut config = config::Config::from_yaml_str( - r#" -chains: - - id: 1 - name: Ethereum - is_enabled: true - - id: 42161 - name: Arbitrum - is_enabled: true -tokens: - - symbol: USDC - is_enabled: true - by_chain: - 1: - is_enabled: true - decimals: 6 - address: '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48' - 42161: - is_enabled: true - decimals: 6 - address: '0xaf88d065e77c8cC2239327C5EDb3A432268e5831' -buckets: - - from_chain_id: 1 - to_chain_id: 42161 - from_token: USDC - to_token: USDC - is_smart_contract_deposit_supported: false - token_amount_from_usd: 1 - token_amount_to_usd: 10 -bungee: - base_url: https://api.socket.tech/v2 - api_key: -covalent: - base_url: 'https://api.bungee.exchange' - api_key: 'my-api' -coingecko: - base_url: 'https://api.coingecko.com' - api_key: 'my-api' -infra: - redis_url: 'redis://localhost:6379' - rabbitmq_url: 'amqp://localhost:5672' - mongo_url: 'mongodb://localhost:27017' -server: - port: 8080 - host: 'localhost' -indexer_config: - is_indexer: true - indexer_update_topic: indexer_update - indexer_update_message: message - "#, + let config = get_sample_config(); + + let bungee_client = BungeeClient::new( + &"https://api.socket.tech/v2".to_string(), + &env::var("BUNGEE_API_KEY").unwrap().to_string(), ) .unwrap(); - config.bungee.api_key = env::var("BUNGEE_API_KEY").unwrap(); - - let bungee_client = BungeeClient::new(&config.bungee).unwrap(); return (config, bungee_client); } diff --git a/crates/routing-engine/src/source/mod.rs b/crates/routing-engine/src/source/mod.rs index eaf990b..0df6090 100644 --- a/crates/routing-engine/src/source/mod.rs +++ b/crates/routing-engine/src/source/mod.rs @@ -1,28 +1,27 @@ +use std::error::Error; use std::fmt::Debug; use ruint::aliases::U256; -pub use bungee::BungeeClient; - use crate::{CostType, Route}; -mod bungee; +pub mod bungee; type Calldata = String; -pub(crate) trait RouteSource: Debug { - type FetchRouteCostError: Debug; - type GenerateRouteCalldataError: Debug; +pub trait RouteSource: Debug { + type FetchRouteCostError: Debug + Error; + type GenerateRouteCalldataError: Debug + Error; - async fn fetch_least_route_cost_in_usd( + fn fetch_least_route_cost_in_usd( &self, route: &Route, from_token_amount: U256, estimation_type: &CostType, - ) -> Result; + ) -> impl futures::Future>; - async fn generate_route_calldata( + fn generate_route_calldata( &self, route: &Route, - ) -> Result; + ) -> impl futures::Future>; } diff --git a/crates/routing-engine/src/tests.rs b/crates/routing-engine/src/tests.rs index 0fe2c3d..710cd28 100644 --- a/crates/routing-engine/src/tests.rs +++ b/crates/routing-engine/src/tests.rs @@ -1,16 +1,15 @@ use async_trait::async_trait; -use mockall::predicate::*; #[cfg(test)] mod tests { - use super::*; - use crate::{ - engine::{Route, RoutingEngine}, - traits::{AccountAggregation, RouteFee}, - }; - use account_aggregation::types::Balance; use mockall::mock; + use account_aggregation::types::Balance; + + use crate::traits::{AccountAggregation, RouteFee}; + + use super::*; + mock! { pub AccountAggregationService {} diff --git a/crates/routing-engine/src/token_price/coingecko.rs b/crates/routing-engine/src/token_price/coingecko.rs new file mode 100644 index 0000000..23f4779 --- /dev/null +++ b/crates/routing-engine/src/token_price/coingecko.rs @@ -0,0 +1,237 @@ +use std::fmt::Debug; +use std::num::ParseFloatError; +use std::time::Duration; + +use derive_more::Display; +use log::{error, info}; +use reqwest::{header, StatusCode}; +use serde::Deserialize; +use thiserror::Error; + +use storage::KeyValueStore; + +use crate::token_price::coingecko::CoingeckoClientError::RequestFailed; +use crate::token_price::TokenPriceProvider; + +#[derive(Debug)] +pub struct CoingeckoClient<'config, KVStore: KeyValueStore> { + base_url: &'config String, + client: reqwest::Client, + cache: &'config KVStore, + key_expiry: Duration, +} + +impl<'config, KVStore: KeyValueStore> CoingeckoClient<'config, KVStore> { + pub fn new( + base_url: &'config String, + api_key: &'config String, + cache: &'config KVStore, + key_expiry: Duration, + ) -> CoingeckoClient<'config, KVStore> { + let mut headers = header::HeaderMap::new(); + headers.insert( + "x-cg-pro-api-key", + header::HeaderValue::from_str(api_key) + .expect("Error while building header value Invalid CoinGecko API Key"), + ); + + let client = reqwest::Client::builder() + .default_headers(headers) + .build() + .expect("Failed to build reqwest client for Coingecko Client"); + + CoingeckoClient { base_url, client, cache, key_expiry } + } + + async fn get_fresh_token_price( + &self, + token_symbol: &String, + ) -> Result> { + info!("Fetching fresh token price for {}", token_symbol); + + let response = + self.client.get(format!("{}/coins/{}", self.base_url, token_symbol)).send().await?; + + if response.status() != StatusCode::OK { + error!("CoinGecko /coins/ Request failed with status: {}", response.status()); + return Err(RequestFailed(response.status())); + } + + let raw_text = response.text().await?; + + let response: CoinsIdResponse = serde_json::from_str(&raw_text) + .map_err(|err| CoingeckoClientError::DeserialisationError(raw_text, err))?; + + let result = response.market_data.current_price.usd; + + info!("Token price fetched from API for token {}: {}", token_symbol, result); + + Ok(result) + } +} + +impl<'config, KVStore: KeyValueStore> TokenPriceProvider for CoingeckoClient<'config, KVStore> { + type Error = CoingeckoClientError; + + async fn get_token_price(&self, token_symbol: &String) -> Result { + info!("Fetching token price for {}", token_symbol); + + let key = format!("{}_price", token_symbol); + match self.cache.get(&key).await { + Ok(result) => { + info!("Token price fetched from cache"); + + let price: f64 = result.parse()?; + if price.is_nan() { + Err(Self::Error::InvalidPriceReturnedFromCacheResult(result.clone()))?; + } + Ok(price) + } + Err(_) => { + info!("Token price not found in cache"); + + let price = self.get_fresh_token_price(token_symbol).await?; + self.cache + .set(&key, &price.to_string(), self.key_expiry) + .await + .map_err(CoingeckoClientError::UpdateTokenCacheError)?; + Ok(price) + } + } + } +} + +#[derive(Debug, Error, Display)] +pub enum CoingeckoClientError { + UpdateTokenCacheError(KVStore::Error), + + InvalidPriceReturnedFromCacheResult(String), + + InvalidPriceReturnedFromCache(#[from] ParseFloatError), + + #[display("Deserialization Error - Original String {}, Error {}", _0, _1)] + DeserialisationError(String, serde_json::Error), + + RequestFailed(StatusCode), + + ApiCallError(#[from] reqwest::Error), +} + +#[derive(Debug, Deserialize)] +struct CoinsIdResponse { + market_data: CoinsIdResponseMarketData, +} + +#[derive(Debug, Deserialize)] +struct CoinsIdResponseMarketData { + current_price: CoinsIdResponseMarketDataCurrentPrice, +} + +#[derive(Debug, Deserialize)] +struct CoinsIdResponseMarketDataCurrentPrice { + usd: f64, +} + +#[cfg(test)] +mod tests { + use std::cell::RefCell; + use std::collections::HashMap; + use std::env; + use std::fmt::Debug; + use std::time::Duration; + + use derive_more::Display; + use thiserror::Error; + + use config::{Config, get_sample_config}; + use storage::KeyValueStore; + + use crate::CoingeckoClient; + use crate::token_price::TokenPriceProvider; + + #[derive(Error, Debug, Display)] + struct Err; + + #[derive(Default, Debug)] + struct KVStore { + map: RefCell>, + } + + impl KeyValueStore for KVStore { + type Error = Err; + + async fn get(&self, k: &String) -> Result { + match self.map.borrow().get(k) { + Some(v) => Ok(v.clone()), + None => Result::Err(Err), + } + } + + async fn get_multiple(&self, _: &Vec) -> Result, Self::Error> { + todo!() + } + + async fn set(&self, k: &String, v: &String, _: Duration) -> Result<(), Self::Error> { + self.map + .borrow_mut() + .insert((*k.clone()).parse().unwrap(), (*v.clone()).parse().unwrap()); + Ok(()) + } + + async fn set_multiple(&self, _: &Vec<(String, String)>) -> Result<(), Self::Error> { + todo!() + } + } + + fn setup_config<'a>() -> Config { + get_sample_config() + } + + #[tokio::test] + async fn test_should_fetch_fresh_api_price() { + let config = setup_config(); + + let api_key = env::var("COINGECKO_API_KEY").unwrap(); + + let store = KVStore::default(); + + let client = CoingeckoClient::new( + &config.coingecko.base_url, + &api_key, + &store, + Duration::from_secs(config.coingecko.expiry_sec), + ); + let price = client.get_fresh_token_price(&"usd-coin".to_string()).await.unwrap(); + + assert!(price > 0.0); + } + + #[tokio::test] + async fn test_should_cache_api_prices() { + let config = setup_config(); + + let api_key = env::var("COINGECKO_API_KEY").unwrap(); + + let store = KVStore::default(); + + let client = CoingeckoClient::new( + &config.coingecko.base_url, + &api_key, + &store, + Duration::from_secs(config.coingecko.expiry_sec), + ); + let price = client.get_token_price(&"usd-coin".to_string()).await.unwrap(); + + assert!(price > 0.0); + let key = "usd-coin_price".to_string(); + assert_eq!(store.get(&key).await.unwrap().parse::().unwrap(), price); + + let price2 = client.get_token_price(&"usd-coin".to_string()).await.unwrap(); + assert_eq!(price, price2); + + store.set(&key, &"1.1".to_string(), Duration::from_secs(10)).await.unwrap(); + + let price = client.get_token_price(&"usd-coin".to_string()).await.unwrap(); + assert_eq!(price, 1.1); + } +} diff --git a/crates/routing-engine/src/token_price/mod.rs b/crates/routing-engine/src/token_price/mod.rs index 12e32a3..cf83abe 100644 --- a/crates/routing-engine/src/token_price/mod.rs +++ b/crates/routing-engine/src/token_price/mod.rs @@ -1,10 +1,16 @@ use std::error::Error; -use std::fmt::{Debug, Display}; +use std::fmt::Debug; +pub use coingecko::CoingeckoClient; + +mod coingecko; pub mod utils; pub trait TokenPriceProvider: Debug { - type Error: Error + Display + Debug; + type Error: Error + Debug; - async fn get_token_price(&self, token_symbol: &String) -> Result; + fn get_token_price( + &self, + token_symbol: &String, + ) -> impl futures::Future>; } diff --git a/crates/routing-engine/src/token_price/utils.rs b/crates/routing-engine/src/token_price/utils.rs index f6f7bc6..8deb9db 100644 --- a/crates/routing-engine/src/token_price/utils.rs +++ b/crates/routing-engine/src/token_price/utils.rs @@ -1,4 +1,4 @@ -use derive_more::{Display, From}; +use derive_more::Display; use ruint; use ruint::aliases::U256; use ruint::Uint; @@ -11,19 +11,19 @@ pub async fn get_token_amount_from_value_in_usd<'config, T: TokenPriceProvider>( token_price_provider: &'config T, token_symbol: &'config String, chain_id: u32, - value_in_usd: f64, + value_in_usd: &f64, ) -> Result> { - let token_price = token_price_provider - .get_token_price(token_symbol) - .await - .map_err(Errors::::TokenPriceProviderError)?; - let token_config = config.tokens.get(token_symbol); if token_config.is_none() { return Err(Errors::TokenConfigurationNotFound(token_symbol.clone())); } let token_config = token_config.unwrap(); + let token_price = token_price_provider + .get_token_price(&token_config.coingecko_symbol) + .await + .map_err(Errors::::TokenPriceProviderError)?; + let token_config_by_chain = token_config.by_chain.get(&chain_id); if token_config_by_chain.is_none() { return Err(Errors::TokenConfigurationNotFoundForChain(token_symbol.clone(), chain_id)); @@ -39,7 +39,7 @@ pub async fn get_token_amount_from_value_in_usd<'config, T: TokenPriceProvider>( } #[derive(Debug, Error)] -pub(crate) enum Errors { +pub enum Errors { #[error("Token price provider error: {}", _0)] TokenPriceProviderError(#[from] T), @@ -56,49 +56,12 @@ mod tests { use ruint::Uint; - use config::Config; + use config::{Config, get_sample_config}; use crate::token_price::TokenPriceProvider; fn setup() -> Config { - config::Config::from_yaml_str( - r#" -chains: - - id: 1 - name: Ethereum - is_enabled: true -tokens: - - symbol: USDC - is_enabled: true - by_chain: - 1: - is_enabled: true - decimals: 6 - address: '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48' -buckets: -bungee: - base_url: https://api.socket.tech/v2 - api_key: 72a5b4b0-e727-48be-8aa1-5da9d62fe635 -covalent: - base_url: 'https://api.bungee.exchange' - api_key: 'my-api' -coingecko: - base_url: 'https://api.coingecko.com' - api_key: 'my-api' -infra: - redis_url: 'redis://localhost:6379' - rabbitmq_url: 'amqp://localhost:5672' - mongo_url: 'mongodb://localhost:27017' -server: - port: 8080 - host: 'localhost' -indexer_config: - is_indexer: true - indexer_update_topic: indexer_update - indexer_update_message: message - "#, - ) - .unwrap() + get_sample_config() } #[derive(Debug)] @@ -115,7 +78,7 @@ indexer_config: #[tokio::test] async fn test_get_token_amount_from_value_in_usd() { let config = setup(); - let token_price_provider = TokenPriceProviderStub; + let mut token_price_provider = TokenPriceProviderStub; let token_symbol = String::from("USDC"); let chain_id = 1; @@ -123,10 +86,10 @@ indexer_config: let result = super::get_token_amount_from_value_in_usd( &config, - &token_price_provider, + &mut token_price_provider, &token_symbol, chain_id, - value_in_usd, + &value_in_usd, ) .await; diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 5c6e120..ee2fa9b 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -16,6 +16,7 @@ redis = { version = "0.25.4", features = ["aio", "tokio-comp"] } # workspace dependencies config = { workspace = true } +log = "0.4.21" [dev-dependencies] serial_test = "3.1.1" diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 7e208ab..5c0e3e2 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -1,32 +1,52 @@ +use std::error::Error; use std::fmt::Debug; +use std::future; +use std::time::Duration; pub use ::redis::{ControlFlow, Msg}; +pub use redis::RedisClient; + pub mod db_provider; pub mod errors; pub mod mongodb_provider; mod redis; -pub trait RoutingModelStore { - type Error: Debug; +pub trait KeyValueStore: Debug { + type Error: Error + Debug; - async fn get(&mut self, k: &String) -> Result; + fn get(&self, k: &String) -> impl future::Future>; - async fn get_multiple(&mut self, k: &Vec) -> Result, Self::Error>; + fn get_multiple( + &self, + k: &Vec, + ) -> impl future::Future, Self::Error>>; - async fn set(&mut self, k: &String, v: &String) -> Result<(), Self::Error>; + fn set( + &self, + k: &String, + v: &String, + expiry: Duration, + ) -> impl future::Future>; - async fn set_multiple(&mut self, kv: &Vec<(String, String)>) -> Result<(), Self::Error>; + fn set_multiple( + &self, + kv: &Vec<(String, String)>, + ) -> impl future::Future>; } -pub trait MessageQueue { - type Error: Debug; +pub trait MessageQueue: Debug { + type Error: Error + Debug; - async fn publish(&mut self, topic: &str, message: &str) -> Result<(), Self::Error>; + fn publish( + &self, + topic: &str, + message: &str, + ) -> impl future::Future>; fn subscribe( - &mut self, + &self, topic: &str, callback: impl FnMut(Msg) -> ControlFlow, ) -> Result<(), Self::Error>; diff --git a/crates/storage/src/redis.rs b/crates/storage/src/redis.rs index 37f6bb6..8532712 100644 --- a/crates/storage/src/redis.rs +++ b/crates/storage/src/redis.rs @@ -1,13 +1,15 @@ +use std::time::Duration; + +use log::info; use redis; +use redis::{aio, AsyncCommands, ControlFlow, Msg, PubSubCommands}; use redis::RedisError; -use redis::{aio, AsyncCommands, Commands, ControlFlow, Msg, PubSubCommands}; use thiserror::Error; -use config; - -use crate::{MessageQueue, RoutingModelStore}; +use crate::{KeyValueStore, MessageQueue}; -struct RedisClient { +#[derive(Debug)] +pub struct RedisClient { client: redis::Client, connection: aio::MultiplexedConnection, } @@ -20,38 +22,53 @@ impl RedisClient { } } -impl RoutingModelStore for RedisClient { +impl KeyValueStore for RedisClient { type Error = RedisClientError; - async fn get(&mut self, k: &String) -> Result { - self.connection.get(k).await.map_err(RedisClientError::RedisLibraryError) + // Todo: This should return an option + async fn get(&self, k: &String) -> Result { + info!("Getting key: {}", k); + self.connection.clone().get(k).await.map_err(RedisClientError::RedisLibraryError) } - async fn get_multiple(&mut self, k: &Vec) -> Result, Self::Error> { - self.connection.mget(k).await.map_err(RedisClientError::RedisLibraryError) + async fn get_multiple(&self, k: &Vec) -> Result, Self::Error> { + info!("Getting keys: {:?}", k); + self.connection.clone().mget(k).await.map_err(RedisClientError::RedisLibraryError) } - async fn set(&mut self, k: &String, v: &String) -> Result<(), Self::Error> { - self.connection.set(k, v).await.map_err(RedisClientError::RedisLibraryError) + async fn set(&self, k: &String, v: &String, duration: Duration) -> Result<(), Self::Error> { + info!("Setting key: {} with value: {} and expiry: {}s", k, v, duration.as_secs()); + self.connection + .clone() + .set_ex(k, v, duration.as_secs()) + .await + .map_err(RedisClientError::RedisLibraryError) } - async fn set_multiple(&mut self, kv: &Vec<(String, String)>) -> Result<(), Self::Error> { - self.connection.mset(kv).await.map_err(RedisClientError::RedisLibraryError) + async fn set_multiple(&self, kv: &Vec<(String, String)>) -> Result<(), Self::Error> { + info!("Setting keys: {:?}", kv); + self.connection.clone().mset(kv).await.map_err(RedisClientError::RedisLibraryError) } } impl MessageQueue for RedisClient { type Error = RedisClientError; - async fn publish(&mut self, topic: &str, message: &str) -> Result<(), Self::Error> { - self.connection.publish(topic, message).await.map_err(RedisClientError::RedisLibraryError) + async fn publish(&self, topic: &str, message: &str) -> Result<(), Self::Error> { + info!("Publishing to topic: {} with message: {}", topic, message); + self.connection + .clone() + .publish(topic, message) + .await + .map_err(RedisClientError::RedisLibraryError) } fn subscribe( - &mut self, + &self, topic: &str, callback: impl FnMut(Msg) -> ControlFlow, ) -> Result<(), Self::Error> { + info!("Subscribing to topic: {}", topic); let mut connection = self.client.get_connection()?; connection.subscribe(topic, callback)?; Ok(()) @@ -62,6 +79,9 @@ impl MessageQueue for RedisClient { pub enum RedisClientError { #[error("Error thrown from Redis Library: {0}")] RedisLibraryError(#[from] RedisError), + + #[error("Redis Mutex poisoned")] + MutexPoisonedError, } #[cfg(test)] @@ -79,24 +99,24 @@ mod tests { #[tokio::test] async fn test_key_store() { - let mut client = setup().await; + let client = setup().await; let keys = vec!["test_key1".to_string(), "test_key2".to_string()]; let values = vec!["test_value1".to_string(), "test_value2".to_string()]; // Clear - client.set(&keys[0], &String::from("")).await.unwrap(); + client.set(&keys[0], &String::from(""), Duration::from_secs(10)).await.unwrap(); // Test set - client.set(&keys[0], &values[0]).await.unwrap(); + client.set(&keys[0], &values[0], Duration::from_secs(10)).await.unwrap(); // Test get let value = client.get(&keys[0]).await.unwrap(); assert_eq!(value, values[0]); // Clear - client.set(&keys[0], &String::from("")).await.unwrap(); - client.set(&keys[1], &String::from("")).await.unwrap(); + client.set(&keys[0], &String::from(""), Duration::from_secs(10)).await.unwrap(); + client.set(&keys[1], &String::from(""), Duration::from_secs(10)).await.unwrap(); // Multi Set client @@ -113,8 +133,8 @@ mod tests { #[tokio::test] async fn test_pub_sub() { - let (tx, mut rx) = channel::(); - let mut client = setup().await; + let (tx, rx) = channel::(); + let client = setup().await; tokio::task::spawn_blocking(move || { client @@ -127,7 +147,9 @@ mod tests { .unwrap(); }); - let mut client = setup().await; + tokio::time::sleep(Duration::from_secs(5)).await; + + let client = setup().await; client.publish("TOPIC", "HELLO").await.unwrap(); loop { diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..ca61c49 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,47 @@ +services: + redis: + image: redis:latest + container_name: redis + ports: + - "6379:6379" + volumes: + - redis_data:/data + + mongodb: + image: mongo:latest + container_name: mongodb + ports: + - "27017:27017" + volumes: + - mongo_data:/data/db + + reflux-solver: + build: + context: . + dockerfile: Dockerfile + container_name: reflux-solver + entrypoint: [ "/app/reflux", "--solver", "--config", "/app/ctx/config.docker.yaml" ] + ports: + - "8000:8080" + depends_on: + - redis + - mongodb + volumes: + - .:/app/ctx + + reflux-indexer: + build: + context: . + dockerfile: Dockerfile + container_name: reflux-indexer + entrypoint: [ "/app/reflux", "--indexer", "--config", "/app/ctx/config.docker.yaml" ] + depends_on: + - redis + - mongodb + volumes: + - .:/app/ctx + +volumes: + redis_data: + mongo_data: +