From d2ce949194dbc1357ba1ffa202e9904be1f160b3 Mon Sep 17 00:00:00 2001 From: Ryan Fairfax Date: Fri, 6 Sep 2024 13:17:33 -0700 Subject: [PATCH] feat(core): Add tantivy metrics Add two new metrics for commit processing time and cache hit rates. These will be used for monitoring as we roll out and turn the feature on for testing. --- .../memstore/PartKeyTantivyIndex.scala | 29 +++++++++++++- core/src/rust/filodb_core/src/profile.rs | 39 ++++++++++++++++++- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala index e97c253e7e..9742f5edd5 100644 --- a/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala +++ b/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala @@ -10,6 +10,7 @@ import scala.collection.mutable.ArrayBuffer import com.typesafe.scalalogging.StrictLogging import debox.Buffer import kamon.Kamon +import kamon.metric.MeasurementUnit import org.apache.commons.lang3.SystemUtils import org.apache.lucene.util.BytesRef import spire.implicits.cforRange @@ -45,6 +46,15 @@ class PartKeyTantivyIndex(ref: DatasetRef, deletedDocMergeThreshold: Float = 0.1f ) extends PartKeyIndexRaw(ref, shardNum, schema, diskLocation, lifecycleManager) { + private val cacheHitRate = Kamon.gauge("index-tantivy-cache-hit-rate") + .withTag("dataset", ref.dataset) + .withTag("shard", shardNum) + + private val refreshLatency = Kamon.histogram("index-tantivy-commit-refresh-latency", + MeasurementUnit.time.nanoseconds) + .withTag("dataset", ref.dataset) + .withTag("shard", shardNum) + // Compute field names for native schema code private val schemaFields = schema.columns.filter { c => c.columnType == StringColumn @@ -82,7 +92,19 @@ class PartKeyTantivyIndex(ref: DatasetRef, flushThreadPool = new ScheduledThreadPoolExecutor(1) - flushThreadPool.scheduleAtFixedRate(() => refreshReadersBlocking(), flushDelayMinSeconds, + flushThreadPool.scheduleAtFixedRate(() => { + // Commit / refresh + val start = System.nanoTime() + refreshReadersBlocking() + val elapsed = System.nanoTime() - start + refreshLatency.record(elapsed) + + // Emit cache stats + val cache_stats = TantivyNativeMethods.getCacheHitRates(indexHandle) + + cacheHitRate.withTag("label", "query").update(cache_stats(0)) + cacheHitRate.withTag("label", "column").update(cache_stats(1)) + }, flushDelayMinSeconds, flushDelayMinSeconds, TimeUnit.SECONDS) } @@ -663,6 +685,11 @@ protected object TantivyNativeMethods { @native def removePartitionsEndedBefore(handle: Long, endedBefore: Long, returnApproxDeletedCount: Boolean): Int + // Get cache hit rates for stats + // Array of (query cache, column cache) + @native + def getCacheHitRates(handle: Long): Array[Double] + // Dump stats - mainly meant for testing @native def dumpCacheStats(handle: Long): String diff --git a/core/src/rust/filodb_core/src/profile.rs b/core/src/rust/filodb_core/src/profile.rs index f69ef6d900..d3718586bd 100644 --- a/core/src/rust/filodb_core/src/profile.rs +++ b/core/src/rust/filodb_core/src/profile.rs @@ -5,7 +5,7 @@ use std::sync::Mutex; use jni::{ objects::JClass, - sys::{jlong, jstring}, + sys::{jdoubleArray, jlong, jstring}, JNIEnv, }; @@ -40,6 +40,43 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_dump }) } +/// Get cache hit rates +#[no_mangle] +pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_getCacheHitRates( + mut env: JNIEnv, + _class: JClass, + handle: jlong, +) -> jdoubleArray { + jni_exec(&mut env, |env| { + let index = IndexHandle::get_ref_from_handle(handle); + + let (column_hits, column_misses) = index.column_cache.stats(); + let (query_hits, query_misses) = index.query_cache_stats(); + + let column_total = column_hits + column_misses; + let query_total = query_hits + query_misses; + let column_hit_rate = if column_total == 0 { + 1.0f64 + } else { + (column_hits as f64) / (column_total) as f64 + }; + + let query_hit_rate = if query_total == 0 { + 1.0f64 + } else { + (query_hits as f64) / (query_total) as f64 + }; + + // Contract with JVM code is (query hit rate, column hit rate) + let hit_rates = [query_hit_rate, column_hit_rate]; + + let result = env.new_double_array(hit_rates.len() as i32)?; + env.set_double_array_region(&result, 0, &hit_rates)?; + + Ok(result.into_raw()) + }) +} + /// Start memory profiling #[no_mangle] #[allow(unused_mut, unused_variables)]