Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/feat/routing-indexer' into feat/…
Browse files Browse the repository at this point in the history
…merge-prs
  • Loading branch information
AmanRaj1608 committed Jun 19, 2024
2 parents 72e89d3 + 48742d6 commit eaee4dc
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 14 deletions.
17 changes: 10 additions & 7 deletions crates/routing-engine/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl<

Ok::<
estimator::DataPoint<f64, f64>,
IndexerErrors<TokenPriceProvider, RouteSource, ModelStore>,
IndexerErrors<TokenPriceProvider, RouteSource, ModelStore, Producer>,
>(estimator::DataPoint {
x: input_value_in_usd,
y: fee_in_usd,
Expand All @@ -101,7 +101,7 @@ impl<
.collect::<Vec<
Result<
estimator::DataPoint<f64, f64>,
IndexerErrors<TokenPriceProvider, RouteSource, ModelStore>,
IndexerErrors<TokenPriceProvider, RouteSource, ModelStore, Producer>,
>,
>>()
.await
Expand All @@ -124,7 +124,7 @@ impl<
>(
&mut self,
values: Vec<(&&BucketConfig, &Estimator)>,
) -> Result<(), IndexerErrors<TokenPriceProvider, RouteSource, ModelStore>> {
) -> Result<(), IndexerErrors<TokenPriceProvider, RouteSource, ModelStore, Producer>> {
let values_transformed = values
.iter()
.map(|(k, v)| {
Expand All @@ -149,7 +149,7 @@ impl<
&mut self,
) -> Result<
HashMap<&'config BucketConfig, Estimator>,
IndexerErrors<TokenPriceProvider, RouteSource, ModelStore>,
IndexerErrors<TokenPriceProvider, RouteSource, ModelStore, Producer>,
> {
// Build Estimators
let estimator_map: HashMap<&BucketConfig, Estimator> =
Expand Down Expand Up @@ -188,6 +188,7 @@ enum IndexerErrors<
T: token_price::TokenPriceProvider,
S: source::RouteSource,
R: storage::RoutingModelStore,
U: storage::MessageQueue,
> {
#[display("Route build error: {}", _0)]
RouteBuildError(RouteError),
Expand All @@ -205,7 +206,7 @@ enum IndexerErrors<
PublishEstimatorErrors(Vec<R::Error>),

#[display("Indexer update message error: {}", _0)]
PublishIndexerUpdateMessageError(String),
PublishIndexerUpdateMessageError(U::Error),
}

#[cfg(test)]
Expand Down Expand Up @@ -245,11 +246,13 @@ mod tests {

struct ProducerStub;
impl MessageQueue for ProducerStub {
async fn publish(&mut self, topic: &str, message: &str) -> Result<(), String> {
type Error = ();

async fn publish(&mut self, topic: &str, message: &str) -> Result<(), ()> {
Ok(())
}

async fn subscribe(&mut self, topic: &str) -> Result<String, String> {
async fn subscribe(&mut self, topic: &str) -> Result<String, ()> {
Ok("Subscribed".to_string())
}
}
Expand Down
1 change: 0 additions & 1 deletion crates/routing-engine/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::fmt::Debug;
use ruint::aliases::U256;

pub use bungee::BungeeClient;
use config;

use crate::{CostType, Route};

Expand Down
6 changes: 4 additions & 2 deletions crates/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ pub trait RoutingModelStore {
}

pub trait MessageQueue {
async fn publish(&mut self, topic: &str, message: &str) -> Result<(), String>;
type Error: Debug;

async fn publish(&mut self, topic: &str, message: &str) -> Result<(), Self::Error>;

async fn subscribe(&mut self, topic: &str) -> Result<String, String>;
async fn subscribe(&mut self, topic: &str) -> Result<String, Self::Error>;
}
10 changes: 6 additions & 4 deletions crates/storage/src/redis.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use redis;
use redis::{aio, AsyncCommands};
use redis::{aio, AsyncCommands, Commands};
use redis::RedisError;
use thiserror::Error;

Expand Down Expand Up @@ -41,11 +41,13 @@ impl RoutingModelStore for RedisClient {
}

impl MessageQueue for RedisClient {
async fn publish(&mut self, topic: &str, message: &str) -> Result<(), String> {
todo!()
type Error = RedisClientError;

async fn publish(&mut self, topic: &str, message: &str) -> Result<(), Self::Error> {
self.connection.publish(topic, message).await.map_err(RedisClientError::RedisLibraryError)
}

async fn subscribe(&mut self, topic: &str) -> Result<String, String> {
async fn subscribe(&mut self, topic: &str) -> Result<String, Self::Error> {
todo!()
}
}
Expand Down

0 comments on commit eaee4dc

Please sign in to comment.