diff --git a/Cargo.toml b/Cargo.toml
index a892e9c..3c7d49c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,7 +15,7 @@
 futures = "~0.3"
 log = "~0.4"
 moka = { version = "~0.12", features = ["future"] }
 num_cpus = "1.16"
-object_store = "~0.11"
+object_store = "=0.10.2"
 sysinfo = "~0.32"
 tokio = { version = "1", features = ["sync"] }
diff --git a/src/lib.rs b/src/lib.rs
index da1c0f9..959bfe4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -32,6 +32,7 @@
 pub mod memory;
 pub mod paging;
 mod read_through;
+pub mod stats;
 
 // We reuse `object_store` Error and Result to make this crate work well
 // with the rest of object_store implementations.
diff --git a/src/read_through.rs b/src/read_through.rs
index 6f8f4f4..a24a11e 100644
--- a/src/read_through.rs
+++ b/src/read_through.rs
@@ -9,7 +9,7 @@ use object_store::{
     ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult,
 };
 
-use crate::{paging::PageCache, Result};
+use crate::{paging::PageCache, stats::CacheStats, Result};
 
 /// Read-through Page Cache.
 ///
@@ -19,6 +19,8 @@ pub struct ReadThroughCache<C: PageCache> {
     cache: Arc<C>,
 
     parallelism: usize,
+
+    stats: Arc<dyn CacheStats>,
 }
 
 impl<C: PageCache> std::fmt::Display for ReadThroughCache<C> {
@@ -33,10 +35,23 @@ impl<C: PageCache> ReadThroughCache<C> {
     pub fn new(inner: Arc<dyn ObjectStore>, cache: Arc<C>) -> Self {
+        Self::new_with_stats(
+            inner,
+            cache,
+            Arc::new(crate::stats::AtomicIntCacheStats::new()),
+        )
+    }
+
+    pub fn new_with_stats(
+        inner: Arc<dyn ObjectStore>,
+        cache: Arc<C>,
+        stats: Arc<dyn CacheStats>,
+    ) -> Self {
         Self {
             inner,
             cache,
             parallelism: num_cpus::get(),
+            stats,
         }
     }
@@ -48,6 +63,7 @@ impl<C: PageCache> ReadThroughCache<C> {
 async fn get_range<C: PageCache>(
     store: Arc<dyn ObjectStore>,
     cache: Arc<C>,
+    stats: Arc<dyn CacheStats>,
     location: &Path,
     range: Range<usize>,
     parallelism: usize,
@@ -65,15 +81,17 @@ async fn get_range<C: PageCache>(
             let range_in_page = intersection.start - offset..intersection.end - offset;
             let page_end = std::cmp::min(offset + page_size, meta.size);
             let store = store.clone();
+            let stats = stats.clone();
+
+            stats.inc_total_reads();
+
             async move {
                 // Actual range in the file.
                 page_cache
-                    .get_range_with(
-                        location,
-                        page_id as u32,
-                        range_in_page,
-                        store.get_range(location, offset..page_end),
-                    )
+                    .get_range_with(location, page_id as u32, range_in_page, async {
+                        stats.inc_total_misses();
+                        store.get_range(location, offset..page_end).await
+                    })
                     .await
             }
         })
@@ -81,6 +99,10 @@
         .try_collect::<Vec<_>>()
         .await?;
 
+    if pages.len() == 1 {
+        return Ok(pages.into_iter().next().unwrap());
+    }
+
     // stick all bytes together.
     let mut buf = BytesMut::with_capacity(range.len());
     for page in pages {
@@ -122,24 +144,33 @@ impl<C: PageCache> ObjectStore for ReadThroughCache<C> {
         let page_size = self.cache.page_size();
         let inner = self.inner.clone();
         let cache = self.cache.clone();
+        let stats = self.stats.clone();
         let location = location.clone();
         let parallelism = self.parallelism;
 
         // TODO: This might yield too many small reads.
-        let s =
-            stream::iter((0..file_size).step_by(page_size))
-                .map(move |offset| {
-                    let loc = location.clone();
-                    let store = inner.clone();
-                    let c = cache.clone();
-                    let page_size = cache.page_size();
-
-                    async move {
-                        get_range(store, c, &loc, offset..offset + page_size, parallelism).await
-                    }
-                })
-                .buffered(self.parallelism)
-                .boxed();
+        let s = stream::iter((0..file_size).step_by(page_size))
+            .map(move |offset| {
+                let loc = location.clone();
+                let store = inner.clone();
+                let stats = stats.clone();
+                let c = cache.clone();
+                let page_size = cache.page_size();
+
+                async move {
+                    get_range(
+                        store,
+                        c,
+                        stats,
+                        &loc,
+                        offset..offset + page_size,
+                        parallelism,
+                    )
+                    .await
+                }
+            })
+            .buffered(self.parallelism)
+            .boxed();
 
         let payload = GetResultPayload::Stream(s);
         Ok(GetResult {
@@ -154,6 +185,7 @@
         get_range(
             self.inner.clone(),
             self.cache.clone(),
+            self.stats.clone(),
             location,
             range,
             self.parallelism,
diff --git a/src/stats.rs b/src/stats.rs
new file mode 100644
index 0000000..459ec5c
--- /dev/null
+++ b/src/stats.rs
@@ -0,0 +1,124 @@
+//! Cache stats
+
+use std::{
+    fmt::Debug,
+    sync::atomic::{AtomicU64, Ordering},
+};
+
+use log::warn;
+
+pub trait CacheReadStats: Sync + Send + Debug {
+    /// Total reads on the cache.
+    fn total_reads(&self) -> u64;
+
+    /// Total misses.
+    fn total_misses(&self) -> u64;
+
+    /// Increase total reads by 1.
+    fn inc_total_reads(&self);
+
+    /// Increase total misses by 1.
+    fn inc_total_misses(&self);
+}
+
+pub trait CacheCapacityStats {
+    fn max_capacity(&self) -> u64;
+
+    fn set_max_capacity(&self, val: u64);
+
+    fn usage(&self) -> u64;
+
+    fn set_usage(&self, val: u64);
+
+    fn inc_usage(&self, val: u64);
+
+    fn sub_usage(&self, val: u64);
+}
+
+pub trait CacheStats: CacheCapacityStats + CacheReadStats {}
+
+#[derive(Debug)]
+pub struct AtomicIntCacheStats {
+    total_reads: AtomicU64,
+    total_misses: AtomicU64,
+    max_capacity: AtomicU64,
+    capacity_usage: AtomicU64,
+}
+
+impl AtomicIntCacheStats {
+    pub fn new() -> Self {
+        Self {
+            total_misses: AtomicU64::new(0),
+            total_reads: AtomicU64::new(0),
+            max_capacity: AtomicU64::new(0),
+            capacity_usage: AtomicU64::new(0),
+        }
+    }
+}
+
+impl Default for AtomicIntCacheStats {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl CacheReadStats for AtomicIntCacheStats {
+    fn total_misses(&self) -> u64 {
+        self.total_misses.load(Ordering::Acquire)
+    }
+
+    fn total_reads(&self) -> u64 {
+        self.total_reads.load(Ordering::Acquire)
+    }
+
+    fn inc_total_reads(&self) {
+        self.total_reads.fetch_add(1, Ordering::Relaxed);
+    }
+
+    fn inc_total_misses(&self) {
+        self.total_misses.fetch_add(1, Ordering::Relaxed);
+    }
+}
+
+impl CacheCapacityStats for AtomicIntCacheStats {
+    fn max_capacity(&self) -> u64 {
+        self.max_capacity.load(Ordering::Acquire)
+    }
+
+    fn set_max_capacity(&self, val: u64) {
+        self.max_capacity.store(val, Ordering::Relaxed);
+    }
+
+    fn usage(&self) -> u64 {
+        self.capacity_usage.load(Ordering::Acquire)
+    }
+
+    fn set_usage(&self, val: u64) {
+        self.capacity_usage.store(val, Ordering::Relaxed);
+    }
+
+    fn inc_usage(&self, val: u64) {
+        self.capacity_usage.fetch_add(val, Ordering::Relaxed);
+    }
+
+    fn sub_usage(&self, val: u64) {
+        let res =
+            self.capacity_usage
+                .fetch_update(Ordering::Acquire, Ordering::Relaxed, |current| {
+                    if current < val {
+                        warn!(
+                            "cannot decrement cache usage. current val = {:?} and decrement = {:?}",
+                            current, val
+                        );
+                        None
+                    } else {
+                        Some(current - val)
+                    }
+                });
+        if let Err(e) = res {
+            warn!("error setting cache usage: {:?}", e);
+        }
+    }
+}
+
+impl CacheStats for AtomicIntCacheStats {}
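For reference, a minimal usage sketch of the new stats hook (not part of the patch): it wires an `AtomicIntCacheStats` into `ReadThroughCache::new_with_stats` and reads the counters back through `CacheReadStats`. Only `new_with_stats`, `AtomicIntCacheStats`, `total_reads`, and `total_misses` come from this diff; the `InMemoryCache` page-cache type, its constructor arguments, and the helper function are illustrative assumptions, and module paths are written as they would appear inside the crate since `read_through` is a private module.

```rust
use std::sync::Arc;

use object_store::{path::Path, ObjectStore};

use crate::memory::InMemoryCache; // assumed PageCache implementation from the `memory` module
use crate::read_through::ReadThroughCache;
use crate::stats::{AtomicIntCacheStats, CacheReadStats};

/// Wrap an existing store, read the same range twice, and print hit/miss counters.
async fn read_with_stats(inner: Arc<dyn ObjectStore>, path: &Path) -> object_store::Result<()> {
    // Page cache plus a stats sink; the capacity/page-size arguments are assumptions.
    let cache = Arc::new(InMemoryCache::new(64 * 1024 * 1024, 8 * 1024 * 1024));
    let stats = Arc::new(AtomicIntCacheStats::new());

    // Keep a second handle on the stats so the counters can be inspected afterwards.
    let store = ReadThroughCache::new_with_stats(inner, cache, stats.clone());

    let _cold = store.get_range(path, 0..1024).await?; // first read: counted as a miss
    let _warm = store.get_range(path, 0..1024).await?; // second read: served from the page cache

    println!(
        "total reads = {}, total misses = {}",
        stats.total_reads(),
        stats.total_misses()
    );
    Ok(())
}
```

Because the stats handle is an `Arc<dyn CacheStats>`, the same counters can be shared with a metrics exporter or test harness while the cache keeps updating them.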