Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev' into feat/cache-decode
Browse files Browse the repository at this point in the history
  • Loading branch information
AmanRaj1608 committed Jun 25, 2024
2 parents c064ee0 + d6c93a6 commit d29a71d
Show file tree
Hide file tree
Showing 19 changed files with 240 additions and 155 deletions.
3 changes: 3 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@

# include example files
!/examples

*.yaml
*.env
18 changes: 10 additions & 8 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
# CI workflow: runs the full cargo test suite against live MongoDB and
# Redis service containers on every pull request.
name: Test Suite
on: [ pull_request ]

jobs:
  test:
    name: cargo test
    runs-on: ubuntu-latest
    # NOTE(review): assumes a GitHub environment named "Testing" exists that
    # holds the BUNGEE/COINGECKO secrets — confirm in repo settings.
    environment: Testing
    services:
      mongodb:
        image: mongo:latest
        ports:
          - 27017:27017
        # Gate the job on mongod actually answering pings, not just the
        # container starting.
        options: >-
          --health-cmd="mongosh --eval 'db.adminCommand({ ping: 1 })'"
          --health-interval=30s
          --health-timeout=10s
          --health-retries=10
      redis:
        image: redis:latest
        ports:
          - 6379:6379
    steps:
      - uses: actions/checkout@v4
      - uses: dtolnay/rust-toolchain@stable
      # Cache cargo registry/target to speed up repeat runs.
      - uses: Swatinem/rust-cache@v2
      - run: cargo test --all-features
        env:
          BUNGEE_API_KEY: ${{ secrets.BUNGEE_API_KEY }}
          COINGECKO_API_KEY: ${{ secrets.COINGECKO_API_KEY }}
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ Cargo.lock

*.env
*.swp
config.yaml
*.yaml
11 changes: 11 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Stage 1: build the reflux binary with the full Rust toolchain.
FROM rust:latest as builder
WORKDIR /reflux
COPY . .
RUN cargo install --path bin/reflux --profile release

# Stage 2: minimal runtime image — only the binary plus TLS dependencies.
FROM debian:latest
# Single layer: update, install runtime deps, and purge the apt cache so the
# package lists don't bloat the final image.
RUN apt-get update && \
    apt-get upgrade -y && \
    apt-get install -y --no-install-recommends libssl-dev ca-certificates && \
    rm -rf /var/lib/apt/lists/*
COPY --from=builder /usr/local/cargo/bin/reflux /app/reflux
# The original image copied the binary but never ran it; containers started
# from it would exit immediately. Run reflux by default.
ENTRYPOINT ["/app/reflux"]
2 changes: 1 addition & 1 deletion bin/reflux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ tokio = { version = "1.38.0", features = ["full"] }
tower-http = { version = "0.5.2", features = ["cors"] }
axum = "0.7.5"
log = "0.4.21"
tokio-cron-scheduler = "0.10.2"
simple_logger = "5.0.0"
clap = { version = "4.5.7", features = ["derive"] }

# workspace dependencies
account-aggregation = { workspace = true }
Expand Down
133 changes: 64 additions & 69 deletions bin/reflux/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,63 @@ use std::sync::Arc;
use std::time::Duration;

use axum::http::Method;
use log::info;
use clap::Parser;
use log::{debug, error, info};
use tokio;
use tokio::signal;
use tokio_cron_scheduler::{Job, JobScheduler};
use tower_http::cors::{Any, CorsLayer};

use account_aggregation::service::AccountAggregationService;
use api::service_controller::ServiceController;
use config::Config;
use routing_engine::engine::RoutingEngine;
use routing_engine::estimator::LinearRegressionEstimator;
use routing_engine::{BungeeClient, CoingeckoClient, Indexer};
use storage::mongodb_provider::MongoDBProvider;
use storage::{ControlFlow, MessageQueue, RedisClient};

/// Command-line arguments selecting which Reflux component to run.
///
/// `--indexer` and `--solver` are mutually exclusive; `main` panics if both
/// are set and defaults to the solver when neither is given.
#[derive(Parser, Debug)]
struct Args {
    /// Run the Solver (default)
    #[arg(short, long)]
    solver: bool,

    /// Run the Indexer
    #[arg(short, long)]
    indexer: bool,

    /// Config file path
    #[arg(short, long, default_value = "config.yaml")]
    config: String,
}

#[tokio::main]
async fn main() {
simple_logger::SimpleLogger::new().env().init().unwrap();

let mut args = Args::parse();
debug!("Args: {:?}", args);

if args.indexer && args.solver {
panic!("Cannot run both indexer and solver at the same time");
}

if !args.indexer && !args.solver {
args.solver = true;
debug!("Running Solver by default");
}

// Load configuration from yaml
let config = Config::from_file("config.yaml").expect("Failed to load config file");
let config = Config::from_file(&args.config).expect("Failed to load config file");

if config.indexer_config.is_indexer {
if args.indexer {
run_indexer(config).await;
} else {
run_server(config).await;
} else if args.solver {
run_solver(config).await;
}

info!("Exiting Reflux");
}

async fn run_server(config: Config) {
async fn run_solver(config: Config) {
info!("Starting Reflux Server");

let (app_host, app_port) = (config.server.host.clone(), config.server.port.clone());
Expand Down Expand Up @@ -120,66 +146,35 @@ async fn run_server(config: Config) {
/// Runs one full indexing pass: wires up Redis, the Bungee API client, and
/// the Coingecko price provider (with Redis-backed caching), then executes
/// the indexer with a linear-regression cost estimator.
///
/// Errors from the indexer run are logged rather than propagated; the
/// function returns once the pass completes either way.
async fn run_indexer(config: Config) {
    info!("Configuring Indexer");

    let redis_provider = RedisClient::build(&config.infra.redis_url)
        .await
        .expect("Failed to instantiate redis client");

    let bungee_client = BungeeClient::new(&config.bungee.base_url, &config.bungee.api_key)
        .expect("Failed to Instantiate Bungee Client");

    // Price lookups are cached in Redis with a configurable TTL.
    let token_price_provider = CoingeckoClient::new(
        &config.coingecko.base_url,
        &config.coingecko.api_key,
        &redis_provider,
        Duration::from_secs(config.coingecko.expiry_sec),
    );

    // The same Redis client serves as both store and message producer,
    // hence it appears twice in the type parameters and arguments.
    let indexer: Indexer<BungeeClient, RedisClient, RedisClient, CoingeckoClient<RedisClient>> =
        Indexer::new(
            &config,
            &bungee_client,
            &redis_provider,
            &redis_provider,
            &token_price_provider,
        );

    match indexer.run::<LinearRegressionEstimator>().await {
        Ok(_) => info!("Indexer Job Completed"),
        Err(e) => error!("Indexer Job Failed: {}", e),
    };
}

async fn shutdown_signal() {
Expand Down
4 changes: 0 additions & 4 deletions config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,11 @@ coingecko:
expiry_sec: 300
infra:
redis_url: redis://localhost:6379
rabbitmq_url: amqp://localhost:5672
mongo_url: mongodb://127.0.0.1:27017
server:
port: 8080
host: localhost
indexer_config:
is_indexer: true
indexer_update_topic: indexer_update
indexer_update_message: message
schedule: "* */6 * * *"
points_per_bucket: 3

