diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala index fdb3c851fa..8b24c0caa7 100644 --- a/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala +++ b/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala @@ -2,6 +2,7 @@ package filodb.core.memstore import java.io.File import java.nio.{ByteBuffer, ByteOrder} +import java.nio.charset.StandardCharsets import java.nio.file.Files import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit} @@ -22,7 +23,7 @@ import filodb.core.memstore.PartKeyIndexRaw.{bytesRefToUnsafeOffset, ignoreIndex import filodb.core.metadata.{PartitionSchema, Schemas} import filodb.core.metadata.Column.ColumnType.{MapColumn, StringColumn} import filodb.core.query.{ColumnFilter, Filter} -import filodb.memory.format.UnsafeUtils +import filodb.memory.format.{UnsafeUtils, ZeroCopyUTF8String} object PartKeyTantivyIndex { def startMemoryProfiling(): Unit = { @@ -155,7 +156,7 @@ class PartKeyTantivyIndex(ref: DatasetRef, } override def indexNames(limit: Int): Seq[String] = { - TantivyNativeMethods.indexNames(indexHandle).filterNot { + decodeStringArray(TantivyNativeMethods.indexNames(indexHandle)).filterNot { n => ignoreIndexNames.contains(n) || n.startsWith(FACET_FIELD_PREFIX) } } @@ -163,7 +164,38 @@ class PartKeyTantivyIndex(ref: DatasetRef, override def indexValues(fieldName: String, topK: Int): Seq[TermInfo] = { val results = TantivyNativeMethods.indexValues(indexHandle, fieldName, topK) - results.toSeq + val buffer = ByteBuffer.wrap(results) + buffer.order(ByteOrder.LITTLE_ENDIAN) + + val parsedResults = new ArrayBuffer[TermInfo]() + + while (buffer.hasRemaining) { + val count = buffer.getLong + val strLen = buffer.getInt + val strBytes = new Array[Byte](strLen) + buffer.get(strBytes) + + parsedResults += TermInfo(ZeroCopyUTF8String.apply(strBytes), count.toInt) + } + + parsedResults + } + + private def decodeStringArray(arr: Array[Byte]): Seq[String] = { + val buffer = ByteBuffer.wrap(arr) + buffer.order(ByteOrder.LITTLE_ENDIAN) + + val parsedResults = new ArrayBuffer[String]() + + while (buffer.hasRemaining) { + val strLen = buffer.getInt + val strBytes = new Array[Byte](strLen) + buffer.get(strBytes) + + parsedResults += new String(strBytes, StandardCharsets.UTF_8) + } + + parsedResults } override def labelNamesEfficient(colFilters: Seq[ColumnFilter], startTime: Long, endTime: Long): Seq[String] = { @@ -176,7 +208,7 @@ class PartKeyTantivyIndex(ref: DatasetRef, labelValuesQueryLatency.record(System.nanoTime() - start) - results.toSeq + decodeStringArray(results) } override def labelValuesEfficient(colFilters: Seq[ColumnFilter], startTime: Long, endTime: Long, @@ -189,7 +221,7 @@ class PartKeyTantivyIndex(ref: DatasetRef, labelValuesQueryLatency.record(System.nanoTime() - start) - results.toSeq + decodeStringArray(results) } override def addPartKey(partKeyOnHeapBytes: Array[Byte], partId: Int, startTime: Long, endTime: Long, @@ -645,19 +677,19 @@ protected object TantivyNativeMethods { // Get the list of unique indexed field names @native - def indexNames(handle: Long): Array[String] + def indexNames(handle: Long): Array[Byte] // Get the list of unique values for a field @native - def indexValues(handle: Long, fieldName: String, topK: Int): Array[TermInfo] + def indexValues(handle: Long, fieldName: String, topK: Int): Array[Byte] // Get the list of unique indexed field names @native - def labelNames(handle: Long, query: Array[Byte], limit: Int, start: Long, end: Long): Array[String] + def labelNames(handle: Long, query: Array[Byte], limit: Int, start: Long, end: Long): Array[Byte] // Get the list of unique values for a field @native - def labelValues(handle: Long, query: Array[Byte], colName: String, limit: Int, start: Long, end: Long): Array[String] + def labelValues(handle: Long, query: Array[Byte], colName: String, limit: Int, start: Long, end: Long): Array[Byte] // Get the list of part IDs given a query @native diff --git a/core/src/rust/filodb_core/src/reader.rs b/core/src/rust/filodb_core/src/reader.rs index c8fb97d9be..72f814a00b 100644 --- a/core/src/rust/filodb_core/src/reader.rs +++ b/core/src/rust/filodb_core/src/reader.rs @@ -4,17 +4,17 @@ use std::sync::atomic::Ordering; use hashbrown::HashSet; use jni::{ - objects::{JByteArray, JClass, JIntArray, JObject, JString, JValue}, - sys::{jbyteArray, jint, jintArray, jlong, jlongArray, jobjectArray}, + objects::{JByteArray, JClass, JIntArray, JObject, JString}, + sys::{jbyteArray, jint, jintArray, jlong, jlongArray}, JNIEnv, }; -use tantivy::{collector::FacetCollector, schema::FieldType}; -use tantivy_utils::collectors::part_key_record_collector::PartKeyRecordCollector; +use tantivy::schema::FieldType; +use tantivy_utils::collectors::part_id_collector::PartIdCollector; use tantivy_utils::collectors::string_field_collector::StringFieldCollector; use tantivy_utils::collectors::time_collector::TimeCollector; use tantivy_utils::collectors::time_range_filter::TimeRangeFilter; use tantivy_utils::collectors::{ - limited_collector::UnlimitedCollector, part_id_collector::PartIdCollector, + index_collector::collect_from_index, part_key_record_collector::PartKeyRecordCollector, }; use tantivy_utils::collectors::{ part_key_collector::PartKeyCollector, part_key_record_collector::PartKeyRecord, @@ -29,9 +29,6 @@ use crate::{ state::IndexHandle, }; -const TERM_INFO_CLASS: &str = "filodb/core/memstore/TermInfo"; -const UTF8STR_CLASS: &str = "filodb/memory/format/ZeroCopyUTF8String"; - #[no_mangle] pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_indexRamBytes( mut env: JNIEnv, @@ -147,21 +144,28 @@ fn fetch_label_names( query: FiloDBQuery, handle: &IndexHandle, results: &mut HashSet, + limit: i32, start: i64, end: i64, ) -> JavaResult<()> { - // Get LABEL_NAMES facet - let mut collector = FacetCollector::for_field(facet_field_name(field_constants::LABEL_LIST)); - collector.add_facet("/"); - let collector = UnlimitedCollector::new(collector); - - let filter_collector = - TimeRangeFilter::new(&collector, start, end, handle.column_cache.clone()); + let field = facet_field_name(field_constants::LABEL_LIST); + let collector = StringFieldCollector::new( + &field, + limit as usize, + usize::MAX, + handle.column_cache.clone(), + ); + + let query_results = if matches!(query, FiloDBQuery::All) { + collect_from_index(&handle.searcher(), collector)? + } else { + let filter_collector = + TimeRangeFilter::new(&collector, start, end, handle.column_cache.clone()); + handle.execute_cachable_query(query, filter_collector)? + }; - let query_results = handle.execute_cachable_query(query, filter_collector)?; - let query_results: Vec<_> = query_results.get("/").collect(); for (facet, _count) in query_results { - results.insert(facet.to_path()[0].to_string()); + results.insert(facet.to_string()); } Ok(()) @@ -176,7 +180,7 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_labe limit: jint, start: jlong, end: jlong, -) -> jobjectArray { +) -> jbyteArray { jni_exec(&mut env, |env| { let handle = IndexHandle::get_ref_from_handle(handle); @@ -185,25 +189,41 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_labe let query_bytes = env.get_byte_array(&query)?; let query = FiloDBQuery::Complex(query_bytes.into_boxed_slice().into()); - fetch_label_names(query, handle, &mut results, start, end)?; + fetch_label_names(query, handle, &mut results, limit, start, end)?; - let len = std::cmp::min(results.len(), limit as usize); + encode_string_array(env, results) + }) +} - let java_ret = env.new_object_array(len as i32, "java/lang/String", JObject::null())?; - for (i, item) in results.drain().take(len).enumerate() { - env.set_object_array_element(&java_ret, i as i32, env.new_string(item)?)?; - } +fn encode_string_array(env: &mut JNIEnv, arr: HashSet) -> JavaResult { + let len: usize = arr + .iter() + .map(|s| std::mem::size_of::() + s.len()) + .sum(); - Ok(java_ret.into_raw()) - }) + let mut serialzied_bytes = Vec::with_capacity(len); + for s in arr.iter() { + serialzied_bytes.extend((s.len() as i32).to_le_bytes()); + serialzied_bytes.extend(s.as_bytes()); + } + + let java_ret = env.new_byte_array(len as i32)?; + let bytes_ptr = serialzied_bytes.as_ptr() as *const i8; + let bytes_ptr = unsafe { std::slice::from_raw_parts(bytes_ptr, len) }; + + env.set_byte_array_region(&java_ret, 0, bytes_ptr)?; + + Ok(java_ret.into_raw()) } +const LABEL_NAMES_AND_VALUES_DEFAULT_LIMIT: i32 = 100; + #[no_mangle] pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_indexNames( mut env: JNIEnv, _class: JClass, handle: jlong, -) -> jobjectArray { +) -> jbyteArray { jni_exec(&mut env, |env| { let handle = IndexHandle::get_ref_from_handle(handle); @@ -223,15 +243,16 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_inde } let query = FiloDBQuery::All; - fetch_label_names(query, handle, &mut results, 0, i64::MAX)?; - - let java_ret = - env.new_object_array(results.len() as i32, "java/lang/String", JObject::null())?; - for (i, item) in results.drain().enumerate() { - env.set_object_array_element(&java_ret, i as i32, env.new_string(item)?)?; - } + fetch_label_names( + query, + handle, + &mut results, + LABEL_NAMES_AND_VALUES_DEFAULT_LIMIT, + 0, + i64::MAX, + )?; - Ok(java_ret.into_raw()) + encode_string_array(env, results) }) } @@ -260,9 +281,14 @@ fn query_label_values( let collector = StringFieldCollector::new(&field, limit, term_limit, handle.column_cache.clone()); - let filter_collector = - TimeRangeFilter::new(&collector, start, end, handle.column_cache.clone()); - Ok(handle.execute_cachable_query(query, filter_collector)?) + + if matches!(query, FiloDBQuery::All) { + Ok(collect_from_index(&handle.searcher(), collector)?) + } else { + let filter_collector = + TimeRangeFilter::new(&collector, start, end, handle.column_cache.clone()); + Ok(handle.execute_cachable_query(query, filter_collector)?) + } } else { // Invalid field, no values Ok(vec![]) @@ -279,7 +305,7 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_labe top_k: jint, start: jlong, end: jlong, -) -> jobjectArray { +) -> jbyteArray { jni_exec(&mut env, |env| { let handle = IndexHandle::get_ref_from_handle(handle); @@ -293,14 +319,23 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_labe let results = query_label_values(query, handle, field, top_k, usize::MAX, start, end)?; - let results_len = std::cmp::min(top_k, results.len()); - let java_ret = - env.new_object_array(results_len as i32, "java/lang/String", JObject::null())?; - for (i, (value, _)) in results.into_iter().take(top_k).enumerate() { - let term_str = env.new_string(&value)?; - env.set_object_array_element(&java_ret, i as i32, &term_str)?; + let len: usize = results + .iter() + .map(|(s, _)| std::mem::size_of::() + s.len()) + .sum(); + + let mut serialzied_bytes = Vec::with_capacity(len); + for (s, _) in results.iter() { + serialzied_bytes.extend((s.len() as i32).to_le_bytes()); + serialzied_bytes.extend(s.as_bytes()); } + let java_ret = env.new_byte_array(len as i32)?; + let bytes_ptr = serialzied_bytes.as_ptr() as *const i8; + let bytes_ptr = unsafe { std::slice::from_raw_parts(bytes_ptr, len) }; + + env.set_byte_array_region(&java_ret, 0, bytes_ptr)?; + Ok(java_ret.into_raw()) }) } @@ -312,7 +347,7 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_inde handle: jlong, field: JString, top_k: jint, -) -> jobjectArray { +) -> jbyteArray { jni_exec(&mut env, |env| { let handle = IndexHandle::get_ref_from_handle(handle); @@ -331,34 +366,25 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_inde i64::MAX, )?; - let results_len = std::cmp::min(top_k, results.len()); - let java_ret = - env.new_object_array(results_len as i32, TERM_INFO_CLASS, JObject::null())?; - for (i, (value, count)) in results.into_iter().take(top_k).enumerate() { - let len = value.as_bytes().len(); - let term_bytes = env.new_byte_array(len as i32)?; - let bytes_ptr = value.as_bytes().as_ptr() as *const i8; - let bytes_ptr = unsafe { std::slice::from_raw_parts(bytes_ptr, len) }; - - env.set_byte_array_region(&term_bytes, 0, bytes_ptr)?; - - let term_str = env - .call_static_method( - UTF8STR_CLASS, - "apply", - "([B)Lfilodb/memory/format/ZeroCopyUTF8String;", - &[JValue::Object(&term_bytes)], - )? - .l()?; - - let term_info_obj = env.new_object( - TERM_INFO_CLASS, - "(Lfilodb/memory/format/ZeroCopyUTF8String;I)V", - &[JValue::Object(&term_str), JValue::Int(count as i32)], - )?; - env.set_object_array_element(&java_ret, i as i32, &term_info_obj)?; + // String length, plus count, plus string data + let results_len: usize = results + .iter() + .take(top_k) + .map(|(value, _)| value.len() + std::mem::size_of::() + std::mem::size_of::()) + .sum(); + let mut serialzied_bytes = Vec::with_capacity(results_len); + for (value, count) in results.into_iter().take(top_k) { + serialzied_bytes.extend(count.to_le_bytes()); + serialzied_bytes.extend((value.len() as i32).to_le_bytes()); + serialzied_bytes.extend(value.as_bytes()); } + let java_ret = env.new_byte_array(results_len as i32)?; + let bytes_ptr = serialzied_bytes.as_ptr() as *const i8; + let bytes_ptr = unsafe { std::slice::from_raw_parts(bytes_ptr, results_len) }; + + env.set_byte_array_region(&java_ret, 0, bytes_ptr)?; + Ok(java_ret.into_raw()) }) } diff --git a/core/src/rust/tantivy_utils/src/collectors.rs b/core/src/rust/tantivy_utils/src/collectors.rs index ae48b64862..2fa1ab07a3 100644 --- a/core/src/rust/tantivy_utils/src/collectors.rs +++ b/core/src/rust/tantivy_utils/src/collectors.rs @@ -1,6 +1,7 @@ //! Common collectors pub mod column_cache; +pub mod index_collector; pub mod limited_collector; pub mod part_id_collector; pub mod part_key_collector; diff --git a/core/src/rust/tantivy_utils/src/collectors/index_collector.rs b/core/src/rust/tantivy_utils/src/collectors/index_collector.rs new file mode 100644 index 0000000000..33ae4a73ff --- /dev/null +++ b/core/src/rust/tantivy_utils/src/collectors/index_collector.rs @@ -0,0 +1,43 @@ +//! Collector that can run over an entire index without a query +//! + +use tantivy::{collector::SegmentCollector, Searcher, SegmentReader, TantivyError}; + +use super::limited_collector::{LimitCounter, LimitedCollector, LimitedSegmentCollector}; + +/// Index Segment collector +pub trait IndexCollector: LimitedCollector +where + Self::Child: LimitedSegmentCollector, +{ + /// Colllect data across an entire index segment + fn collect_over_index( + &self, + reader: &SegmentReader, + limiter: &mut LimitCounter, + ) -> Result<::Fruit, TantivyError>; +} + +pub fn collect_from_index(searcher: &Searcher, collector: C) -> Result +where + C: IndexCollector, + C::Child: LimitedSegmentCollector, +{ + let segment_readers = searcher.segment_readers(); + let mut fruits: Vec<::Fruit> = + Vec::with_capacity(segment_readers.len()); + + let mut limiter = LimitCounter::new(collector.limit()); + + for segment_reader in segment_readers.iter() { + let results = collector.collect_over_index(segment_reader, &mut limiter)?; + + fruits.push(results); + + if limiter.at_limit() { + break; + } + } + + collector.merge_fruits(fruits) +} diff --git a/core/src/rust/tantivy_utils/src/collectors/string_field_collector.rs b/core/src/rust/tantivy_utils/src/collectors/string_field_collector.rs index 0b050c50f2..f0ee817a8e 100644 --- a/core/src/rust/tantivy_utils/src/collectors/string_field_collector.rs +++ b/core/src/rust/tantivy_utils/src/collectors/string_field_collector.rs @@ -1,5 +1,6 @@ //! Collector to string values from a document +use core::str; use std::collections::hash_map::Entry; use hashbrown::HashMap; @@ -7,12 +8,16 @@ use nohash_hasher::IntMap; use tantivy::{ collector::{Collector, SegmentCollector}, columnar::StrColumn, + TantivyError, }; use crate::collectors::column_cache::ColumnCache; -use super::limited_collector::{ - LimitCounterOptionExt, LimitResult, LimitedCollector, LimitedSegmentCollector, +use super::{ + index_collector::IndexCollector, + limited_collector::{ + LimitCounter, LimitCounterOptionExt, LimitResult, LimitedCollector, LimitedSegmentCollector, + }, }; pub struct StringFieldCollector<'a> { @@ -66,7 +71,11 @@ impl<'a> Collector for StringFieldCollector<'a> { &self, segment_fruits: Vec>, ) -> tantivy::Result> { - let mut results = HashMap::new(); + let mut results = if self.limit < usize::MAX { + HashMap::with_capacity(self.limit) + } else { + HashMap::new() + }; for mut map in segment_fruits.into_iter() { for (value, count) in map.drain() { @@ -144,16 +153,89 @@ impl SegmentCollector for StringFieldSegmentCollector { } } +impl<'a> IndexCollector for StringFieldCollector<'a> { + fn collect_over_index( + &self, + reader: &tantivy::SegmentReader, + limiter: &mut LimitCounter, + ) -> Result<::Fruit, tantivy::TantivyError> { + let Some((field, prefix)) = reader.schema().find_field(self.field) else { + return Err(TantivyError::FieldNotFound(self.field.to_string())); + }; + + let mut ret = if self.limit < usize::MAX { + HashMap::with_capacity(self.limit) + } else { + HashMap::new() + }; + + if limiter.at_limit() { + return Ok(ret); + } + + let index_reader = reader.inverted_index(field)?; + let mut index_reader = index_reader.terms().range(); + if !prefix.is_empty() { + // Only look at prefix range + index_reader = index_reader.ge(format!("{}\0", prefix)); + index_reader = index_reader.lt(format!("{}\u{001}", prefix)); + } + let mut index_reader = index_reader.into_stream()?; + while !limiter.at_limit() && index_reader.advance() { + let mut key_bytes = index_reader.key(); + if !prefix.is_empty() { + // Skip prefix + key_bytes = &key_bytes[prefix.len() + 2..]; + } + if key_bytes.is_empty() { + continue; + } + + let key = str::from_utf8(key_bytes) + .map_err(|e| TantivyError::InternalError(e.to_string()))?; + + // capture it + ret.insert(key.to_string(), index_reader.value().doc_freq as u64); + + // No need to check error, the check at the top of the while will handle it + let _ = limiter.increment(); + } + + Ok(ret) + } +} + #[cfg(test)] mod tests { use std::collections::HashSet; use tantivy::query::AllQuery; - use crate::test_utils::{build_test_schema, COL1_NAME, JSON_COL_NAME}; + use crate::{ + collectors::index_collector::collect_from_index, + test_utils::{build_test_schema, COL1_NAME, JSON_COL_NAME}, + }; use super::*; + #[test] + fn test_string_field_index_collector() { + let index = build_test_schema(); + let column_cache = ColumnCache::default(); + + let collector = StringFieldCollector::new(COL1_NAME, usize::MAX, usize::MAX, column_cache); + + let results = collect_from_index(&index.searcher, collector).expect("Should succeed"); + + // Two docs + assert_eq!( + results.into_iter().collect::>(), + [("ABC".to_string(), 1), ("DEF".to_string(), 1)] + .into_iter() + .collect::>() + ); + } + #[test] fn test_string_field_collector() { let index = build_test_schema(); @@ -199,6 +281,25 @@ mod tests { ); } + #[test] + fn test_string_field_index_collector_json() { + let index = build_test_schema(); + let column_cache = ColumnCache::default(); + + let col_name = format!("{}.{}", JSON_COL_NAME, "f1"); + let collector = StringFieldCollector::new(&col_name, usize::MAX, usize::MAX, column_cache); + + let results = collect_from_index(&index.searcher, collector).expect("Should succeed"); + + // Two docs + assert_eq!( + results.into_iter().collect::>(), + [("value".to_string(), 1), ("othervalue".to_string(), 1)] + .into_iter() + .collect::>() + ); + } + #[test] fn test_string_field_collector_json_invalid_field() { let index = build_test_schema(); @@ -220,6 +321,23 @@ mod tests { ); } + #[test] + fn test_string_field_collector_index_json_invalid_field() { + let index = build_test_schema(); + let column_cache = ColumnCache::default(); + + let col_name = format!("{}.{}", JSON_COL_NAME, "invalid"); + let collector = StringFieldCollector::new(&col_name, usize::MAX, usize::MAX, column_cache); + + let results = collect_from_index(&index.searcher, collector).expect("Should succeed"); + + // No results, no failure + assert_eq!( + results.into_iter().collect::>(), + [].into_iter().collect::>() + ); + } + #[test] fn test_string_field_collector_with_limit() { let index = build_test_schema(); @@ -236,4 +354,17 @@ mod tests { // Which doc matches first is non deterministic, just check length assert_eq!(results.len(), 1); } + + #[test] + fn test_string_field_index_collector_with_limit() { + let index = build_test_schema(); + let column_cache = ColumnCache::default(); + + let collector = StringFieldCollector::new(COL1_NAME, 1, usize::MAX, column_cache); + + let results = collect_from_index(&index.searcher, collector).expect("Should succeed"); + + // Which doc matches first is non deterministic, just check length + assert_eq!(results.len(), 1); + } } diff --git a/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala b/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala index 1f9726f190..1129d3055b 100644 --- a/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala @@ -174,6 +174,38 @@ abstract class PartKeyIndexBenchmark { } } + @Benchmark + @BenchmarkMode(Array(Mode.Throughput)) + @OutputTimeUnit(TimeUnit.SECONDS) + @OperationsPerInvocation(8) + def indexNames(): Unit = { + cforRange ( 0 until 8 ) { i => + partKeyIndex.indexNames(10000) + } + } + + @Benchmark + @BenchmarkMode(Array(Mode.Throughput)) + @OutputTimeUnit(TimeUnit.SECONDS) + @OperationsPerInvocation(8) + def indexValues(): Unit = { + cforRange ( 0 until 8 ) { i => + partKeyIndex.indexValues("instance", 10000) + } + } + + @Benchmark + @BenchmarkMode(Array(Mode.Throughput)) + @OutputTimeUnit(TimeUnit.SECONDS) + @OperationsPerInvocation(8) + def getLabelNames(): Unit = { + cforRange ( 0 until 8 ) { i => + val filter = Seq(ColumnFilter("_ns_", Filter.Equals(s"App-$i")), + ColumnFilter("_ws_", Filter.Equals("demo"))) + partKeyIndex.labelNamesEfficient(filter, now, currentLookupTime()) + } + } + @Benchmark @BenchmarkMode(Array(Mode.Throughput)) @OutputTimeUnit(TimeUnit.SECONDS)