Skip to content

Commit

Permalink
refactor caching logic; cache ttl 0 now means instantly expired
Browse files Browse the repository at this point in the history
  • Loading branch information
dshiell committed Aug 10, 2024
1 parent da8873f commit b98f281
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 52 deletions.
8 changes: 6 additions & 2 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ pub struct Args {
#[arg(short, long, default_value = "100000")]
pub lru_max_items: usize,

#[arg(long, default_value = "12")]
#[arg(
long,
default_value = "12",
help = "Global TTL to account for reorgs. Setting to zero disables caching."
)]
pub reorg_ttl: u32,

#[arg(short, long = "cache", default_value = "lru", value_parser = cache_backend_parser)]
Expand All @@ -26,7 +30,7 @@ pub struct Args {
#[arg(
short,
long,
help = "Redis URL. If not suppiled, in memory cache backend will be used."
help = "Redis URL. If not suppiled, in memory cache backend will be used (example: redis://localhost:6379)."
)]
pub redis_url: Option<String>,
}
Expand Down
4 changes: 4 additions & 0 deletions src/cache/lru_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ pub struct LruBackend {
}

impl CacheBackend for LruBackend {
fn get_reorg_ttl(&self) -> u32 {
self.reorg_ttl
}

fn read(&mut self, method: &str, params_key: &str) -> anyhow::Result<CacheStatus> {
let key = format!("{method}:{params_key}");

Expand Down
4 changes: 4 additions & 0 deletions src/cache/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub struct MemoryBackend {
}

impl CacheBackend for MemoryBackend {
fn get_reorg_ttl(&self) -> u32 {
self.reorg_ttl
}

fn read(&mut self, method: &str, params_key: &str) -> anyhow::Result<CacheStatus> {
let key = format!("{method}:{params_key}");

Expand Down
29 changes: 9 additions & 20 deletions src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use chrono::Local;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::cmp::min;
use tracing::info;

pub enum CacheStatus {
Cached { key: String, value: CacheValue },
Expand All @@ -23,6 +24,7 @@ pub struct CacheValue {
impl CacheValue {
pub fn new(data: Value, reorg_ttl: u32, ttl: u32) -> Self {
let last_modified = Local::now().timestamp();
let reorg_ttl = std::cmp::max(reorg_ttl, 1); // make sure nonzero
Self {
data,
reorg_ttl,
Expand All @@ -33,27 +35,14 @@ impl CacheValue {

pub fn is_expired(&self) -> bool {
let now = Local::now().timestamp();

let last_modified = self.last_modified;
let reorg_ttl = self.reorg_ttl;
let ttl = self.ttl;

if last_modified > now {
return true;
}

let age: u64 = (now - last_modified) as u64;
let ttl = if reorg_ttl == 0 && ttl == 0 {
0
} else if reorg_ttl == 0 && ttl > 0 {
ttl
} else if reorg_ttl > 0 && ttl == 0 {
reorg_ttl
} else {
min(reorg_ttl, ttl)
};
let ttl = self.effective_ttl();
age > ttl.into()
}

ttl != 0 && age > ttl.into()
pub fn effective_ttl(&self) -> u32 {
min(self.reorg_ttl, self.ttl)
}

pub fn update(mut self, expired_value: &Option<Self>, reorg_ttl: u32) -> Self {
Expand All @@ -70,8 +59,7 @@ impl CacheValue {
self.reorg_ttl = if is_new {
reorg_ttl
} else {
let now = Local::now().timestamp();
let age: u64 = (now - expired_value.last_modified) as u64;
let age: u64 = (self.last_modified - expired_value.last_modified) as u64;
if age > expired_value.reorg_ttl as u64 {
expired_value.reorg_ttl * 2
} else {
Expand Down Expand Up @@ -99,6 +87,7 @@ pub trait CacheBackendFactory: Send + Sync {
}

pub trait CacheBackend {
fn get_reorg_ttl(&self) -> u32;
fn read(&mut self, method: &str, params_key: &str) -> anyhow::Result<CacheStatus>;
fn write(
&mut self,
Expand Down
9 changes: 8 additions & 1 deletion src/cache/redis_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ pub struct RedisBackend {
}

impl CacheBackend for RedisBackend {
fn get_reorg_ttl(&self) -> u32 {
self.reorg_ttl
}

fn read(&mut self, method: &str, params_key: &str) -> anyhow::Result<CacheStatus> {
let cache_key = format!("{}:{method}:{params_key}", self.chain_id);
let value: Option<String> = self.conn.get(&cache_key)?;
Expand All @@ -63,7 +67,10 @@ impl CacheBackend for RedisBackend {
expired_value: &Option<CacheValue>,
) -> anyhow::Result<()> {
let cache_value = cache_value.update(expired_value, self.reorg_ttl);
let _ = self.conn.set::<_, _, String>(key, cache_value.to_string()?);
let redis_ttl = cache_value.effective_ttl() * 2;
let _ = self
.conn
.set_ex::<_, _, String>(key, cache_value.to_string()?, redis_ttl.into());
Ok(())
}
}
39 changes: 22 additions & 17 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,23 +311,24 @@ async fn rpc_call(
// made the early return.
let handler = chain_state.handlers.get(&rpc_request.method).unwrap();

let (is_cacheable, extracted_value) = match handler.extract_cache_value(result) {
Ok(v) => v,
Err(err) => {
metrics.error_counter.inc();
tracing::error!("fail to extract cache value because: {}", err);
let (is_cacheable, extracted_value) =
match handler.extract_cache_value(result, cache_backend.get_reorg_ttl()) {
Ok(v) => v,
Err(err) => {
metrics.error_counter.inc();
tracing::error!("fail to extract cache value because: {}", err);

ordered_requests_result[rpc_request.index] = Some(JsonRpcResponse::from_error(
Some(rpc_request.id.clone()),
DefinedError::InternalError(Some(json!({
"error": "fail to extract cache value",
"reason": err.to_string(),
}))),
));
ordered_requests_result[rpc_request.index] = Some(JsonRpcResponse::from_error(
Some(rpc_request.id.clone()),
DefinedError::InternalError(Some(json!({
"error": "fail to extract cache value",
"reason": err.to_string(),
}))),
));

continue;
}
};
continue;
}
};

if is_cacheable {
let _ = cache_backend.write(cache_key.as_str(), extracted_value, cache_value);
Expand Down Expand Up @@ -482,8 +483,12 @@ impl HandlerEntry {
self.inner.extract_cache_key(params)
}

fn extract_cache_value(&self, result: Value) -> anyhow::Result<(bool, CacheValue)> {
self.inner.extract_cache_value(result)
fn extract_cache_value(
&self,
result: Value,
reorg_ttl: u32,
) -> anyhow::Result<(bool, CacheValue)> {
self.inner.extract_cache_value(result, reorg_ttl)
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/rpc_cache_handler/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ pub fn extract_address_cache_key(params: &Value) -> anyhow::Result<Option<String

pub fn extract_transaction_cache_value(
result: Value,
reorg_ttl: u32,
ttl: u32,
) -> anyhow::Result<(bool, CacheValue)> {
let is_cacheable = result.is_object() && !result["blockHash"].is_null();
Ok((is_cacheable, CacheValue::new(result, 0, ttl)))
Ok((is_cacheable, CacheValue::new(result, reorg_ttl, ttl)))
}

pub fn extract_and_format_block_number(value: &Value) -> anyhow::Result<Option<String>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ impl RpcCacheHandler for Handler {
Ok(Some(format!("{block_hash}-{tx_index}")))
}

fn extract_cache_value(&self, result: Value) -> anyhow::Result<(bool, CacheValue)> {
common::extract_transaction_cache_value(result, self.get_ttl())
fn extract_cache_value(
&self,
result: Value,
reorg_ttl: u32,
) -> anyhow::Result<(bool, CacheValue)> {
common::extract_transaction_cache_value(result, reorg_ttl, self.get_ttl())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ impl RpcCacheHandler for Handler {
Ok(Some(format!("{block_number}-{tx_index}")))
}

fn extract_cache_value(&self, result: Value) -> anyhow::Result<(bool, CacheValue)> {
common::extract_transaction_cache_value(result, self.get_ttl())
fn extract_cache_value(
&self,
result: Value,
reorg_ttl: u32,
) -> anyhow::Result<(bool, CacheValue)> {
common::extract_transaction_cache_value(result, reorg_ttl, self.get_ttl())
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/rpc_cache_handler/eth_get_transaction_by_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ impl RpcCacheHandler for Handler {
self.inner.extract_cache_key(params)
}

fn extract_cache_value(&self, result: Value) -> anyhow::Result<(bool, CacheValue)> {
common::extract_transaction_cache_value(result, self.get_ttl())
fn extract_cache_value(
&self,
result: Value,
reorg_ttl: u32,
) -> anyhow::Result<(bool, CacheValue)> {
common::extract_transaction_cache_value(result, reorg_ttl, self.get_ttl())
}
}
8 changes: 6 additions & 2 deletions src/rpc_cache_handler/eth_get_transaction_receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ impl RpcCacheHandler for Handler {
Ok(Some(format!("{tx_hash:#x}")))
}

fn extract_cache_value(&self, result: Value) -> anyhow::Result<(bool, CacheValue)> {
common::extract_transaction_cache_value(result, self.get_ttl())
fn extract_cache_value(
&self,
result: Value,
reorg_ttl: u32,
) -> anyhow::Result<(bool, CacheValue)> {
common::extract_transaction_cache_value(result, reorg_ttl, self.get_ttl())
}
}

Expand Down
7 changes: 4 additions & 3 deletions src/rpc_cache_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@ pub trait RpcCacheHandler: Send + Sync {

fn extract_cache_key(&self, params: &Value) -> Result<Option<String>>;

fn extract_cache_value(&self, result: Value) -> Result<(bool, CacheValue)> {
fn extract_cache_value(&self, result: Value, reorg_ttl: u32) -> Result<(bool, CacheValue)> {
// reorg_ttl is managed by cache backend
Ok((
!result.is_null(),
CacheValue::new(result, 0, self.get_ttl()),
CacheValue::new(result, reorg_ttl, self.get_ttl()),
))
}

// default ttl is 1 day
fn get_ttl(&self) -> u32 {
0
86400
}
}

Expand Down

0 comments on commit b98f281

Please sign in to comment.