32 changes: 11 additions & 21 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,14 @@ impl Ord for BucketConfig {
}
}

impl BucketConfig {
    /// Computes a stable `u64` digest of this bucket via the standard
    /// `DefaultHasher`, delegating to the manual `Hash` implementation.
    pub fn get_hash(&self) -> u64 {
        let mut hasher = DefaultHasher::new();
        Hash::hash(self, &mut hasher);
        hasher.finish()
    }
}

// Implementation for treating a BucketConfig as a key in a k-v pair
impl Hash for BucketConfig {
fn hash<H: Hasher>(&self, state: &mut H) {
Expand All @@ -280,13 +288,7 @@ impl Hash for BucketConfig {

impl PartialEq<Self> for BucketConfig {
fn eq(&self, other: &Self) -> bool {
let mut s1 = DefaultHasher::new();
let mut s2 = DefaultHasher::new();

self.hash(&mut s1);
other.hash(&mut s2);

s1.finish() == s2.finish()
self.get_hash() == other.get_hash()
}
}

Expand Down Expand Up @@ -399,9 +401,6 @@ pub struct InfraConfig {
// The URL of the Redis
#[validate(pattern = r"redis://[-a-zA-Z0-9@:%._\+~#=]{1,256}")]
pub redis_url: String,
// The URL of the RabbitMQ
#[validate(pattern = r"amqp://[-a-zA-Z0-9@:%._\+~#=]{1,256}")]
pub rabbitmq_url: String,
// The URL of the MongoDB
#[validate(pattern = r"mongodb://[-a-zA-Z0-9@:%._\+~#=]{1,256}")]
pub mongo_url: String,
Expand All @@ -420,17 +419,12 @@ pub struct ServerConfig {

#[derive(Debug, Deserialize, Validate)]
pub struct IndexerConfig {
pub is_indexer: bool,

#[validate(min_length = 1)]
pub indexer_update_topic: String,

#[validate(min_length = 1)]
pub indexer_update_message: String,

#[validate(min_length = 1)]
pub schedule: String,

#[validate(minimum = 2)]
pub points_per_bucket: u64,
}
Expand All @@ -440,13 +434,13 @@ pub fn get_sample_config() -> Config {
}

#[cfg(test)]
pub mod tests {
mod tests {
use crate::config::{Config, ConfigError};
use crate::get_sample_config;

#[test]
fn test_config_parsing() {
let config = get_sample_config();
get_sample_config();
}

#[test]
Expand Down Expand Up @@ -475,7 +469,6 @@ coingecko:
expiry_sec: 5
infra:
redis_url: 'redis://localhost:6379'
rabbitmq_url: 'amqp://localhost:5672'
mongo_url: 'mongodb://localhost:27017'
server:
port: 8080
Expand All @@ -484,7 +477,6 @@ indexer_config:
is_indexer: true
indexer_update_topic: indexer_update
indexer_update_message: message
schedule: "*"
points_per_bucket: 10
"#;
assert_eq!(
Expand Down Expand Up @@ -543,7 +535,6 @@ coingecko:
expiry_sec: 5
infra:
redis_url: 'redis://localhost:6379'
rabbitmq_url: 'amqp://localhost:5672'
mongo_url: 'mongodb://localhost:27017'
server:
port: 8080
Expand All @@ -552,7 +543,6 @@ indexer_config:
is_indexer: true
indexer_update_topic: indexer_update
indexer_update_message: message
schedule: "*"
points_per_bucket: 10
"#;

Expand Down
1 change: 1 addition & 0 deletions crates/routing-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use futures::stream::{self, StreamExt};
use log::{error, info};
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;

use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::Arc;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use derive_more::derive;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use thiserror::Error;

Expand Down
Loading

0 comments on commit d29a71d

Please sign in to comment.