diff --git a/.github/template/template.yml b/.github/template/template.yml index 007f9d48..cd70d88f 100644 --- a/.github/template/template.yml +++ b/.github/template/template.yml @@ -106,6 +106,7 @@ jobs: run: | cargo clippy --all-targets --features tokio-console -- -D warnings cargo clippy --all-targets --features deadlock -- -D warnings + cargo clippy --all-targets --features mtrace -- -D warnings cargo clippy --all-targets -- -D warnings - if: steps.cache.outputs.cache-hit != 'true' uses: taiki-e/install-action@cargo-llvm-cov diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 887f6b82..fd623823 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -112,6 +112,7 @@ jobs: run: | cargo clippy --all-targets --features tokio-console -- -D warnings cargo clippy --all-targets --features deadlock -- -D warnings + cargo clippy --all-targets --features mtrace -- -D warnings cargo clippy --all-targets -- -D warnings - if: steps.cache.outputs.cache-hit != 'true' uses: taiki-e/install-action@cargo-llvm-cov diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index 1172e0c4..d4d5c7d3 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -111,6 +111,7 @@ jobs: run: | cargo clippy --all-targets --features tokio-console -- -D warnings cargo clippy --all-targets --features deadlock -- -D warnings + cargo clippy --all-targets --features mtrace -- -D warnings cargo clippy --all-targets -- -D warnings - if: steps.cache.outputs.cache-hit != 'true' uses: taiki-e/install-action@cargo-llvm-cov diff --git a/.gitignore b/.gitignore index a048def5..e53cc65c 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,6 @@ docker-compose.override.yaml .tmp perf.data* -flamegraph.svg \ No newline at end of file +flamegraph.svg + +trace.txt \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 2ffad619..a08c6864 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,8 @@ test-log = { version = "0.2", default-features = false, features = [ ] } itertools = "0.13" metrics = "0.23" +minitrace = "0.6" +minitrace-jaeger = "0.6" [profile.release] debug = true diff --git a/Makefile b/Makefile index a3d034c9..f9a22153 100644 --- a/Makefile +++ b/Makefile @@ -23,6 +23,7 @@ check-all: cargo clippy --all-targets --features tokio-console cargo clippy --all-targets --features trace cargo clippy --all-targets --features sanity + cargo clippy --all-targets --features mtrace cargo clippy --all-targets test: diff --git a/foyer-bench/Cargo.toml b/foyer-bench/Cargo.toml index e50f02b1..1e330345 100644 --- a/foyer-bench/Cargo.toml +++ b/foyer-bench/Cargo.toml @@ -21,6 +21,8 @@ hdrhistogram = "7" itertools = { workspace = true } metrics = { workspace = true } metrics-exporter-prometheus = "0.15" +minitrace = { workspace = true, optional = true } +minitrace-jaeger = { workspace = true, optional = true } opentelemetry = { version = "0.23", optional = true } opentelemetry-otlp = { version = "0.16", optional = true } opentelemetry-semantic-conventions = { version = "0.15", optional = true } @@ -54,3 +56,4 @@ trace = [ strict_assertions = ["foyer/strict_assertions"] sanity = ["foyer/sanity"] jemalloc = ["tikv-jemallocator"] +mtrace = ["foyer/mtrace", "minitrace-jaeger", "minitrace"] diff --git a/foyer-bench/src/main.rs b/foyer-bench/src/main.rs index 24b1234c..29b28fd9 100644 --- a/foyer-bench/src/main.rs +++ b/foyer-bench/src/main.rs @@ -19,7 +19,7 @@ mod text; use bytesize::MIB; use foyer::{ DirectFsDeviceOptionsBuilder, FifoConfig, FifoPicker, HybridCache, HybridCacheBuilder, InvalidRatioPicker, - LfuConfig, LruConfig, RateLimitPicker, RuntimeConfigBuilder, S3FifoConfig, + LfuConfig, LruConfig, RateLimitPicker, RuntimeConfigBuilder, S3FifoConfig, TraceConfig, }; use metrics_exporter_prometheus::PrometheusBuilder; @@ -29,7 +29,7 @@ use std::{ net::SocketAddr, ops::{Deref, Range}, sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, }, time::{Duration, Instant}, @@ -179,6 +179,22 @@ pub struct Args { #[arg(long, default_value = "lfu")] eviction: String, + + /// Record insert trace threshold. Only effective with "mtrace" feature. + #[arg(long, default_value_t = 1000 * 1000 * 1000)] + trace_insert_ns: usize, + /// Record get trace threshold. Only effective with "mtrace" feature. + #[arg(long, default_value_t = 1000 * 1000 * 1000)] + trace_get_ns: usize, + /// Record obtain trace threshold. Only effective with "mtrace" feature. + #[arg(long, default_value_t = 1000 * 1000 * 1000)] + trace_obtain_ns: usize, + /// Record remove trace threshold. Only effective with "mtrace" feature. + #[arg(long, default_value_t = 1000 * 1000 * 1000)] + trace_remove_ns: usize, + /// Record fetch trace threshold. Only effective with "mtrace" feature. + #[arg(long, default_value_t = 1000 * 1000 * 1000)] + trace_fetch_ns: usize, } #[derive(Debug)] @@ -265,12 +281,12 @@ mod arc_bytes { } #[cfg(feature = "tokio-console")] -fn init_logger() { +fn setup() { console_subscriber::init(); } #[cfg(feature = "trace")] -fn init_logger() { +fn setup() { use opentelemetry_sdk::{ trace::{BatchConfigBuilder, Config}, Resource, @@ -309,8 +325,18 @@ fn init_logger() { .init(); } -#[cfg(not(any(feature = "tokio-console", feature = "trace")))] -fn init_logger() { +#[cfg(feature = "mtrace")] +fn setup() { + use minitrace::collector::Config; + let reporter = minitrace_jaeger::JaegerReporter::new("127.0.0.1:6831".parse().unwrap(), "foyer-bench").unwrap(); + minitrace::set_reporter( + reporter, + Config::default().batch_report_interval(Duration::from_millis(1)), + ); +} + +#[cfg(not(any(feature = "tokio-console", feature = "trace", feature = "mtrace")))] +fn setup() { use tracing_subscriber::{prelude::*, EnvFilter}; tracing_subscriber::registry() @@ -323,12 +349,22 @@ fn init_logger() { .init(); } +#[cfg(not(any(feature = "mtrace")))] +fn teardown() {} + +#[cfg(feature = "mtrace")] +fn teardown() { + minitrace::flush(); +} + #[tokio::main] async fn main() { - init_logger(); + setup(); #[cfg(all(feature = "jemalloc", not(target_env = "msvc")))] - tracing::info!("[foyer bench]: jemalloc is enabled."); + { + tracing::info!("[foyer bench]: jemalloc is enabled."); + } #[cfg(feature = "deadlock")] { @@ -367,7 +403,16 @@ async fn main() { create_dir_all(&args.dir).unwrap(); + let trace_config = TraceConfig { + record_hybrid_insert_threshold_ns: AtomicUsize::new(args.trace_insert_ns), + record_hybrid_get_threshold_ns: AtomicUsize::new(args.trace_get_ns), + record_hybrid_obtain_threshold_ns: AtomicUsize::new(args.trace_obtain_ns), + record_hybrid_remove_threshold_ns: AtomicUsize::new(args.trace_remove_ns), + record_hybrid_fetch_threshold_ns: AtomicUsize::new(args.trace_fetch_ns), + }; + let builder = HybridCacheBuilder::new() + .with_trace_config(trace_config) .memory(args.mem * MIB as usize) .with_shards(args.shards); @@ -473,6 +518,8 @@ async fn main() { handle_signal.abort(); println!("\nTotal:\n{}", analysis); + + teardown(); } async fn bench(args: Args, hybrid: HybridCache, metrics: Metrics, stop_tx: broadcast::Sender<()>) { diff --git a/foyer-common/src/lib.rs b/foyer-common/src/lib.rs index 6bae2c18..9d70cd91 100644 --- a/foyer-common/src/lib.rs +++ b/foyer-common/src/lib.rs @@ -45,6 +45,8 @@ pub mod rate; pub mod rated_ticket; /// A runtime that automatically shutdown itself on drop. pub mod runtime; +/// Trace related components. +pub mod trace; /// File system utils. #[cfg(any(target_os = "linux", target_os = "macos"))] diff --git a/foyer-common/src/trace.rs b/foyer-common/src/trace.rs new file mode 100644 index 00000000..6d88eed0 --- /dev/null +++ b/foyer-common/src/trace.rs @@ -0,0 +1,42 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::AtomicUsize; + +/// Configurations for trace. +#[derive(Debug)] +pub struct TraceConfig { + /// Threshold for recording the hybrid cache `insert` and `insert_with_context` operation in ns. + pub record_hybrid_insert_threshold_ns: AtomicUsize, + /// Threshold for recording the hybrid cache `get` operation in ns. + pub record_hybrid_get_threshold_ns: AtomicUsize, + /// Threshold for recording the hybrid cache `obtain` operation in ns. + pub record_hybrid_obtain_threshold_ns: AtomicUsize, + /// Threshold for recording the hybrid cache `remove` operation in ns. + pub record_hybrid_remove_threshold_ns: AtomicUsize, + /// Threshold for recording the hybrid cache `fetch` operation in ns. + pub record_hybrid_fetch_threshold_ns: AtomicUsize, +} + +impl Default for TraceConfig { + fn default() -> Self { + Self { + record_hybrid_insert_threshold_ns: AtomicUsize::from(1000 * 1000 * 1000), + record_hybrid_get_threshold_ns: AtomicUsize::from(1000 * 1000 * 1000), + record_hybrid_obtain_threshold_ns: AtomicUsize::from(1000 * 1000 * 1000), + record_hybrid_remove_threshold_ns: AtomicUsize::from(1000 * 1000 * 1000), + record_hybrid_fetch_threshold_ns: AtomicUsize::from(1000 * 1000 * 1000), + } + } +} diff --git a/foyer-memory/Cargo.toml b/foyer-memory/Cargo.toml index fcf4a0d4..ffc95618 100644 --- a/foyer-memory/Cargo.toml +++ b/foyer-memory/Cargo.toml @@ -20,6 +20,7 @@ futures = "0.3" hashbrown = "0.14" itertools = { workspace = true } libc = "0.2" +minitrace = { workspace = true } parking_lot = "0.12" serde = { workspace = true } tokio = { workspace = true } @@ -40,6 +41,7 @@ strict_assertions = [ "foyer-intrusive/strict_assertions", ] sanity = ["strict_assertions"] +mtrace = ["minitrace/enable"] [[bench]] name = "bench_hit_ratio" diff --git a/foyer-memory/src/cache.rs b/foyer-memory/src/cache.rs index 1f570146..9065f44c 100644 --- a/foyer-memory/src/cache.rs +++ b/foyer-memory/src/cache.rs @@ -501,6 +501,7 @@ where S: HashBuilder, { /// Insert cache entry to the in-memory cache. + #[minitrace::trace(short_name = true)] pub fn insert(&self, key: K, value: V) -> CacheEntry { match self { Cache::Fifo(cache) => cache.insert(key, value).into(), @@ -511,6 +512,7 @@ where } /// Insert cache entry with cache context to the in-memory cache. + #[minitrace::trace(short_name = true)] pub fn insert_with_context(&self, key: K, value: V, context: CacheContext) -> CacheEntry { match self { Cache::Fifo(cache) => cache.insert_with_context(key, value, context).into(), @@ -525,6 +527,7 @@ where /// The entry will be removed as soon as the returned entry is dropped. /// /// The entry will become a normal entry after it is accessed. + #[minitrace::trace(short_name = true)] pub fn deposit(&self, key: K, value: V) -> CacheEntry { match self { Cache::Fifo(cache) => cache.deposit(key, value).into(), @@ -539,6 +542,7 @@ where /// The entry will be removed as soon as the returned entry is dropped. /// /// The entry will become a normal entry after it is accessed. + #[minitrace::trace(short_name = true)] pub fn deposit_with_context(&self, key: K, value: V, context: CacheContext) -> CacheEntry { match self { Cache::Fifo(cache) => cache.deposit_with_context(key, value, context).into(), @@ -549,6 +553,7 @@ where } /// Remove a cached entry with the given key from the in-memory cache. + #[minitrace::trace(short_name = true)] pub fn remove(&self, key: &Q) -> Option> where K: Borrow, @@ -563,6 +568,7 @@ where } /// Get cached entry with the given key from the in-memory cache. + #[minitrace::trace(short_name = true)] pub fn get(&self, key: &Q) -> Option> where K: Borrow, @@ -577,6 +583,7 @@ where } /// Check if the in-memory cache contains a cached entry with the given key. + #[minitrace::trace(short_name = true)] pub fn contains(&self, key: &Q) -> bool where K: Borrow, @@ -593,6 +600,7 @@ where /// Access the cached entry with the given key but don't return. /// /// Note: This method can be used to update the cache eviction information and order based on the algorithm. + #[minitrace::trace(short_name = true)] pub fn touch(&self, key: &Q) -> bool where K: Borrow, @@ -607,6 +615,7 @@ where } /// Clear the in-memory cache. + #[minitrace::trace(short_name = true)] pub fn clear(&self) { match self { Cache::Fifo(cache) => cache.clear(), @@ -778,6 +787,7 @@ where /// Use `fetch` to fetch the cache value from the remote storage on cache miss. /// /// The concurrent fetch requests will be deduplicated. + #[minitrace::trace(short_name = true)] pub fn fetch(&self, key: K, fetch: F) -> Fetch where F: FnOnce() -> FU, diff --git a/foyer-memory/src/generic.rs b/foyer-memory/src/generic.rs index 6e593bc6..228cb308 100644 --- a/foyer-memory/src/generic.rs +++ b/foyer-memory/src/generic.rs @@ -46,6 +46,8 @@ use crate::{ CacheContext, }; +use minitrace::prelude::*; + // TODO(MrCroxx): Use `trait_alias` after stable. /// The weighter for the in-memory cache. /// @@ -748,7 +750,7 @@ where } let cache = self.clone(); - let future = fetch(); + let future = fetch().in_span(Span::enter_with_local_parent("spawned")); let join = runtime.spawn(async move { let (value, context) = match future.await { Ok((value, context)) => (value, context), diff --git a/foyer-storage/Cargo.toml b/foyer-storage/Cargo.toml index d7cc8839..4fcc7aec 100644 --- a/foyer-storage/Cargo.toml +++ b/foyer-storage/Cargo.toml @@ -29,6 +29,7 @@ itertools = { workspace = true } lazy_static = "1" libc = "0.2" lz4 = "1.24" +minitrace = { workspace = true } parking_lot = { version = "0.12", features = ["arc_lock"] } pin-project = "1" rand = "0.8" @@ -51,3 +52,4 @@ strict_assertions = [ "foyer-common/strict_assertions", "foyer-memory/strict_assertions", ] +mtrace = ["minitrace/enable", "foyer-memory/mtrace"] diff --git a/foyer-storage/src/device/monitor.rs b/foyer-storage/src/device/monitor.rs index 3f91da42..b7c624ca 100644 --- a/foyer-storage/src/device/monitor.rs +++ b/foyer-storage/src/device/monitor.rs @@ -84,21 +84,11 @@ where metrics: Arc, } -impl Device for Monitored +impl Monitored where D: Device, { - type Options = MonitoredOptions; - - fn capacity(&self) -> usize { - self.device.capacity() - } - - fn region_size(&self) -> usize { - self.device.region_size() - } - - async fn open(options: Self::Options) -> Result { + async fn open(options: MonitoredOptions) -> Result { let device = D::open(options.options).await?; Ok(Self { device, @@ -107,6 +97,7 @@ where }) } + #[minitrace::trace(short_name = true)] async fn write(&self, buf: IoBuffer, region: RegionId, offset: u64) -> Result<()> { let now = Instant::now(); @@ -123,6 +114,7 @@ where res } + #[minitrace::trace(short_name = true)] async fn read(&self, region: RegionId, offset: u64, len: usize) -> Result { let now = Instant::now(); @@ -139,6 +131,7 @@ where res } + #[minitrace::trace(short_name = true)] async fn flush(&self, region: Option) -> Result<()> { let now = Instant::now(); @@ -153,7 +146,39 @@ where } } +impl Device for Monitored +where + D: Device, +{ + type Options = MonitoredOptions; + + fn capacity(&self) -> usize { + self.device.capacity() + } + + fn region_size(&self) -> usize { + self.device.region_size() + } + + async fn open(options: Self::Options) -> Result { + Self::open(options).await + } + + async fn write(&self, buf: IoBuffer, region: RegionId, offset: u64) -> Result<()> { + self.write(buf, region, offset).await + } + + async fn read(&self, region: RegionId, offset: u64, len: usize) -> Result { + self.read(region, offset, len).await + } + + async fn flush(&self, region: Option) -> Result<()> { + self.flush(region).await + } +} + impl Monitored { + #[minitrace::trace(short_name = true)] pub async fn pwrite(&self, buf: IoBuffer, offset: u64) -> Result<()> { let now = Instant::now(); @@ -170,6 +195,7 @@ impl Monitored { res } + #[minitrace::trace(short_name = true)] pub async fn pread(&self, offset: u64, len: usize) -> Result { let now = Instant::now(); diff --git a/foyer-storage/src/large/generic.rs b/foyer-storage/src/large/generic.rs index f70dca3d..d5171ff4 100644 --- a/foyer-storage/src/large/generic.rs +++ b/foyer-storage/src/large/generic.rs @@ -349,6 +349,7 @@ where future } + #[minitrace::trace(short_name = true)] fn load(&self, key: &Q) -> impl Future>> + Send + 'static where K: Borrow, diff --git a/foyer-storage/src/store/mod.rs b/foyer-storage/src/store/mod.rs index 7d8e9705..8b8de79d 100644 --- a/foyer-storage/src/store/mod.rs +++ b/foyer-storage/src/store/mod.rs @@ -175,6 +175,7 @@ where } } + #[minitrace::trace(short_name = true)] fn load(&self, key: &Q) -> impl Future>> + Send + 'static where Self::Key: Borrow, diff --git a/foyer/Cargo.toml b/foyer/Cargo.toml index 97bbc31f..604fbe14 100644 --- a/foyer/Cargo.toml +++ b/foyer/Cargo.toml @@ -17,6 +17,7 @@ anyhow = "1" foyer-common = { version = "0.7.3", path = "../foyer-common" } foyer-memory = { version = "0.5.2", path = "../foyer-memory" } foyer-storage = { version = "0.8.5", path = "../foyer-storage" } +minitrace = { workspace = true } tokio = { workspace = true } tracing = "0.1" @@ -34,3 +35,4 @@ strict_assertions = [ "foyer-storage/strict_assertions", ] sanity = ["strict_assertions", "foyer-memory/sanity"] +mtrace = ["minitrace/enable", "foyer-memory/mtrace", "foyer-storage/mtrace"] diff --git a/foyer/src/hybrid/builder.rs b/foyer/src/hybrid/builder.rs index 74fcfd71..c905cffc 100644 --- a/foyer/src/hybrid/builder.rs +++ b/foyer/src/hybrid/builder.rs @@ -18,6 +18,7 @@ use ahash::RandomState; use foyer_common::{ code::{HashBuilder, StorageKey, StorageValue}, event::EventListener, + trace::TraceConfig, }; use foyer_memory::{Cache, CacheBuilder, EvictionConfig, Weighter}; use foyer_storage::{ @@ -31,6 +32,7 @@ use crate::HybridCache; pub struct HybridCacheBuilder { name: String, event_listener: Option>>, + trace_config: TraceConfig, } impl Default for HybridCacheBuilder { @@ -45,6 +47,7 @@ impl HybridCacheBuilder { Self { name: "foyer".to_string(), event_listener: None, + trace_config: TraceConfig::default(), } } @@ -66,6 +69,14 @@ impl HybridCacheBuilder { self } + /// Set trace config. + /// + /// Default: Only operations over 1s will be recorded. + pub fn with_trace_config(mut self, trace_config: TraceConfig) -> Self { + self.trace_config = trace_config; + self + } + /// Continue to modify the in-memory cache configurations. pub fn memory(self, capacity: usize) -> HybridCacheBuilderPhaseMemory where @@ -79,6 +90,7 @@ impl HybridCacheBuilder { HybridCacheBuilderPhaseMemory { builder, name: self.name, + trace_config: self.trace_config, } } } @@ -91,6 +103,7 @@ where S: HashBuilder + Debug, { name: String, + trace_config: TraceConfig, builder: CacheBuilder, } @@ -106,6 +119,7 @@ where let builder = self.builder.with_shards(shards); HybridCacheBuilderPhaseMemory { name: self.name, + trace_config: self.trace_config, builder, } } @@ -117,6 +131,7 @@ where let builder = self.builder.with_eviction_config(eviction_config.into()); HybridCacheBuilderPhaseMemory { name: self.name, + trace_config: self.trace_config, builder, } } @@ -130,6 +145,7 @@ where let builder = self.builder.with_object_pool_capacity(object_pool_capacity); HybridCacheBuilderPhaseMemory { name: self.name, + trace_config: self.trace_config, builder, } } @@ -142,6 +158,7 @@ where let builder = self.builder.with_hash_builder(hash_builder); HybridCacheBuilderPhaseMemory { name: self.name, + trace_config: self.trace_config, builder, } } @@ -151,6 +168,7 @@ where let builder = self.builder.with_weighter(weighter); HybridCacheBuilderPhaseMemory { name: self.name, + trace_config: self.trace_config, builder, } } @@ -161,6 +179,7 @@ where HybridCacheBuilderPhaseStorage { builder: StoreBuilder::new(memory.clone()).with_name(&self.name), name: self.name, + trace_config: self.trace_config, memory, } } @@ -174,6 +193,7 @@ where S: HashBuilder + Debug, { name: String, + trace_config: TraceConfig, memory: Cache, builder: StoreBuilder, } @@ -189,6 +209,7 @@ where let builder = self.builder.with_device_config(device_config); Self { name: self.name, + trace_config: self.trace_config, memory: self.memory, builder, } @@ -201,6 +222,7 @@ where let builder = self.builder.with_flush(flush); Self { name: self.name, + trace_config: self.trace_config, memory: self.memory, builder, } @@ -213,6 +235,7 @@ where let builder = self.builder.with_indexer_shards(indexer_shards); Self { name: self.name, + trace_config: self.trace_config, memory: self.memory, builder, } @@ -227,6 +250,7 @@ where let builder = self.builder.with_recover_mode(recover_mode); Self { name: self.name, + trace_config: self.trace_config, memory: self.memory, builder, } @@ -239,6 +263,7 @@ where let builder = self.builder.with_recover_concurrency(recover_concurrency); Self { name: self.name, + trace_config: self.trace_config, memory: self.memory, builder, } @@ -253,6 +278,7 @@ where let builder = self.builder.with_flushers(flushers); Self { name: self.name, + trace_config: self.trace_config, memory: self.memory, builder, } @@ -267,6 +293,7 @@ where let builder = self.builder.with_reclaimers(reclaimers); Self { name: self.name, + trace_config: self.trace_config, memory: self.memory, builder, } @@ -281,6 +308,7 @@ where let builder = self.builder.with_clean_region_threshold(clean_region_threshold); Self { name: self.name, + trace_config: self.trace_config, memory: self.memory, builder, } @@ -300,6 +328,7 @@ where let builder = self.builder.with_eviction_pickers(eviction_pickers); Self { name: self.name, + trace_config: self.trace_config, memory: self.memory, builder, } @@ -314,6 +343,7 @@ where let builder = self.builder.with_admission_picker(admission_picker); Self { name: self.name, + trace_config: self.trace_config, memory: self.memory, builder, } @@ -332,6 +362,7 @@ where let builder = self.builder.with_reinsertion_picker(reinsertion_picker); Self { name: self.name, + trace_config: self.trace_config, memory: self.memory, builder, } @@ -344,6 +375,7 @@ where let builder = self.builder.with_compression(compression); Self { name: self.name, + trace_config: self.trace_config, memory: self.memory, builder, } @@ -357,6 +389,7 @@ where let builder = self.builder.with_tombstone_log_config(tombstone_log_config); Self { name: self.name, + trace_config: self.trace_config, memory: self.memory, builder, } @@ -367,6 +400,7 @@ where let builder = self.builder.with_runtime_config(runtime_config); Self { name: self.name, + trace_config: self.trace_config, memory: self.memory, builder, } @@ -375,6 +409,6 @@ where /// Build and open the hybrid cache with the given configurations. pub async fn build(self) -> anyhow::Result> { let storage = self.builder.build().await?; - Ok(HybridCache::new(self.name, self.memory, storage)) + Ok(HybridCache::new(self.name, self.memory, storage, self.trace_config)) } } diff --git a/foyer/src/hybrid/cache.rs b/foyer/src/hybrid/cache.rs index e85a6402..c54fc3bd 100644 --- a/foyer/src/hybrid/cache.rs +++ b/foyer/src/hybrid/cache.rs @@ -12,21 +12,40 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{borrow::Borrow, fmt::Debug, future::Future, hash::Hash, sync::Arc, time::Instant}; +use std::{ + borrow::Borrow, + fmt::Debug, + future::Future, + hash::Hash, + sync::{atomic::Ordering, Arc}, + time::Instant, +}; use ahash::RandomState; use foyer_common::{ code::{HashBuilder, StorageKey, StorageValue}, metrics::Metrics, + trace::TraceConfig, }; use foyer_memory::{Cache, CacheContext, CacheEntry, Fetch, FetchState}; use foyer_storage::{DeviceStats, Storage, Store}; +use minitrace::prelude::*; use tokio::sync::oneshot; use crate::HybridCacheWriter; use super::writer::HybridCacheStorageWriter; +macro_rules! try_cancel { + ($self:ident, $span:ident, $threshold:ident) => { + if let Some(elapsed) = $span.elapsed() { + if (elapsed.as_nanos() as usize) < $self.trace_config.$threshold.load(Ordering::Relaxed) { + $span.cancel(); + } + } + }; +} + /// A cached entry holder of the hybrid cache. pub type HybridCacheEntry = CacheEntry; @@ -40,6 +59,7 @@ where memory: Cache, storage: Store, metrics: Arc, + trace_config: Arc, } impl Debug for HybridCache @@ -52,6 +72,7 @@ where f.debug_struct("HybridCache") .field("memory", &self.memory) .field("storage", &self.storage) + .field("trace_config", &self.trace_config) .finish() } } @@ -67,6 +88,7 @@ where memory: self.memory.clone(), storage: self.storage.clone(), metrics: self.metrics.clone(), + trace_config: self.trace_config.clone(), } } } @@ -77,15 +99,27 @@ where V: StorageValue, S: HashBuilder + Debug, { - pub(crate) fn new(name: String, memory: Cache, storage: Store) -> Self { + pub(crate) fn new( + name: String, + memory: Cache, + storage: Store, + trace_config: TraceConfig, + ) -> Self { let metrics = Arc::new(Metrics::new(&name)); + let trace_config = Arc::new(trace_config); Self { memory, storage, metrics, + trace_config, } } + /// Access the trace config. + pub fn trace_config(&self) -> &TraceConfig { + &self.trace_config + } + /// Access the in-memory cache. pub fn memory(&self) -> &Cache { &self.memory @@ -93,6 +127,9 @@ where /// Insert cache entry to the hybrid cache. pub fn insert(&self, key: K, value: V) -> HybridCacheEntry { + let mut span = Span::root(func_name!(), SpanContext::random()); + let _guard = span.set_local_parent(); + let now = Instant::now(); let entry = self.memory.insert(key, value); @@ -101,11 +138,16 @@ where self.metrics.hybrid_insert.increment(1); self.metrics.hybrid_insert_duration.record(now.elapsed()); + try_cancel!(self, span, record_hybrid_insert_threshold_ns); + entry } /// Insert cache entry with cache context to the hybrid cache. pub fn insert_with_context(&self, key: K, value: V, context: CacheContext) -> HybridCacheEntry { + let mut span = Span::root(func_name!(), SpanContext::random()); + let _guard = span.set_local_parent(); + let now = Instant::now(); let entry = self.memory.insert_with_context(key, value, context); @@ -114,6 +156,8 @@ where self.metrics.hybrid_insert.increment(1); self.metrics.hybrid_insert_duration.record(now.elapsed()); + try_cancel!(self, span, record_hybrid_insert_threshold_ns); + entry } @@ -123,6 +167,8 @@ where K: Borrow, Q: Hash + Eq + ?Sized + Send + Sync + 'static + Clone, { + let mut span = Span::root(func_name!(), SpanContext::random()); + let now = Instant::now(); let record_hit = || { @@ -134,20 +180,30 @@ where self.metrics.hybrid_miss_duration.record(now.elapsed()); }; + let guard = span.set_local_parent(); if let Some(entry) = self.memory.get(key) { record_hit(); + try_cancel!(self, span, record_hybrid_get_threshold_ns); return Ok(Some(entry)); } - - if let Some((k, v)) = self.storage.load(key).await? { + drop(guard); + + if let Some((k, v)) = self + .storage + .load(key) + .in_span(Span::enter_with_parent("poll", &span)) + .await? + { if k.borrow() != key { record_miss(); return Ok(None); } record_hit(); + try_cancel!(self, span, record_hybrid_get_threshold_ns); return Ok(Some(self.memory.insert(k, v))); } record_miss(); + try_cancel!(self, span, record_hybrid_get_threshold_ns); Ok(None) } @@ -161,36 +217,47 @@ where where K: Clone, { + let mut span = Span::root(func_name!(), SpanContext::random()); + let now = Instant::now(); - let res = self - .memory - .fetch(key.clone(), || { - let store = self.storage.clone(); - async move { - match store.load(&key).await.map_err(anyhow::Error::from) { - Err(e) => Err(ObtainFetchError::Err(e)), - Ok(None) => Err(ObtainFetchError::NotExist), - Ok(Some((k, _))) if key != k => Err(ObtainFetchError::NotExist), - Ok(Some((_, v))) => Ok((v, CacheContext::default())), - } + let guard = span.set_local_parent(); + let fetch = self.memory.fetch(key.clone(), || { + let store = self.storage.clone(); + async move { + match store.load(&key).await.map_err(anyhow::Error::from) { + Err(e) => Err(ObtainFetchError::Err(e)), + Ok(None) => Err(ObtainFetchError::NotExist), + Ok(Some((k, _))) if key != k => Err(ObtainFetchError::NotExist), + Ok(Some((_, v))) => Ok((v, CacheContext::default())), } - }) - .await; + } + }); + drop(guard); + + let res = fetch.in_span(Span::enter_with_parent("poll", &span)).await; match res { Ok(entry) => { self.metrics.hybrid_hit.increment(1); self.metrics.hybrid_hit_duration.record(now.elapsed()); + try_cancel!(self, span, record_hybrid_obtain_threshold_ns); Ok(Some(entry)) } Err(ObtainFetchError::NotExist) => { self.metrics.hybrid_miss.increment(1); self.metrics.hybrid_miss_duration.record(now.elapsed()); + try_cancel!(self, span, record_hybrid_obtain_threshold_ns); Ok(None) } - Err(ObtainFetchError::RecvError(_)) => Ok(None), - Err(ObtainFetchError::Err(e)) => Err(e), + Err(ObtainFetchError::RecvError(_)) => { + try_cancel!(self, span, record_hybrid_obtain_threshold_ns); + Ok(None) + } + Err(ObtainFetchError::Err(e)) => { + try_cancel!(self, span, record_hybrid_obtain_threshold_ns); + Err(e) + } } } @@ -200,6 +267,9 @@ where K: Borrow, Q: Hash + Eq + ?Sized + Send + Sync + 'static, { + let mut span = Span::root(func_name!(), SpanContext::random()); + let _guard = span.set_local_parent(); + let now = Instant::now(); self.memory.remove(key); @@ -207,6 +277,8 @@ where self.metrics.hybrid_remove.increment(1); self.metrics.hybrid_remove_duration.record(now.elapsed()); + + try_cancel!(self, span, record_hybrid_remove_threshold_ns); } /// Check if the hybrid cache contains a cached entry with the given key. @@ -290,6 +362,12 @@ where F: FnOnce() -> FU, FU: Future> + Send + 'static, { + let span = if SpanContext::current_local_parent().is_none() { + Some(Span::root(func_name!(), SpanContext::random())) + } else { + None + }; + let now = Instant::now(); let store = self.storage.clone(); @@ -324,6 +402,10 @@ where self.metrics.hybrid_hit_duration.record(now.elapsed()); } + if let Some(mut span) = span { + try_cancel!(self, span, record_hybrid_fetch_threshold_ns); + } + ret } } diff --git a/foyer/src/prelude.rs b/foyer/src/prelude.rs index 8ff51aaf..6e0d5f46 100644 --- a/foyer/src/prelude.rs +++ b/foyer/src/prelude.rs @@ -23,6 +23,7 @@ pub use common::{ code::{Key, StorageKey, StorageValue, Value}, event::EventListener, range::RangeBoundsExt, + trace::TraceConfig, }; pub use memory::{CacheContext, EvictionConfig, FetchState, FifoConfig, LfuConfig, LruConfig, S3FifoConfig}; pub use storage::{