diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala
index e97c253e7e..9742f5edd5 100644
--- a/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala
+++ b/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala
@@ -10,6 +10,7 @@ import scala.collection.mutable.ArrayBuffer
 import com.typesafe.scalalogging.StrictLogging
 import debox.Buffer
 import kamon.Kamon
+import kamon.metric.MeasurementUnit
 import org.apache.commons.lang3.SystemUtils
 import org.apache.lucene.util.BytesRef
 import spire.implicits.cforRange
@@ -45,6 +46,15 @@ class PartKeyTantivyIndex(ref: DatasetRef,
                           deletedDocMergeThreshold: Float = 0.1f
                          ) extends PartKeyIndexRaw(ref, shardNum, schema, diskLocation, lifecycleManager) {
 
+  private val cacheHitRate = Kamon.gauge("index-tantivy-cache-hit-rate")
+    .withTag("dataset", ref.dataset)
+    .withTag("shard", shardNum)
+
+  private val refreshLatency = Kamon.histogram("index-tantivy-commit-refresh-latency",
+      MeasurementUnit.time.nanoseconds)
+    .withTag("dataset", ref.dataset)
+    .withTag("shard", shardNum)
+
   // Compute field names for native schema code
   private val schemaFields = schema.columns.filter { c =>
     c.columnType == StringColumn
@@ -82,7 +92,28 @@ class PartKeyTantivyIndex(ref: DatasetRef,
 
     flushThreadPool = new ScheduledThreadPoolExecutor(1)
 
-    flushThreadPool.scheduleAtFixedRate(() => refreshReadersBlocking(), flushDelayMinSeconds,
+    flushThreadPool.scheduleAtFixedRate(() => {
+      // Commit / refresh, timed so the elapsed nanoseconds land in the latency histogram
+      val start = System.nanoTime()
+      refreshReadersBlocking()
+      val elapsed = System.nanoTime() - start
+      refreshLatency.record(elapsed)
+
+      // Emit cache stats.  A task that throws is never run again by
+      // scheduleAtFixedRate, so a stats failure must not escape or it would
+      // silently stop every later commit / refresh.
+      // NOTE(review): assumes `logger` is in scope here (StrictLogging) - confirm
+      try {
+        // Native side returns (query hit rate, column hit rate)
+        val cacheStats = TantivyNativeMethods.getCacheHitRates(indexHandle)
+
+        cacheHitRate.withTag("label", "query").update(cacheStats(0))
+        cacheHitRate.withTag("label", "column").update(cacheStats(1))
+      } catch {
+        case scala.util.control.NonFatal(e) =>
+          logger.warn(s"Failed to emit tantivy cache stats for dataset=${ref.dataset} shard=$shardNum", e)
+      }
+    }, flushDelayMinSeconds,
       flushDelayMinSeconds, TimeUnit.SECONDS)
   }
 
@@ -663,6 +694,11 
@@ protected object TantivyNativeMethods {
   @native
   def removePartitionsEndedBefore(handle: Long, endedBefore: Long, returnApproxDeletedCount: Boolean): Int
 
+  // Cache hit rates, used for stats reporting
+  // Returned array is ordered (query cache, column cache)
+  @native
+  def getCacheHitRates(handle: Long): Array[Double]
+
   // Dump stats - mainly meant for testing
   @native
   def dumpCacheStats(handle: Long): String
diff --git a/core/src/rust/filodb_core/src/profile.rs b/core/src/rust/filodb_core/src/profile.rs
index f69ef6d900..d3718586bd 100644
--- a/core/src/rust/filodb_core/src/profile.rs
+++ b/core/src/rust/filodb_core/src/profile.rs
@@ -5,7 +5,7 @@ use std::sync::Mutex;
 
 use jni::{
     objects::JClass,
-    sys::{jlong, jstring},
+    sys::{jdoubleArray, jlong, jstring},
     JNIEnv,
 };
 
@@ -40,6 +40,40 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_dump
     })
 }
 
+/// Report the query and column cache hit rates to the JVM as a double array
+///
+/// When a cache's hit and miss counts sum to zero its rate is reported as 1.0
+#[no_mangle]
+pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_getCacheHitRates(
+    mut env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) -> jdoubleArray {
+    jni_exec(&mut env, |env| {
+        let index = IndexHandle::get_ref_from_handle(handle);
+
+        let (col_hits, col_misses) = index.column_cache.stats();
+        let (q_hits, q_misses) = index.query_cache_stats();
+
+        let column_rate = match col_hits + col_misses {
+            0 => 1.0f64,
+            lookups => (col_hits as f64) / (lookups as f64),
+        };
+        let query_rate = match q_hits + q_misses {
+            0 => 1.0f64,
+            lookups => (q_hits as f64) / (lookups as f64),
+        };
+
+        // JVM code reads this positionally: (query hit rate, column hit rate)
+        let rates = [query_rate, column_rate];
+
+        let array = env.new_double_array(rates.len() as i32)?;
+        env.set_double_array_region(&array, 0, &rates)?;
+
+        Ok(array.into_raw())
+    })
+}
+
 /// Start memory profiling
 #[no_mangle]
 #[allow(unused_mut, unused_variables)]