Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Add tantivy metrics #1845

Merged
merged 1 commit into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import scala.collection.mutable.ArrayBuffer
import com.typesafe.scalalogging.StrictLogging
import debox.Buffer
import kamon.Kamon
import kamon.metric.MeasurementUnit
import org.apache.commons.lang3.SystemUtils
import org.apache.lucene.util.BytesRef
import spire.implicits.cforRange
Expand Down Expand Up @@ -45,6 +46,15 @@ class PartKeyTantivyIndex(ref: DatasetRef,
deletedDocMergeThreshold: Float = 0.1f
) extends PartKeyIndexRaw(ref, shardNum, schema, diskLocation, lifecycleManager) {

private val cacheHitRate = Kamon.gauge("index-tantivy-cache-hit-rate")
.withTag("dataset", ref.dataset)
.withTag("shard", shardNum)

private val refreshLatency = Kamon.histogram("index-tantivy-commit-refresh-latency",
MeasurementUnit.time.nanoseconds)
.withTag("dataset", ref.dataset)
.withTag("shard", shardNum)

// Compute field names for native schema code
private val schemaFields = schema.columns.filter { c =>
c.columnType == StringColumn
Expand Down Expand Up @@ -82,7 +92,19 @@ class PartKeyTantivyIndex(ref: DatasetRef,

flushThreadPool = new ScheduledThreadPoolExecutor(1)

flushThreadPool.scheduleAtFixedRate(() => refreshReadersBlocking(), flushDelayMinSeconds,
flushThreadPool.scheduleAtFixedRate(() => {
// Commit / refresh
val start = System.nanoTime()
refreshReadersBlocking()
val elapsed = System.nanoTime() - start
refreshLatency.record(elapsed)

// Emit cache stats
val cache_stats = TantivyNativeMethods.getCacheHitRates(indexHandle)

cacheHitRate.withTag("label", "query").update(cache_stats(0))
cacheHitRate.withTag("label", "column").update(cache_stats(1))
}, flushDelayMinSeconds,
flushDelayMinSeconds, TimeUnit.SECONDS)
}

Expand Down Expand Up @@ -663,6 +685,11 @@ protected object TantivyNativeMethods {
@native
def removePartitionsEndedBefore(handle: Long, endedBefore: Long, returnApproxDeletedCount: Boolean): Int

// Get cache hit rates for stats
// Array of (query cache, column cache)
@native
def getCacheHitRates(handle: Long): Array[Double]

// Dump stats - mainly meant for testing
@native
def dumpCacheStats(handle: Long): String
Expand Down
39 changes: 38 additions & 1 deletion core/src/rust/filodb_core/src/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Mutex;

use jni::{
objects::JClass,
sys::{jlong, jstring},
sys::{jdoubleArray, jlong, jstring},
JNIEnv,
};

Expand Down Expand Up @@ -40,6 +40,43 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_dump
})
}

/// Get cache hit rates
#[no_mangle]
pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_getCacheHitRates(
mut env: JNIEnv,
_class: JClass,
handle: jlong,
) -> jdoubleArray {
jni_exec(&mut env, |env| {
let index = IndexHandle::get_ref_from_handle(handle);

let (column_hits, column_misses) = index.column_cache.stats();
let (query_hits, query_misses) = index.query_cache_stats();

let column_total = column_hits + column_misses;
let query_total = query_hits + query_misses;
let column_hit_rate = if column_total == 0 {
1.0f64
} else {
(column_hits as f64) / (column_total) as f64
};

let query_hit_rate = if query_total == 0 {
1.0f64
} else {
(query_hits as f64) / (query_total) as f64
};

// Contract with JVM code is (query hit rate, column hit rate)
let hit_rates = [query_hit_rate, column_hit_rate];

let result = env.new_double_array(hit_rates.len() as i32)?;
env.set_double_array_region(&result, 0, &hit_rates)?;

Ok(result.into_raw())
})
}

/// Start memory profiling
#[no_mangle]
#[allow(unused_mut, unused_variables)]
Expand Down
Loading