From b98f281f7939d310843591bed5e4ec66f148538a Mon Sep 17 00:00:00 2001 From: Derek Date: Sat, 10 Aug 2024 16:26:56 -0700 Subject: [PATCH] refactor caching logic; cache ttl 0 now means instantly expired --- src/args.rs | 8 +++- src/cache/lru_backend.rs | 4 ++ src/cache/memory_backend.rs | 4 ++ src/cache/mod.rs | 29 +++++--------- src/cache/redis_backend.rs | 9 ++++- src/main.rs | 39 +++++++++++-------- src/rpc_cache_handler/common.rs | 3 +- ...get_transaction_by_block_hash_and_index.rs | 8 +++- ...t_transaction_by_block_number_and_index.rs | 8 +++- .../eth_get_transaction_by_hash.rs | 8 +++- .../eth_get_transaction_receipt.rs | 8 +++- src/rpc_cache_handler/mod.rs | 7 ++-- 12 files changed, 83 insertions(+), 52 deletions(-) diff --git a/src/args.rs b/src/args.rs index ef48761..1c33fcc 100644 --- a/src/args.rs +++ b/src/args.rs @@ -17,7 +17,11 @@ pub struct Args { #[arg(short, long, default_value = "100000")] pub lru_max_items: usize, - #[arg(long, default_value = "12")] + #[arg( + long, + default_value = "12", + help = "Global TTL to account for reorgs. Setting to zero disables caching." + )] pub reorg_ttl: u32, #[arg(short, long = "cache", default_value = "lru", value_parser = cache_backend_parser)] @@ -26,7 +30,7 @@ pub struct Args { #[arg( short, long, - help = "Redis URL. If not suppiled, in memory cache backend will be used." + help = "Redis URL. If not suppiled, in memory cache backend will be used (example: redis://localhost:6379)." )] pub redis_url: Option, } diff --git a/src/cache/lru_backend.rs b/src/cache/lru_backend.rs index 0d3be59..aad5d87 100644 --- a/src/cache/lru_backend.rs +++ b/src/cache/lru_backend.rs @@ -36,6 +36,10 @@ pub struct LruBackend { } impl CacheBackend for LruBackend { + fn get_reorg_ttl(&self) -> u32 { + self.reorg_ttl + } + fn read(&mut self, method: &str, params_key: &str) -> anyhow::Result { let key = format!("{method}:{params_key}"); diff --git a/src/cache/memory_backend.rs b/src/cache/memory_backend.rs index 3dd3894..e5fb0e4 100644 --- a/src/cache/memory_backend.rs +++ b/src/cache/memory_backend.rs @@ -35,6 +35,10 @@ pub struct MemoryBackend { } impl CacheBackend for MemoryBackend { + fn get_reorg_ttl(&self) -> u32 { + self.reorg_ttl + } + fn read(&mut self, method: &str, params_key: &str) -> anyhow::Result { let key = format!("{method}:{params_key}"); diff --git a/src/cache/mod.rs b/src/cache/mod.rs index b722321..4b57150 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -6,6 +6,7 @@ use chrono::Local; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::cmp::min; +use tracing::info; pub enum CacheStatus { Cached { key: String, value: CacheValue }, @@ -23,6 +24,7 @@ pub struct CacheValue { impl CacheValue { pub fn new(data: Value, reorg_ttl: u32, ttl: u32) -> Self { let last_modified = Local::now().timestamp(); + let reorg_ttl = std::cmp::max(reorg_ttl, 1); // make sure nonzero Self { data, reorg_ttl, @@ -33,27 +35,14 @@ impl CacheValue { pub fn is_expired(&self) -> bool { let now = Local::now().timestamp(); - let last_modified = self.last_modified; - let reorg_ttl = self.reorg_ttl; - let ttl = self.ttl; - - if last_modified > now { - return true; - } - let age: u64 = (now - last_modified) as u64; - let ttl = if reorg_ttl == 0 && ttl == 0 { - 0 - } else if reorg_ttl == 0 && ttl > 0 { - ttl - } else if reorg_ttl > 0 && ttl == 0 { - reorg_ttl - } else { - min(reorg_ttl, ttl) - }; + let ttl = self.effective_ttl(); + age > ttl.into() + } - ttl != 0 && age > ttl.into() + pub fn effective_ttl(&self) -> u32 { + min(self.reorg_ttl, self.ttl) } pub fn update(mut self, expired_value: &Option, reorg_ttl: u32) -> Self { @@ -70,8 +59,7 @@ impl CacheValue { self.reorg_ttl = if is_new { reorg_ttl } else { - let now = Local::now().timestamp(); - let age: u64 = (now - expired_value.last_modified) as u64; + let age: u64 = (self.last_modified - expired_value.last_modified) as u64; if age > expired_value.reorg_ttl as u64 { expired_value.reorg_ttl * 2 } else { @@ -99,6 +87,7 @@ pub trait CacheBackendFactory: Send + Sync { } pub trait CacheBackend { + fn get_reorg_ttl(&self) -> u32; fn read(&mut self, method: &str, params_key: &str) -> anyhow::Result; fn write( &mut self, diff --git a/src/cache/redis_backend.rs b/src/cache/redis_backend.rs index d51b3ff..a6b3291 100644 --- a/src/cache/redis_backend.rs +++ b/src/cache/redis_backend.rs @@ -37,6 +37,10 @@ pub struct RedisBackend { } impl CacheBackend for RedisBackend { + fn get_reorg_ttl(&self) -> u32 { + self.reorg_ttl + } + fn read(&mut self, method: &str, params_key: &str) -> anyhow::Result { let cache_key = format!("{}:{method}:{params_key}", self.chain_id); let value: Option = self.conn.get(&cache_key)?; @@ -63,7 +67,10 @@ impl CacheBackend for RedisBackend { expired_value: &Option, ) -> anyhow::Result<()> { let cache_value = cache_value.update(expired_value, self.reorg_ttl); - let _ = self.conn.set::<_, _, String>(key, cache_value.to_string()?); + let redis_ttl = cache_value.effective_ttl() * 2; + let _ = self + .conn + .set_ex::<_, _, String>(key, cache_value.to_string()?, redis_ttl.into()); Ok(()) } } diff --git a/src/main.rs b/src/main.rs index 2eda4dd..57abc35 100644 --- a/src/main.rs +++ b/src/main.rs @@ -311,23 +311,24 @@ async fn rpc_call( // made the early return. let handler = chain_state.handlers.get(&rpc_request.method).unwrap(); - let (is_cacheable, extracted_value) = match handler.extract_cache_value(result) { - Ok(v) => v, - Err(err) => { - metrics.error_counter.inc(); - tracing::error!("fail to extract cache value because: {}", err); + let (is_cacheable, extracted_value) = + match handler.extract_cache_value(result, cache_backend.get_reorg_ttl()) { + Ok(v) => v, + Err(err) => { + metrics.error_counter.inc(); + tracing::error!("fail to extract cache value because: {}", err); - ordered_requests_result[rpc_request.index] = Some(JsonRpcResponse::from_error( - Some(rpc_request.id.clone()), - DefinedError::InternalError(Some(json!({ - "error": "fail to extract cache value", - "reason": err.to_string(), - }))), - )); + ordered_requests_result[rpc_request.index] = Some(JsonRpcResponse::from_error( + Some(rpc_request.id.clone()), + DefinedError::InternalError(Some(json!({ + "error": "fail to extract cache value", + "reason": err.to_string(), + }))), + )); - continue; - } - }; + continue; + } + }; if is_cacheable { let _ = cache_backend.write(cache_key.as_str(), extracted_value, cache_value); @@ -482,8 +483,12 @@ impl HandlerEntry { self.inner.extract_cache_key(params) } - fn extract_cache_value(&self, result: Value) -> anyhow::Result<(bool, CacheValue)> { - self.inner.extract_cache_value(result) + fn extract_cache_value( + &self, + result: Value, + reorg_ttl: u32, + ) -> anyhow::Result<(bool, CacheValue)> { + self.inner.extract_cache_value(result, reorg_ttl) } } diff --git a/src/rpc_cache_handler/common.rs b/src/rpc_cache_handler/common.rs index 88d98fa..f92b737 100644 --- a/src/rpc_cache_handler/common.rs +++ b/src/rpc_cache_handler/common.rs @@ -51,10 +51,11 @@ pub fn extract_address_cache_key(params: &Value) -> anyhow::Result anyhow::Result<(bool, CacheValue)> { let is_cacheable = result.is_object() && !result["blockHash"].is_null(); - Ok((is_cacheable, CacheValue::new(result, 0, ttl))) + Ok((is_cacheable, CacheValue::new(result, reorg_ttl, ttl))) } pub fn extract_and_format_block_number(value: &Value) -> anyhow::Result> { diff --git a/src/rpc_cache_handler/eth_get_transaction_by_block_hash_and_index.rs b/src/rpc_cache_handler/eth_get_transaction_by_block_hash_and_index.rs index 5fcc7e4..222c029 100644 --- a/src/rpc_cache_handler/eth_get_transaction_by_block_hash_and_index.rs +++ b/src/rpc_cache_handler/eth_get_transaction_by_block_hash_and_index.rs @@ -23,8 +23,12 @@ impl RpcCacheHandler for Handler { Ok(Some(format!("{block_hash}-{tx_index}"))) } - fn extract_cache_value(&self, result: Value) -> anyhow::Result<(bool, CacheValue)> { - common::extract_transaction_cache_value(result, self.get_ttl()) + fn extract_cache_value( + &self, + result: Value, + reorg_ttl: u32, + ) -> anyhow::Result<(bool, CacheValue)> { + common::extract_transaction_cache_value(result, reorg_ttl, self.get_ttl()) } } diff --git a/src/rpc_cache_handler/eth_get_transaction_by_block_number_and_index.rs b/src/rpc_cache_handler/eth_get_transaction_by_block_number_and_index.rs index 2a81a09..93c6379 100644 --- a/src/rpc_cache_handler/eth_get_transaction_by_block_number_and_index.rs +++ b/src/rpc_cache_handler/eth_get_transaction_by_block_number_and_index.rs @@ -29,8 +29,12 @@ impl RpcCacheHandler for Handler { Ok(Some(format!("{block_number}-{tx_index}"))) } - fn extract_cache_value(&self, result: Value) -> anyhow::Result<(bool, CacheValue)> { - common::extract_transaction_cache_value(result, self.get_ttl()) + fn extract_cache_value( + &self, + result: Value, + reorg_ttl: u32, + ) -> anyhow::Result<(bool, CacheValue)> { + common::extract_transaction_cache_value(result, reorg_ttl, self.get_ttl()) } } diff --git a/src/rpc_cache_handler/eth_get_transaction_by_hash.rs b/src/rpc_cache_handler/eth_get_transaction_by_hash.rs index 139fb1d..25d988f 100644 --- a/src/rpc_cache_handler/eth_get_transaction_by_hash.rs +++ b/src/rpc_cache_handler/eth_get_transaction_by_hash.rs @@ -17,7 +17,11 @@ impl RpcCacheHandler for Handler { self.inner.extract_cache_key(params) } - fn extract_cache_value(&self, result: Value) -> anyhow::Result<(bool, CacheValue)> { - common::extract_transaction_cache_value(result, self.get_ttl()) + fn extract_cache_value( + &self, + result: Value, + reorg_ttl: u32, + ) -> anyhow::Result<(bool, CacheValue)> { + common::extract_transaction_cache_value(result, reorg_ttl, self.get_ttl()) } } diff --git a/src/rpc_cache_handler/eth_get_transaction_receipt.rs b/src/rpc_cache_handler/eth_get_transaction_receipt.rs index 50b1978..6ea4df7 100644 --- a/src/rpc_cache_handler/eth_get_transaction_receipt.rs +++ b/src/rpc_cache_handler/eth_get_transaction_receipt.rs @@ -21,8 +21,12 @@ impl RpcCacheHandler for Handler { Ok(Some(format!("{tx_hash:#x}"))) } - fn extract_cache_value(&self, result: Value) -> anyhow::Result<(bool, CacheValue)> { - common::extract_transaction_cache_value(result, self.get_ttl()) + fn extract_cache_value( + &self, + result: Value, + reorg_ttl: u32, + ) -> anyhow::Result<(bool, CacheValue)> { + common::extract_transaction_cache_value(result, reorg_ttl, self.get_ttl()) } } diff --git a/src/rpc_cache_handler/mod.rs b/src/rpc_cache_handler/mod.rs index ed17c4f..e87706d 100644 --- a/src/rpc_cache_handler/mod.rs +++ b/src/rpc_cache_handler/mod.rs @@ -31,16 +31,17 @@ pub trait RpcCacheHandler: Send + Sync { fn extract_cache_key(&self, params: &Value) -> Result>; - fn extract_cache_value(&self, result: Value) -> Result<(bool, CacheValue)> { + fn extract_cache_value(&self, result: Value, reorg_ttl: u32) -> Result<(bool, CacheValue)> { // reorg_ttl is managed by cache backend Ok(( !result.is_null(), - CacheValue::new(result, 0, self.get_ttl()), + CacheValue::new(result, reorg_ttl, self.get_ttl()), )) } + // default ttl is 1 day fn get_ttl(&self) -> u32 { - 0 + 86400 } }