Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add trait for cache stats and atomic stats #5

Merged
merged 4 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ futures = "~0.3"
log = "~0.4"
moka = { version = "~0.12", features = ["future"] }
num_cpus = "1.16"
object_store = "~0.11"
object_store = "=0.10.2"
sysinfo = "~0.32"
tokio = { version = "1", features = ["sync"] }

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
pub mod memory;
pub mod paging;
mod read_through;
pub mod stats;

// We reuse `object_store` Error and Result to make this crate work well
// with the rest of object_store implementations.
Expand Down
74 changes: 53 additions & 21 deletions src/read_through.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use object_store::{
ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult,
};

use crate::{paging::PageCache, Result};
use crate::{paging::PageCache, stats::CacheStats, Result};

/// Read-through Page Cache.
///
Expand All @@ -19,6 +19,8 @@ pub struct ReadThroughCache<C: PageCache> {
cache: Arc<C>,

parallelism: usize,

stats: Arc<dyn CacheStats>,
}

impl<C: PageCache> std::fmt::Display for ReadThroughCache<C> {
Expand All @@ -33,10 +35,23 @@ impl<C: PageCache> std::fmt::Display for ReadThroughCache<C> {

impl<C: PageCache> ReadThroughCache<C> {
/// Create a read-through cache over `inner` backed by `cache`,
/// recording statistics in a default `AtomicIntCacheStats` collector.
pub fn new(inner: Arc<dyn ObjectStore>, cache: Arc<C>) -> Self {
    Self::new_with_stats(
        inner,
        cache,
        Arc::new(crate::stats::AtomicIntCacheStats::new()),
    )
}

/// Create a read-through cache with a caller-supplied stats collector.
///
/// * `inner` - backing object store that reads fall through to on a miss.
/// * `cache` - page cache consulted first.
/// * `stats` - collector updated by the read path (total reads / misses).
///
/// Parallelism defaults to the number of available CPUs.
pub fn new_with_stats(
    inner: Arc<dyn ObjectStore>,
    cache: Arc<C>,
    stats: Arc<dyn CacheStats>,
) -> Self {
    Self {
        inner,
        cache,
        parallelism: num_cpus::get(),
        stats,
    }
}

Expand All @@ -48,6 +63,7 @@ impl<C: PageCache> ReadThroughCache<C> {
async fn get_range<C: PageCache>(
store: Arc<dyn ObjectStore>,
cache: Arc<C>,
stats: Arc<dyn CacheStats>,
location: &Path,
range: Range<usize>,
parallelism: usize,
Expand All @@ -65,22 +81,28 @@ async fn get_range<C: PageCache>(
let range_in_page = intersection.start - offset..intersection.end - offset;
let page_end = std::cmp::min(offset + page_size, meta.size);
let store = store.clone();
let stats = stats.clone();

stats.inc_total_reads();

async move {
// Actual range in the file.
page_cache
.get_range_with(
location,
page_id as u32,
range_in_page,
store.get_range(location, offset..page_end),
)
.get_range_with(location, page_id as u32, range_in_page, async {
stats.inc_total_misses();
store.get_range(location, offset..page_end).await
})
.await
}
})
.buffered(parallelism)
.try_collect::<Vec<_>>()
.await?;

if pages.len() == 1 {
return Ok(pages.into_iter().next().unwrap());
}

// stick all bytes together.
let mut buf = BytesMut::with_capacity(range.len());
for page in pages {
Expand Down Expand Up @@ -122,24 +144,33 @@ impl<C: PageCache> ObjectStore for ReadThroughCache<C> {
let page_size = self.cache.page_size();
let inner = self.inner.clone();
let cache = self.cache.clone();
let stats = self.stats.clone();
let location = location.clone();
let parallelism = self.parallelism;

// TODO: This might yield too many small reads.
let s =
stream::iter((0..file_size).step_by(page_size))
.map(move |offset| {
let loc = location.clone();
let store = inner.clone();
let c = cache.clone();
let page_size = cache.page_size();

async move {
get_range(store, c, &loc, offset..offset + page_size, parallelism).await
}
})
.buffered(self.parallelism)
.boxed();
let s = stream::iter((0..file_size).step_by(page_size))
.map(move |offset| {
let loc = location.clone();
let store = inner.clone();
let stats = stats.clone();
let c = cache.clone();
let page_size = cache.page_size();

async move {
get_range(
store,
c,
stats,
&loc,
offset..offset + page_size,
parallelism,
)
.await
}
})
.buffered(self.parallelism)
.boxed();

let payload = GetResultPayload::Stream(s);
Ok(GetResult {
Expand All @@ -154,6 +185,7 @@ impl<C: PageCache> ObjectStore for ReadThroughCache<C> {
get_range(
self.inner.clone(),
self.cache.clone(),
self.stats.clone(),
location,
range,
self.parallelism,
Expand Down
124 changes: 124 additions & 0 deletions src/stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
//! Cache stats

use std::{
fmt::Debug,
sync::atomic::{AtomicU64, Ordering},
};

use log::warn;

/// Read-path statistics for a cache: how many reads were issued and how
/// many of them missed.
pub trait CacheReadStats: Sync + Send + Debug {
    /// Total reads on the cache.
    fn total_reads(&self) -> u64;

    /// Total misses.
    fn total_misses(&self) -> u64;

    /// Increase total reads by 1.
    fn inc_total_reads(&self);

    /// Increase total misses by 1.
    fn inc_total_misses(&self);
}

/// Capacity/occupancy statistics for a cache.
///
/// `usage` and `max_capacity` share whatever unit the implementor chooses
/// (presumably bytes — confirm against the cache implementation).
pub trait CacheCapacityStats {
    /// Configured maximum capacity.
    fn max_capacity(&self) -> u64;

    /// Overwrite the configured maximum capacity.
    fn set_max_capacity(&self, val: u64);

    /// Current usage.
    fn usage(&self) -> u64;

    /// Overwrite the current usage.
    fn set_usage(&self, val: u64);

    /// Increase usage by `val`.
    fn inc_usage(&self, val: u64);

    /// Decrease usage by `val`.
    fn sub_usage(&self, val: u64);
}

pub trait CacheStats: CacheCapacityStats + CacheReadStats {}

/// Cache statistics backed by atomic integers; all counters can be
/// updated concurrently without locks.
#[derive(Debug)]
pub struct AtomicIntCacheStats {
    // Number of reads issued against the cache.
    total_reads: AtomicU64,
    // Number of reads that missed the cache.
    total_misses: AtomicU64,
    // Configured maximum capacity.
    max_capacity: AtomicU64,
    // Current capacity usage.
    capacity_usage: AtomicU64,
}

impl AtomicIntCacheStats {
    /// Create a stats collector with every counter starting at zero.
    pub fn new() -> Self {
        // `AtomicU64::default()` is zero, matching an explicit `new(0)`.
        Self {
            total_reads: AtomicU64::default(),
            total_misses: AtomicU64::default(),
            max_capacity: AtomicU64::default(),
            capacity_usage: AtomicU64::default(),
        }
    }
}

impl Default for AtomicIntCacheStats {
fn default() -> Self {
Self::new()
}
}

impl CacheReadStats for AtomicIntCacheStats {
    /// Snapshot of the read counter.
    fn total_reads(&self) -> u64 {
        self.total_reads.load(Ordering::Acquire)
    }

    /// Snapshot of the miss counter.
    fn total_misses(&self) -> u64 {
        self.total_misses.load(Ordering::Acquire)
    }

    // Increments are plain counters; relaxed ordering is sufficient.
    fn inc_total_reads(&self) {
        self.total_reads.fetch_add(1, Ordering::Relaxed);
    }

    fn inc_total_misses(&self) {
        self.total_misses.fetch_add(1, Ordering::Relaxed);
    }
}

impl CacheCapacityStats for AtomicIntCacheStats {
    /// Snapshot of the configured maximum capacity.
    fn max_capacity(&self) -> u64 {
        self.max_capacity.load(Ordering::Acquire)
    }

    fn set_max_capacity(&self, val: u64) {
        self.max_capacity.store(val, Ordering::Relaxed);
    }

    /// Snapshot of the current usage.
    fn usage(&self) -> u64 {
        self.capacity_usage.load(Ordering::Acquire)
    }

    fn set_usage(&self, val: u64) {
        self.capacity_usage.store(val, Ordering::Relaxed);
    }

    fn inc_usage(&self, val: u64) {
        self.capacity_usage.fetch_add(val, Ordering::Relaxed);
    }

    /// Decrease usage by `val`. If `val` exceeds the current usage the
    /// value is left unchanged and a single warning is logged.
    fn sub_usage(&self, val: u64) {
        // Keep the `fetch_update` closure side-effect free: it may be
        // invoked multiple times when the value changes concurrently, so
        // logging inside it could emit duplicate warnings. `checked_sub`
        // returns `None` on underflow, which aborts the update; we log
        // once from the `Err` branch instead.
        let res = self
            .capacity_usage
            .fetch_update(Ordering::Acquire, Ordering::Relaxed, |current| {
                current.checked_sub(val)
            });
        if let Err(current) = res {
            warn!(
                "cannot decrement cache usage. current val = {:?} and decrement = {:?}",
                current, val
            );
        }
    }
}

impl CacheStats for AtomicIntCacheStats {}