Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: decode cached data for engine #6

Merged
merged 55 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
a3db8e4
feat: Implement a configuration parser
ankurdubey521 Jun 11, 2024
e91fc55
test: Add Tests for Uniqueness
ankurdubey521 Jun 11, 2024
daf5530
Create pull_request_template.md
ankurdubey521 Jun 11, 2024
373fd92
chore: Add Sample config.yaml
ankurdubey521 Jun 11, 2024
544f53c
Merge branch 'main' of github.com:bcnmy/reflux
ankurdubey521 Jun 12, 2024
49c1686
Merge branch 'dev' into feat/config
ankurdubey521 Jun 12, 2024
ef5d682
feat: init bin and api crate
AmanRaj1608 Jun 12, 2024
dd62c49
Merge remote-tracking branch 'origin/feat/config' into feat/bin-refactor
AmanRaj1608 Jun 12, 2024
2fad08e
Merge remote-tracking branch 'origin/feat/config' into feat/bin-refactor
AmanRaj1608 Jun 12, 2024
1c15b34
Merge remote-tracking branch 'origin/dev' into feat/bin-refactor
AmanRaj1608 Jun 12, 2024
c18c3e8
feat: LinearRegressionEstimator
ankurdubey521 Jun 12, 2024
e3617a5
feat: storage and api updates
AmanRaj1608 Jun 13, 2024
77df202
chore: Refactor mongodb and user apis
AmanRaj1608 Jun 13, 2024
68a9040
feat: RouteSource Implementation for BungeeClient
ankurdubey521 Jun 13, 2024
f23cf4c
test: BungeeClient
ankurdubey521 Jun 13, 2024
567b2c5
feat: Token Price Provider and Utils
ankurdubey521 Jun 13, 2024
7c2da06
fix: Replace BigInt by Ruint
ankurdubey521 Jun 13, 2024
9e848b3
test: Fix Bungee Tests
ankurdubey521 Jun 13, 2024
c760b25
feat: Build Estimators in Routes Indexer
ankurdubey521 Jun 13, 2024
6d59779
fix: Add BUNGEE_API_KEY in github action
ankurdubey521 Jun 13, 2024
580a7bd
feat: refactor engine
AmanRaj1608 Jun 13, 2024
82946d4
fix: add display in all structs
AmanRaj1608 Jun 13, 2024
368733d
feat: added chainning instead of options
AmanRaj1608 Jun 14, 2024
ee0db66
Update test.yml
AmanRaj1608 Jun 14, 2024
3b3fb3c
Update test.yml
AmanRaj1608 Jun 14, 2024
9e6aab3
Merge branch 'feat/bin-refactor' of https://github.com/bcnmy/reflux i…
AmanRaj1608 Jun 14, 2024
af311fa
feat: Estimator Flow
ankurdubey521 Jun 14, 2024
8e13c1f
feat: Redis Model Store
ankurdubey521 Jun 14, 2024
9810f32
chore: refactor of storage and AAS
AmanRaj1608 Jun 14, 2024
2a5599d
Merge remote-tracking branch 'origin/feat/routing-indexer' into feat/…
AmanRaj1608 Jun 14, 2024
bb3353a
fix: conflict resolve
AmanRaj1608 Jun 19, 2024
48742d6
temp: fix build
ankurdubey521 Jun 19, 2024
72e89d3
chore: remove code
AmanRaj1608 Jun 19, 2024
eaee4dc
Merge remote-tracking branch 'origin/feat/routing-indexer' into feat/…
AmanRaj1608 Jun 19, 2024
fd7ae8f
edit readme
AmanRaj1608 Jun 19, 2024
31e7cbd
feat: Redis Publish Subscribe
ankurdubey521 Jun 19, 2024
56b96c8
Merge remote-tracking branch 'origin/feat/merge-prs' into feat/routin…
ankurdubey521 Jun 19, 2024
d9c6d0d
fix: Migrate to thiserror
ankurdubey521 Jun 19, 2024
db8502a
feat: Token Price Provider Integration
ankurdubey521 Jun 20, 2024
c933324
Merge branch 'dev' into feat/routing-indexer
ankurdubey521 Jun 20, 2024
7a15a75
feat: decode cached data for engine
AmanRaj1608 Jun 20, 2024
5835416
add tests
AmanRaj1608 Jun 20, 2024
29ce8fd
add best path test
AmanRaj1608 Jun 20, 2024
83588b4
fix: Indexer
ankurdubey521 Jun 24, 2024
1e96d84
fix: Tests
ankurdubey521 Jun 24, 2024
9c32038
temp: broken indexer
ankurdubey521 Jun 24, 2024
f8d1f6f
Merge remote-tracking branch 'origin/feat/routing-indexer' into feat/…
AmanRaj1608 Jun 24, 2024
c064ee0
feat: pub-sub cache update logic (#7)
AmanRaj1608 Jun 25, 2024
d29a71d
Merge remote-tracking branch 'origin/dev' into feat/cache-decode
AmanRaj1608 Jun 25, 2024
29ee0c4
feat: Add COVALENT_API_KEY to test environment
AmanRaj1608 Jun 25, 2024
d25ed8a
fix comments
AmanRaj1608 Jun 25, 2024
d937d7c
refactor: storage (#8)
ankurdubey521 Jun 25, 2024
69da0df
feat: add error handling
AmanRaj1608 Jun 27, 2024
e769433
fix: take the solver x and y from config parameter
AmanRaj1608 Jun 27, 2024
2939d04
fix: tests
AmanRaj1608 Jun 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bin/reflux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ tokio = { version = "1.38.0", features = ["full"] }
tower-http = { version = "0.5.2", features = ["cors"] }
axum = "0.7.5"
log = "0.4.21"
tokio-cron-scheduler = "0.10.2"
simple_logger = "5.0.0"

# workspace dependencies
account-aggregation = { workspace = true }
Expand Down
123 changes: 109 additions & 14 deletions bin/reflux/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,77 @@
use std::sync::Arc;
use std::time::Duration;

use axum::http::Method;
use log::{error, info};
use tokio;
use tokio::signal;
use tokio_cron_scheduler::{Job, JobScheduler};
use tower_http::cors::{Any, CorsLayer};

use account_aggregation::service::AccountAggregationService;
use api::service_controller::ServiceController;
use axum::http::Method;
use config::Config;
use log::info;
use routing_engine::{BungeeClient, CoingeckoClient, Indexer};
use routing_engine::engine::RoutingEngine;
use routing_engine::estimator::LinearRegressionEstimator;
use storage::mongodb_provider::MongoDBProvider;
use tokio;
use tokio::signal;
use tower_http::cors::{Any, CorsLayer};
use storage::RedisClient;

#[tokio::main]
async fn main() {
simple_logger::SimpleLogger::new().env().init().unwrap();

// Load configuration from yaml
let config = Config::from_file("config.yaml").expect("Failed to load config file");
let mongodb_uri = config.infra.mongo_url;
let (app_host, app_port) = (config.server.host, config.server.port);

if config.indexer_config.is_indexer {
run_indexer(config).await;
} else {
run_server(config).await;
}
}

async fn run_server(config: Config) {
info!("Starting Reflux Server");

let (app_host, app_port) = (config.server.host.clone(), config.server.port.clone());

// Instance of MongoDBProvider for users and account mappings
let user_db_provider =
MongoDBProvider::new(&mongodb_uri, "reflux".to_string(), "users".to_string(), true)
.await
.expect("Failed to create MongoDB provider for users");
let user_db_provider = MongoDBProvider::new(
&config.infra.mongo_url,
"reflux".to_string(),
"users".to_string(),
true,
)
.await
.expect("Failed to create MongoDB provider for users");
let account_mapping_db_provider = MongoDBProvider::new(
&mongodb_uri,
&config.infra.mongo_url,
"reflux".to_string(),
"account_mappings".to_string(),
false,
)
.await
.expect("Failed to create MongoDB provider for account mappings");

let (covalent_base_url, covalent_api_key) = (config.covalent.base_url, config.covalent.api_key);
let (covalent_base_url, covalent_api_key) =
(config.covalent.base_url.clone(), config.covalent.api_key.clone());

let networks: Vec<String> =
config.chains.iter().map(|(_, chain)| chain.covalent_name.clone()).collect();

// Initialize account aggregation service for api
let account_service = AccountAggregationService::new(
user_db_provider.clone(),
account_mapping_db_provider.clone(),
networks,
covalent_base_url,
covalent_api_key,
);

// Initialize routing engine
let routing_engine = RoutingEngine::new(account_service.clone());
let buckets = config.buckets;
let routing_engine = RoutingEngine::new(account_service.clone(), buckets);

// API service controller
let service_controller = ServiceController::new(account_service, routing_engine);
Expand All @@ -65,6 +95,71 @@ async fn main() {
info!("Server stopped.");
}

async fn run_indexer(config: Config) {
info!("Configuring Indexer");

let schedule = config.indexer_config.schedule.clone();
let scheduler = JobScheduler::new().await.expect("Failed to create scheduler");

let config = Arc::new(config);

let job = Job::new_async(schedule.as_str(), move |uuid, mut l: JobScheduler| {
/*
TODO: I"m not sure why this works tbh
Observations:
1. If I don't use Arc, I get an error stating that the value is moved
2. If this closure (not the async block) is not move, I get an error stating that the value is moved
3. If I borrow config in this closure, I get a dangling reference error
A combination of using Arc and move in the closure seems to work, but I'm not sure why
*/

let config = Arc::clone(&config);

Box::pin(async move {
let redis_provider = RedisClient::build(&config.infra.redis_url)
.await
.expect("Failed to instantiate redis client");

let bungee_client = BungeeClient::new(&config.bungee.base_url, &config.bungee.api_key)
.expect("Failed to Instantiate Bungee Client");

let token_price_provider = CoingeckoClient::new(
&config.coingecko.base_url,
&config.coingecko.api_key,
&redis_provider,
Duration::from_secs(config.coingecko.expiry_sec),
);

let indexer: Indexer<
BungeeClient,
RedisClient,
RedisClient,
CoingeckoClient<RedisClient>,
> = Indexer::new(
&config,
&bungee_client,
&redis_provider,
&redis_provider,
&token_price_provider,
);

match indexer.run::<LinearRegressionEstimator>().await {
Ok(_) => info!("Indexer Job Completed"),
Err(e) => error!("Indexer Job Failed: {}", e),
};

let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => println!("Next time for the job is {:?}", ts),
_ => println!("Could not get next tick for the job"),
};
})
})
.expect("Failed to create job");

scheduler.add(job).await.expect("Failed to add job");
}

async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c().await.expect("Unable to handle ctrl+c");
Expand Down
100 changes: 62 additions & 38 deletions config.yaml.example
Original file line number Diff line number Diff line change
@@ -1,58 +1,82 @@
chains:
- id: 1
name: Ethereum
covalent_name: eth-mainnet
is_enabled: true
- id: 56
name: Binance Smart Chain
- id: 42161
name: Arbitrum
is_enabled: true
covalent_name: bsc-mainnet
tokens:
- symbol: ETH
- symbol: USDC
is_enabled: true
coingecko_symbol: usd-coin
by_chain:
1:
is_enabled: true
decimals: 18
address: '0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE'
56:
decimals: 6
address: '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48'
42161:
is_enabled: true
decimals: 18
address: '0x2170Ed0880ac9A755fd29B2688956BD959F933F8'
- symbol: BNB
is_enabled: true
by_chain:
1:
is_enabled: false
decimals: 18
address: '0xB8c77482e45F1F44dE1745F52C74426C631bDD52'
56:
is_enabled: true
decimals: 18
address: '0xEEeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'
decimals: 6
address: '0xaf88d065e77c8cC2239327C5EDb3A432268e5831'
buckets:
- from_chain_id: 1
to_chain_id: 56
from_token: ETH
to_token: BNB
is_smart_contract_deposit_supported: true
token_amount_from_usd: 0.1
to_chain_id: 42161
from_token: USDC
to_token: USDC
is_smart_contract_deposit_supported: false
token_amount_from_usd: 1
token_amount_to_usd: 10
- from_chain_id: 1
to_chain_id: 42161
from_token: USDC
to_token: USDC
is_smart_contract_deposit_supported: false
token_amount_from_usd: 10
token_amount_to_usd: 100
- from_chain_id: 1
to_chain_id: 42161
from_token: USDC
to_token: USDC
is_smart_contract_deposit_supported: false
token_amount_from_usd: 100
token_amount_to_usd: 1000
- from_chain_id: 1
to_chain_id: 42161
from_token: USDC
to_token: USDC
is_smart_contract_deposit_supported: false
token_amount_from_usd: 1000
token_amount_to_usd: 10000
- from_chain_id: 1
to_chain_id: 42161
from_token: USDC
to_token: USDC
is_smart_contract_deposit_supported: false
token_amount_from_usd: 10000
token_amount_to_usd: 100000
bungee:
base_url: 'https://api.bungee.exchange'
api_key: 'my-api'
base_url: https://api.socket.tech/v2
api_key:
covalent:
base_url: 'https://api.bungee.exchange'
api_key: 'my-api'
base_url: ''
api_key: 'my-api'
coingecko:
base_url: 'https://api.coingecko.com'
api_key: 'my-api'
base_url: https://api.coingecko.com/api/v3
api_key:
expiry_sec: 300
infra:
redis_url: 'redis://localhost:6379'
rabbitmq_url: 'amqp://localhost:5672'
mongo_url: 'mongodb://localhost:27017'
redis_url: redis://localhost:6379
rabbitmq_url: amqp://localhost:5672
mongo_url: mongodb://127.0.0.1:27017
server:
port: 8080
host: 'localhost'
port: 8080
host: localhost
indexer_config:
is_indexer: true
indexer_update_topic: indexer_update
indexer_update_message: message
is_indexer: true
indexer_update_topic: indexer_update
indexer_update_message: message
schedule: "* */6 * * *"
points_per_bucket: 3

1 change: 1 addition & 0 deletions crates/account-aggregation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition = "2021"

[dependencies]
mongodb = "2.8.2"
async-trait = "0.1.80"
reqwest = { version = "0.12.4", features = ["json"] }
tokio = "1.38.0"
serde = { version = "1.0.203", features = ["derive"] }
Expand Down
1 change: 1 addition & 0 deletions crates/account-aggregation/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod service;
pub mod types;
pub mod service_trait;
10 changes: 9 additions & 1 deletion crates/account-aggregation/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ use storage::db_provider::DBProvider;
use storage::mongodb_provider::MongoDBProvider;
use uuid::Uuid;

/// Account Aggregation Service
///
/// This service is responsible for managing user accounts and their balances
/// It interacts with the user and account mapping databases to store and retrieve user account information

#[derive(Clone, Display, Debug)]
#[display(
"AccountAggregationService {{ user_db_provider: {:?}, account_mapping_db_provider: {:?} }}",
Expand All @@ -23,13 +28,15 @@ pub struct AccountAggregationService {
covalent_base_url: String,
covalent_api_key: String,
client: ReqwestClient,
networks: Vec<String>,
}

impl AccountAggregationService {
/// Create a new AccountAggregationService
pub fn new(
user_db_provider: MongoDBProvider,
account_mapping_db_provider: MongoDBProvider,
networks: Vec<String>,
base_url: String,
api_key: String,
) -> Self {
Expand All @@ -41,6 +48,7 @@ impl AccountAggregationService {
covalent_base_url: base_url,
covalent_api_key: api_key,
client: reqwest_client,
networks,
}
}

Expand Down Expand Up @@ -182,7 +190,7 @@ impl AccountAggregationService {
}

let mut balances = Vec::new();
let networks = ["matic-mainnet", "base-mainnet", "arbitrum-mainnet", "optimism-mainnet"];
let networks = self.networks.clone();

// todo: parallelize this
for user in accounts.iter() {
Expand Down
22 changes: 22 additions & 0 deletions crates/account-aggregation/src/service_trait.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use async_trait::async_trait;
use storage::mongodb_provider::MongoDBProvider;
use crate::types::{Account, AddAccountPayload, Balance, RegisterAccountPayload};
use std::error::Error;

#[async_trait]
pub trait AccountAggregationServiceTrait {
fn new(
user_db_provider: MongoDBProvider,
account_mapping_db_provider: MongoDBProvider,
base_url: String,
api_key: String,
) -> Self;
async fn get_user_id(&self, account: &String) -> Option<String>;
fn get_user_accounts(&self, user_id: &String) -> Option<Vec<Account>>;
fn register_user_account(
&self,
account_payload: RegisterAccountPayload,
) -> Result<(), Box<dyn Error>>;
fn add_account(&self, account_payload: AddAccountPayload) -> Result<(), Box<dyn Error>>;
fn get_user_accounts_balance(&self, account: &String) -> Vec<Balance>;
}
Loading
Loading