Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Settlement Engine #9

Merged
merged 24 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0e078e0
feat: socket::generate_route_transactions
ankurdubey521 Jul 1, 2024
3f7d0a4
feat: update engine spec and api validation
AmanRaj1608 Jul 1, 2024
3e5f5e0
fix: merge the route struct
AmanRaj1608 Jul 1, 2024
2b13055
Merge pull request #10 from bcnmy/fix/engine-spec
ankurdubey521 Jul 2, 2024
71afe92
merge upstream changes
ankurdubey521 Jul 2, 2024
3ee042e
refactor: engine.rs
ankurdubey521 Jul 2, 2024
5202174
refactor: BridgeResult
ankurdubey521 Jul 2, 2024
806ba18
feat: add config api
AmanRaj1608 Jul 4, 2024
554c196
Merge branch 'feat/settlement-engine' of https://github.com/bcnmy/ref…
AmanRaj1608 Jul 4, 2024
be1a4eb
feat: api routes (#16)
AmanRaj1608 Jul 4, 2024
fe12858
feat: add user address to extract_balance_data function
AmanRaj1608 Jul 4, 2024
c39e042
Merge remote-tracking branch 'origin/feat/settlement-engine' into fix…
AmanRaj1608 Jul 4, 2024
96fee8d
Merge pull request #17 from bcnmy/fix/bridge-result-spec
ankurdubey521 Jul 8, 2024
7eda97b
feat: Settlement Engine
ankurdubey521 Jul 8, 2024
c157720
feat: Read Approval Data from Chain
ankurdubey521 Jul 10, 2024
cd3d25e
feat: RPC URL for chains in config
ankurdubey521 Jul 10, 2024
a189770
test: Approval Logic
ankurdubey521 Jul 10, 2024
21487af
feat: Settlement Engine
ankurdubey521 Jul 11, 2024
48a5e0a
fix: exclude sol! block from doctest
ankurdubey521 Jul 11, 2024
a9bd256
temp: Settlement Engine Service Controller Integration
ankurdubey521 Jul 11, 2024
673a3de
temp-fix: 'parameter may not live long enough'
ankurdubey521 Jul 11, 2024
cf0392c
temp-fix: 'implementation of fnOnce not general enough'
ankurdubey521 Jul 11, 2024
2b70edf
fix: Future is not send
ankurdubey521 Jul 11, 2024
8ebf689
fix: Disable doc tests
ankurdubey521 Jul 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[workspace]
resolver = "2"

members = [
"crates/config",
Expand Down
1 change: 0 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,3 @@ RUN apt-get update
RUN apt-get upgrade -y
RUN apt-get install -y libssl-dev ca-certificates
COPY --from=builder /usr/local/cargo/bin/reflux /app/reflux

5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## reflux

Backend of solver which helps in seamless cross-chain asset consolidation. It aggregates user balances, automates routing, and suggests optimal transactions.
Backend of solver which helps in seamless cross-chain asset consolidation. It aggregates user balances, automates
routing, and suggests optimal transactions.

#### Installation

Expand All @@ -18,4 +19,4 @@ Once build is copleted, just run the server and test with the endpoints

### Dependencies graph

![image](./graph.png)
![image](./assets/dependency-graph.png)
File renamed without changes
90 changes: 41 additions & 49 deletions bin/reflux/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use axum::http::Method;
use clap::Parser;
use log::{debug, error, info};
use tokio::signal;
use tokio::sync::broadcast;
use tower_http::cors::{Any, CorsLayer};

use account_aggregation::service::AccountAggregationService;
use api::service_controller::ServiceController;
use config::Config;
use routing_engine::engine::RoutingEngine;
use routing_engine::estimator::LinearRegressionEstimator;
use routing_engine::{BungeeClient, CoingeckoClient, Indexer};
use storage::mongodb_client::MongoDBClient;
use routing_engine::estimator::LinearRegressionEstimator;
use routing_engine::routing_engine::RoutingEngine;
use storage::{ControlFlow, MessageQueue, RedisClient};
use storage::mongodb_client::MongoDBClient;

#[derive(Parser, Debug)]
struct Args {
Expand Down Expand Up @@ -98,6 +97,8 @@ async fn run_solver(config: Config) {

// Initialize routing engine
let buckets = config.buckets.clone();
let chain_configs = config.chains.clone();
let token_configs = config.tokens.clone();
let redis_client = RedisClient::build(&config.infra.redis_url)
.await
.expect("Failed to instantiate redis client");
Expand All @@ -106,15 +107,15 @@ async fn run_solver(config: Config) {
buckets,
redis_client.clone(),
config.solver_config,
chain_configs,
token_configs,
));

// Subscribe to cache update messages
let cache_update_topic = config.indexer_config.indexer_update_topic.clone();
let routing_engine_clone = Arc::clone(&routing_engine);

let (shutdown_tx, mut shutdown_rx) = broadcast::channel(1);

let cache_update_handle = tokio::spawn(async move {
tokio::task::spawn_blocking(move || {
let redis_client = redis_client.clone();
if let Err(e) = redis_client.subscribe(&cache_update_topic, move |_msg| {
info!("Received cache update notification");
Expand All @@ -126,33 +127,46 @@ async fn run_solver(config: Config) {
}) {
error!("Failed to subscribe to cache update topic: {}", e);
}

// Listen for shutdown signal
let _ = shutdown_rx.recv().await;
});

let token_chain_map: HashMap<String, HashMap<u32, bool>> = config
.tokens
.iter()
.map(|(token, token_config)| {
let chain_supported = token_config
.by_chain
.iter()
.map(|(chain_id, chain_config)| (*chain_id, chain_config.is_enabled))
.collect();
(token.clone(), chain_supported)
})
.collect();

// API service controller
let service_controller = ServiceController::new(account_service, routing_engine);
let chain_supported: Vec<(u32, String)> =
config.chains.iter().map(|(id, chain)| (*id, chain.name.clone())).collect();
let token_supported: Vec<String> =
config.tokens.iter().map(|(_, token_config)| token_config.symbol.clone()).collect();
let service_controller = ServiceController::new(
account_service,
routing_engine,
token_chain_map,
chain_supported,
token_supported,
);

let cors = CorsLayer::new().allow_origin(Any).allow_methods([
Method::GET,
Method::POST,
Method::PATCH,
]);
let cors = CorsLayer::new()
.allow_origin(Any)
.allow_methods([Method::GET, Method::POST, Method::PATCH])
.allow_headers(Any);

let app = service_controller.router().layer(cors);

let listener = tokio::net::TcpListener::bind(format!("{}:{}", app_host, app_port))
.await
.expect("Failed to bind port");

axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(shutdown_signal(shutdown_tx.clone()))
.await
.unwrap();

let _ = shutdown_tx.send(());
let _ = cache_update_handle.abort();
axum::serve(listener, app.into_make_service()).await.unwrap();

info!("Server stopped.");
}
Expand All @@ -170,9 +184,9 @@ async fn run_indexer(config: Config) {
.expect("Failed to Instantiate Bungee Client");

let token_price_provider = CoingeckoClient::new(
&config.coingecko.base_url,
&config.coingecko.api_key,
&redis_provider,
config.coingecko.base_url.clone(),
config.coingecko.api_key.clone(),
redis_provider.clone(),
Duration::from_secs(config.coingecko.expiry_sec),
);

Expand All @@ -190,25 +204,3 @@ async fn run_indexer(config: Config) {
Err(e) => error!("Indexer Job Failed: {}", e),
};
}

async fn shutdown_signal(shutdown_tx: broadcast::Sender<()>) {
let ctrl_c = async {
signal::ctrl_c().await.expect("Unable to handle ctrl+c");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Failed to install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}

info!("signal received, starting graceful shutdown");
let _ = shutdown_tx.send(());
}
Loading