Skip to content

Commit

Permalink
Merge pull request #43 from bcnmy/dev
Browse files Browse the repository at this point in the history
Fix/redis batch import (#42)
  • Loading branch information
ankurdubey521 authored Jul 17, 2024
2 parents 45af903 + 2549bcd commit d4da961
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 10 deletions.
5 changes: 4 additions & 1 deletion crates/routing-engine/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,10 @@ mod tests {
todo!()
}

async fn get_all_key_values(&self) -> Result<HashMap<String, String>, RedisClientError> {
async fn get_all_key_values(
&self,
_: Option<usize>,
) -> Result<HashMap<String, String>, RedisClientError> {
todo!()
}
}
Expand Down
9 changes: 7 additions & 2 deletions crates/routing-engine/src/routing_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use crate::{
use crate::token_price::TokenPriceProvider;
use crate::token_price::utils::{Errors, get_token_price};

const FETCH_REDIS_KEYS_BATCH_SIZE: usize = 50;

/// (from_chain, to_chain, from_token, to_token)
#[derive(Debug)]
struct PathQuery(u32, u32, String, String);
Expand Down Expand Up @@ -87,7 +89,7 @@ impl<PriceProvider: TokenPriceProvider> RoutingEngine<PriceProvider> {

/// Refresh the cache from Redis
pub async fn refresh_cache(&self) {
match self.redis_client.get_all_key_values().await {
match self.redis_client.get_all_key_values(Some(FETCH_REDIS_KEYS_BATCH_SIZE)).await {
Ok(kv_pairs) => {
info!("Refreshing cache from Redis.");
let mut cache = self.cache.write().await;
Expand Down Expand Up @@ -418,7 +420,10 @@ mod tests {
unimplemented!()
}

async fn get_all_key_values(&self) -> Result<HashMap<String, String>, RedisClientError> {
async fn get_all_key_values(
&self,
_: Option<usize>,
) -> Result<HashMap<String, String>, RedisClientError> {
unimplemented!()
}
}
Expand Down
5 changes: 4 additions & 1 deletion crates/routing-engine/src/settlement_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,10 @@ mod tests {
unimplemented!()
}

async fn get_all_key_values(&self) -> Result<HashMap<String, String>, RedisClientError> {
async fn get_all_key_values(
&self,
_: Option<usize>,
) -> Result<HashMap<String, String>, RedisClientError> {
unimplemented!()
}
}
Expand Down
7 changes: 5 additions & 2 deletions crates/routing-engine/src/token_price/coingecko.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ mod tests {

use async_trait::async_trait;
use derive_more::Display;
use serial_test::serial;
use thiserror::Error;

use config::{Config, get_sample_config};
use serial_test::serial;
use storage::{KeyValueStore, RedisClientError};

use crate::CoingeckoClient;
Expand Down Expand Up @@ -192,7 +192,10 @@ mod tests {
unimplemented!()
}

async fn get_all_key_values(&self) -> Result<HashMap<String, String>, RedisClientError> {
async fn get_all_key_values(
&self,
_: Option<usize>,
) -> Result<HashMap<String, String>, RedisClientError> {
unimplemented!()
}
}
Expand Down
5 changes: 4 additions & 1 deletion crates/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ pub trait KeyValueStore: Debug + Send + Sync {

async fn get_all_keys(&self) -> Result<Vec<String>, RedisClientError>;

async fn get_all_key_values(&self) -> Result<HashMap<String, String>, RedisClientError>;
async fn get_all_key_values(
&self,
batch_size: Option<usize>,
) -> Result<HashMap<String, String>, RedisClientError>;
}

#[async_trait]
Expand Down
15 changes: 12 additions & 3 deletions crates/storage/src/redis_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,19 @@ impl KeyValueStore for RedisClient {
Ok(keys)
}

async fn get_all_key_values(&self) -> Result<HashMap<String, String>, RedisClientError> {
async fn get_all_key_values(
&self,
batch_size: Option<usize>,
) -> Result<HashMap<String, String>, RedisClientError> {
info!("Fetching all key-value pairs");
let keys = self.get_all_keys().await?;
let values: Vec<String> = self.connection.clone().mget(&keys).await?;

let batch_size = batch_size.unwrap_or(keys.len());
let mut values = Vec::new();
for batch in keys.chunks(batch_size) {
values.extend(self.connection.clone().mget::<'_, _, Vec<_>>(batch).await?.into_iter());
}

let kv_pairs = keys.into_iter().zip(values.into_iter()).collect();
Ok(kv_pairs)
}
Expand Down Expand Up @@ -167,7 +176,7 @@ mod tests {
.unwrap();

// Fetch all key-values
let key_values = client.get_all_key_values().await.unwrap();
let key_values = client.get_all_key_values(None).await.unwrap();

assert_eq!(key_values.get("key1").unwrap(), "value1");
assert_eq!(key_values.get("key2").unwrap(), "value2");
Expand Down

0 comments on commit d4da961

Please sign in to comment.