Skip to content

Commit

Permalink
feat: track parquet cache metrics
Browse files Browse the repository at this point in the history
Adds metrics to track the following in the in-memory parquet cache:
* cache size in bytes (also fixes a bug in how that size is calculated)
* cache size in n files
* cache hits
* cache misses
* cache misses while the oracle is fetching a file

A test was added to check this functionality
  • Loading branch information
hiltontj committed Dec 9, 2024
1 parent 40a11bd commit ce5c16e
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 5 deletions.
6 changes: 3 additions & 3 deletions influxdb3_cache/src/parquet_cache/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub(super) struct AccessMetrics {
cache_misses_while_fetching: U64Counter,
}

const CACHE_ACCESS_NAME: &str = "parquet_cache_access";
pub(super) const CACHE_ACCESS_NAME: &str = "influxdb3_parquet_cache_access";

impl AccessMetrics {
pub(super) fn new(metric_registry: &Registry) -> Self {
Expand Down Expand Up @@ -44,8 +44,8 @@ pub(super) struct SizeMetrics {
cache_size_n_files: U64Gauge,
}

const CACHE_SIZE_MB_NAME: &str = "parquet_cache_size_bytes";
const CACHE_SIZE_N_FILES_NAME: &str = "parquet_cache_size_number_of_files";
pub(super) const CACHE_SIZE_MB_NAME: &str = "influxdb3_parquet_cache_size_bytes";
pub(super) const CACHE_SIZE_N_FILES_NAME: &str = "influxdb3_parquet_cache_size_number_of_files";

impl SizeMetrics {
pub(super) fn new(metric_registry: &Registry) -> Self {
Expand Down
178 changes: 176 additions & 2 deletions influxdb3_cache/src/parquet_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,13 @@ impl Cache {
// treated as immutable, this should be okay.
bail!("attempted to store value in non-fetching cache entry");
}
let current_size = entry.size();
entry.state = CacheEntryState::Success(value);
entry
.hit_time
.store(self.time_provider.now().timestamp_nanos(), Ordering::SeqCst);
// TODO(trevor): what if size is greater than cache capacity?
let additional_bytes = entry.size();
let additional_bytes = entry.size() - current_size;
self.size_metrics
.record_file_addition(additional_bytes as u64);
self.used.fetch_add(additional_bytes, Ordering::SeqCst);
Expand Down Expand Up @@ -761,13 +762,16 @@ pub(crate) mod tests {
RequestCountedObjectStore, SynchronizedObjectStore,
};
use iox_time::{MockProvider, Time, TimeProvider};
use metric::{Attributes, Metric, Registry, U64Counter, U64Gauge};
use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload};

use pretty_assertions::assert_eq;
use tokio::sync::Notify;

use crate::parquet_cache::{
create_cached_obj_store_and_oracle, test_cached_obj_store_and_oracle, CacheRequest,
create_cached_obj_store_and_oracle,
metrics::{CACHE_ACCESS_NAME, CACHE_SIZE_MB_NAME, CACHE_SIZE_N_FILES_NAME},
test_cached_obj_store_and_oracle, CacheRequest,
};

macro_rules! assert_payload_at_equals {
Expand Down Expand Up @@ -1003,4 +1007,174 @@ pub(crate) mod tests {
assert_payload_at_equals!(cached_store, payload, path);
assert_eq!(1, counter.total_read_request_count(&path));
}

struct MetricVerifier {
access_metrics: Metric<U64Counter>,
size_mb_metrics: Metric<U64Gauge>,
size_n_files_metrics: Metric<U64Gauge>,
}

impl MetricVerifier {
fn new(metric_registry: Arc<Registry>) -> Self {
let access_metrics = metric_registry
.get_instrument::<Metric<U64Counter>>(CACHE_ACCESS_NAME)
.unwrap();
let size_mb_metrics = metric_registry
.get_instrument::<Metric<U64Gauge>>(CACHE_SIZE_MB_NAME)
.unwrap();
let size_n_files_metrics = metric_registry
.get_instrument::<Metric<U64Gauge>>(CACHE_SIZE_N_FILES_NAME)
.unwrap();
Self {
access_metrics,
size_mb_metrics,
size_n_files_metrics,
}
}

fn assert_access(
&self,
hits_expected: u64,
misses_expected: u64,
misses_while_fetching_expected: u64,
) {
let hits_actual = self
.access_metrics
.get_observer(&Attributes::from(&[("status", "cached")]))
.unwrap()
.fetch();
let misses_actual = self
.access_metrics
.get_observer(&Attributes::from(&[("status", "miss")]))
.unwrap()
.fetch();
let misses_while_fetching_actual = self
.access_metrics
.get_observer(&Attributes::from(&[("status", "miss_while_fetching")]))
.unwrap()
.fetch();
assert_eq!(
hits_actual, hits_expected,
"cache hits did not match expectation"
);
assert_eq!(
misses_actual, misses_expected,
"cache misses did not match expectation"
);
assert_eq!(
misses_while_fetching_actual, misses_while_fetching_expected,
"cache misses while fetching did not match expectation"
);
}

fn assert_size(&self, size_bytes_expected: u64, size_n_files_expected: u64) {
let size_bytes_actual = self
.size_mb_metrics
.get_observer(&Attributes::from(&[]))
.unwrap()
.fetch();
let size_n_files_actual = self
.size_n_files_metrics
.get_observer(&Attributes::from(&[]))
.unwrap()
.fetch();
assert_eq!(
size_bytes_actual, size_bytes_expected,
"cache size in bytes did not match actual"
);
assert_eq!(
size_n_files_actual, size_n_files_expected,
"cache size in number of files did not match actual"
);
}
}

#[tokio::test]
async fn cache_metrics() {
// test setup
let to_store_notify = Arc::new(Notify::new());
let from_store_notify = Arc::new(Notify::new());
let counted_store = Arc::new(RequestCountedObjectStore::new(Arc::new(InMemory::new())));
let inner_store = Arc::new(
SynchronizedObjectStore::new(Arc::clone(&counted_store) as _)
.with_get_notifies(Arc::clone(&to_store_notify), Arc::clone(&from_store_notify)),
);
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let metric_registry = Arc::new(Registry::new());
let (cached_store, oracle) = test_cached_obj_store_and_oracle(
Arc::clone(&inner_store) as _,
Arc::clone(&time_provider) as _,
Arc::clone(&metric_registry),
);
let metric_verifier = MetricVerifier::new(metric_registry);

// put something in the object store:
let path = Path::from("0.parquet");
let payload = b"Janeway";
cached_store
.put(&path, PutPayload::from_static(payload))
.await
.unwrap();

// spin off a task to make a request to the object store on a separate thread. We will drive
// the notifiers from here, as we just need the request to go through to register a cache
// miss.
let cached_store_cloned = Arc::clone(&cached_store);
let path_cloned = path.clone();
let h = tokio::spawn(async move {
assert_payload_at_equals!(cached_store_cloned, payload, path_cloned);
});

// drive the synchronized store using the notifiers:
from_store_notify.notified().await;
to_store_notify.notify_one();
h.await.unwrap();

// check that there is a single cache miss:
metric_verifier.assert_access(0, 1, 0);
metric_verifier.assert_size(0, 0);

// there should be a single request made to the inner counted store from above:
assert_eq!(1, counted_store.total_read_request_count(&path));

// have the cache oracle cache the object:
let (cache_request, notifier_rx) = CacheRequest::create(path.clone());
oracle.register(cache_request);

// we are in the middle of a get request, i.e., the cache entry is "fetching" once this
// call to notified wakes:
let _ = from_store_notify.notified().await;

// spawn a thread to wake the in-flight get request initiated by the cache oracle after we
// have started a get request below, such that the get request below hits the cache while
// the entry is still in the "fetching" state:
let h = tokio::spawn(async move {
to_store_notify.notify_one();
let _ = notifier_rx.await;
});

// make the request to the store, which hits the cache in the "fetching" state since we
// haven't made the call to notify the store to continue yet:
assert_payload_at_equals!(cached_store, payload, path);

// check that there is a single miss while fetching, note, the metrics are cumulative, so
// the original miss is still there:
metric_verifier.assert_access(0, 1, 1);

// drive the task to completion to ensure that the cache request has been fulfilled:
h.await.unwrap();

// there should only have been two requests made, i.e., one from the request before the
// object was cached, and one from the cache oracle:
assert_eq!(2, counted_store.total_read_request_count(&path));

// make another request, this time, it should use the cache:
assert_payload_at_equals!(cached_store, payload, path);

metric_verifier.assert_access(1, 1, 1);
metric_verifier.assert_size(17, 1);

cached_store.delete(&path).await.unwrap();
metric_verifier.assert_size(0, 0);
}
}

0 comments on commit ce5c16e

Please sign in to comment.