From be5ee93099382fd7bf466c9156b189dbda64b25d Mon Sep 17 00:00:00 2001
From: Ryan Fairfax
Date: Mon, 7 Oct 2024 15:30:53 -0700
Subject: [PATCH] fix(core): Improve performance for Tantivy indexValues call

indexValues was falling way behind Lucene for a few reasons:

1. We were copying results directly into Java objects, which incurred a lot
   of JNI back-and-forth overhead
2. When querying the entire index we were looking at docs instead of the
   reverse index, which increased the number of items to process

This PR does a few things:

1. Add perf benchmarks for the missing functions
2. Add a new IndexCollector trait that can be used to walk the index instead
   of docs
3. Remove the JNI object usage in indexValues in favor of byte-serialized
   data (a decoding sketch of the record layout follows the diff below)
4. Glue all these optimizations together.

With this, Tantivy is still a bit behind Lucene for this path, but it's
almost 100x faster than before.
---
 .../memstore/PartKeyTantivyIndex.scala        |  21 ++-
 core/src/rust/filodb_core/src/reader.rs       |  65 +++++----
 core/src/rust/tantivy_utils/src/collectors.rs |   1 +
 .../src/collectors/index_collector.rs         |  43 ++++++
 .../src/collectors/string_field_collector.rs  | 127 +++++++++++++++++-
 .../filodb.jmh/PartKeyIndexBenchmark.scala    |  32 +++++
 6 files changed, 247 insertions(+), 42 deletions(-)
 create mode 100644 core/src/rust/tantivy_utils/src/collectors/index_collector.rs

diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala
index fdb3c851fa..71905159cb 100644
--- a/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala
+++ b/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala
@@ -22,7 +22,7 @@ import filodb.core.memstore.PartKeyIndexRaw.{bytesRefToUnsafeOffset, ignoreIndex
 import filodb.core.metadata.{PartitionSchema, Schemas}
 import filodb.core.metadata.Column.ColumnType.{MapColumn, StringColumn}
 import filodb.core.query.{ColumnFilter, Filter}
-import filodb.memory.format.UnsafeUtils
+import filodb.memory.format.{UnsafeUtils, ZeroCopyUTF8String}
 
 object PartKeyTantivyIndex {
   def startMemoryProfiling(): Unit = {
@@ -163,7 +163,22 @@ class PartKeyTantivyIndex(ref: DatasetRef,
 
   override def indexValues(fieldName: String, topK: Int): Seq[TermInfo] = {
     val results = TantivyNativeMethods.indexValues(indexHandle, fieldName, topK)
-    results.toSeq
+
+    val buffer = ByteBuffer.wrap(results)
+    buffer.order(ByteOrder.LITTLE_ENDIAN)
+
+    val parsedResults = new ArrayBuffer[TermInfo]()
+
+    while (buffer.hasRemaining) {
+      val count = buffer.getLong
+      val strLen = buffer.getInt
+      val strBytes = new Array[Byte](strLen)
+      buffer.get(strBytes)
+
+      parsedResults += TermInfo(ZeroCopyUTF8String.apply(strBytes), count.toInt)
+    }
+
+    parsedResults
   }
 
   override def labelNamesEfficient(colFilters: Seq[ColumnFilter], startTime: Long, endTime: Long): Seq[String] = {
@@ -649,7 +664,7 @@ protected object TantivyNativeMethods {
 
   // Get the list of unique values for a field
   @native
-  def indexValues(handle: Long, fieldName: String, topK: Int): Array[TermInfo]
+  def indexValues(handle: Long, fieldName: String, topK: Int): Array[Byte]
 
   // Get the list of unique indexed field names
   @native
diff --git a/core/src/rust/filodb_core/src/reader.rs b/core/src/rust/filodb_core/src/reader.rs
index c8fb97d9be..1decd8473e 100644
--- a/core/src/rust/filodb_core/src/reader.rs
+++ b/core/src/rust/filodb_core/src/reader.rs
@@ -4,15 +4,17 @@ use std::sync::atomic::Ordering;
 
 use hashbrown::HashSet;
 use jni::{
-    objects::{JByteArray, JClass, JIntArray, JObject, JString, JValue},
+    objects::{JByteArray, JClass, JIntArray, JObject, JString},
     sys::{jbyteArray, jint, jintArray, jlong, jlongArray, jobjectArray},
     JNIEnv,
 };
 use tantivy::{collector::FacetCollector, schema::FieldType};
-use tantivy_utils::collectors::part_key_record_collector::PartKeyRecordCollector;
 use tantivy_utils::collectors::string_field_collector::StringFieldCollector;
 use tantivy_utils::collectors::time_collector::TimeCollector;
 use tantivy_utils::collectors::time_range_filter::TimeRangeFilter;
+use tantivy_utils::collectors::{
+    index_collector::collect_from_index, part_key_record_collector::PartKeyRecordCollector,
+};
 use tantivy_utils::collectors::{
     limited_collector::UnlimitedCollector, part_id_collector::PartIdCollector,
 };
@@ -29,9 +31,6 @@ use crate::{
     state::IndexHandle,
 };
 
-const TERM_INFO_CLASS: &str = "filodb/core/memstore/TermInfo";
-const UTF8STR_CLASS: &str = "filodb/memory/format/ZeroCopyUTF8String";
-
 #[no_mangle]
 pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_indexRamBytes(
     mut env: JNIEnv,
@@ -260,9 +259,14 @@ fn query_label_values(
 
         let collector =
             StringFieldCollector::new(&field, limit, term_limit, handle.column_cache.clone());
-        let filter_collector =
-            TimeRangeFilter::new(&collector, start, end, handle.column_cache.clone());
-        Ok(handle.execute_cachable_query(query, filter_collector)?)
+
+        if matches!(query, FiloDBQuery::All) {
+            Ok(collect_from_index(&handle.searcher(), collector)?)
+        } else {
+            let filter_collector =
+                TimeRangeFilter::new(&collector, start, end, handle.column_cache.clone());
+            Ok(handle.execute_cachable_query(query, filter_collector)?)
+        }
     } else {
         // Invalid field, no values
         Ok(vec![])
@@ -312,7 +316,7 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_inde
     handle: jlong,
     field: JString,
     top_k: jint,
-) -> jobjectArray {
+) -> jbyteArray {
     jni_exec(&mut env, |env| {
         let handle = IndexHandle::get_ref_from_handle(handle);
 
@@ -331,34 +335,25 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_inde
             i64::MAX,
         )?;
 
-        let results_len = std::cmp::min(top_k, results.len());
-        let java_ret =
-            env.new_object_array(results_len as i32, TERM_INFO_CLASS, JObject::null())?;
-        for (i, (value, count)) in results.into_iter().take(top_k).enumerate() {
-            let len = value.as_bytes().len();
-            let term_bytes = env.new_byte_array(len as i32)?;
-            let bytes_ptr = value.as_bytes().as_ptr() as *const i8;
-            let bytes_ptr = unsafe { std::slice::from_raw_parts(bytes_ptr, len) };
-
-            env.set_byte_array_region(&term_bytes, 0, bytes_ptr)?;
-
-            let term_str = env
-                .call_static_method(
-                    UTF8STR_CLASS,
-                    "apply",
-                    "([B)Lfilodb/memory/format/ZeroCopyUTF8String;",
-                    &[JValue::Object(&term_bytes)],
-                )?
-                .l()?;
-
-            let term_info_obj = env.new_object(
-                TERM_INFO_CLASS,
-                "(Lfilodb/memory/format/ZeroCopyUTF8String;I)V",
-                &[JValue::Object(&term_str), JValue::Int(count as i32)],
-            )?;
-            env.set_object_array_element(&java_ret, i as i32, &term_info_obj)?;
+        // String length, plus count, plus string data
+        let results_len: usize = results
+            .iter()
+            .take(top_k)
+            .map(|(value, _)| value.len() + std::mem::size_of::<u64>() + std::mem::size_of::<i32>())
+            .sum();
+
+        let mut serialized_bytes = Vec::with_capacity(results_len);
+        for (value, count) in results.into_iter().take(top_k) {
+            serialized_bytes.extend(count.to_le_bytes());
+            serialized_bytes.extend((value.len() as i32).to_le_bytes());
+            serialized_bytes.extend(value.as_bytes());
         }
+
+        let java_ret = env.new_byte_array(results_len as i32)?;
+        let bytes_ptr = serialized_bytes.as_ptr() as *const i8;
+        let bytes_ptr = unsafe { std::slice::from_raw_parts(bytes_ptr, results_len) };
+
+        env.set_byte_array_region(&java_ret, 0, bytes_ptr)?;
+
         Ok(java_ret.into_raw())
     })
 }
diff --git a/core/src/rust/tantivy_utils/src/collectors.rs b/core/src/rust/tantivy_utils/src/collectors.rs
index ae48b64862..2fa1ab07a3 100644
--- a/core/src/rust/tantivy_utils/src/collectors.rs
+++ b/core/src/rust/tantivy_utils/src/collectors.rs
@@ -1,6 +1,7 @@
 //! Common collectors
 
 pub mod column_cache;
+pub mod index_collector;
 pub mod limited_collector;
 pub mod part_id_collector;
 pub mod part_key_collector;
diff --git a/core/src/rust/tantivy_utils/src/collectors/index_collector.rs b/core/src/rust/tantivy_utils/src/collectors/index_collector.rs
new file mode 100644
index 0000000000..33ae4a73ff
--- /dev/null
+++ b/core/src/rust/tantivy_utils/src/collectors/index_collector.rs
@@ -0,0 +1,43 @@
+//! Collector that can run over an entire index without a query
+//!
+
+use tantivy::{collector::SegmentCollector, Searcher, SegmentReader, TantivyError};
+
+use super::limited_collector::{LimitCounter, LimitedCollector, LimitedSegmentCollector};
+
+/// Index segment collector
+pub trait IndexCollector: LimitedCollector
+where
+    Self::Child: LimitedSegmentCollector,
+{
+    /// Collect data across an entire index segment
+    fn collect_over_index(
+        &self,
+        reader: &SegmentReader,
+        limiter: &mut LimitCounter,
+    ) -> Result<<Self::Child as SegmentCollector>::Fruit, TantivyError>;
+}
+
+pub fn collect_from_index<C>(searcher: &Searcher, collector: C) -> Result<C::Fruit, TantivyError>
+where
+    C: IndexCollector,
+    C::Child: LimitedSegmentCollector,
+{
+    let segment_readers = searcher.segment_readers();
+    let mut fruits: Vec<<C::Child as SegmentCollector>::Fruit> =
+        Vec::with_capacity(segment_readers.len());
+
+    let mut limiter = LimitCounter::new(collector.limit());
+
+    for segment_reader in segment_readers.iter() {
+        let results = collector.collect_over_index(segment_reader, &mut limiter)?;
+
+        fruits.push(results);
+
+        if limiter.at_limit() {
+            break;
+        }
+    }
+
+    collector.merge_fruits(fruits)
+}
diff --git a/core/src/rust/tantivy_utils/src/collectors/string_field_collector.rs b/core/src/rust/tantivy_utils/src/collectors/string_field_collector.rs
index 0b050c50f2..2fea8feae4 100644
--- a/core/src/rust/tantivy_utils/src/collectors/string_field_collector.rs
+++ b/core/src/rust/tantivy_utils/src/collectors/string_field_collector.rs
@@ -1,5 +1,6 @@
 //! Collector to string values from a document
 
+use core::str;
 use std::collections::hash_map::Entry;
 
 use hashbrown::HashMap;
@@ -7,12 +8,16 @@ use nohash_hasher::IntMap;
 use tantivy::{
     collector::{Collector, SegmentCollector},
     columnar::StrColumn,
+    TantivyError,
 };
 
 use crate::collectors::column_cache::ColumnCache;
 
-use super::limited_collector::{
-    LimitCounterOptionExt, LimitResult, LimitedCollector, LimitedSegmentCollector,
+use super::{
+    index_collector::IndexCollector,
+    limited_collector::{
+        LimitCounter, LimitCounterOptionExt, LimitResult, LimitedCollector, LimitedSegmentCollector,
+    },
 };
 
 pub struct StringFieldCollector<'a> {
@@ -66,7 +71,7 @@ impl<'a> Collector for StringFieldCollector<'a> {
         &self,
         segment_fruits: Vec<HashMap<String, u64>>,
     ) -> tantivy::Result<Vec<(String, u64)>> {
-        let mut results = HashMap::new();
+        let mut results = HashMap::with_capacity(self.limit);
 
         for mut map in segment_fruits.into_iter() {
             for (value, count) in map.drain() {
@@ -144,16 +149,81 @@ impl SegmentCollector for StringFieldSegmentCollector {
     }
 }
 
+impl<'a> IndexCollector for StringFieldCollector<'a> {
+    fn collect_over_index(
+        &self,
+        reader: &tantivy::SegmentReader,
+        limiter: &mut LimitCounter,
+    ) -> Result<<Self::Child as SegmentCollector>::Fruit, tantivy::TantivyError> {
+        let Some((field, prefix)) = reader.schema().find_field(self.field) else {
+            return Err(TantivyError::FieldNotFound(self.field.to_string()));
+        };
+
+        let mut ret = HashMap::with_capacity(self.limit);
+
+        if limiter.at_limit() {
+            return Ok(ret);
+        }
+
+        let index_reader = reader.inverted_index(field)?;
+        let mut index_reader = index_reader.terms().range();
+        if !prefix.is_empty() {
+            // Only look at prefix range
+            index_reader = index_reader.ge(format!("{}\0", prefix));
+            index_reader = index_reader.lt(format!("{}\u{001}", prefix));
+        }
+        let mut index_reader = index_reader.into_stream()?;
+        while !limiter.at_limit() && index_reader.advance() {
+            let mut key_bytes = index_reader.key();
+            if !prefix.is_empty() {
+                // Skip prefix
+                key_bytes = &key_bytes[prefix.len() + 2..];
+            }
+            let key = str::from_utf8(key_bytes)
+                .map_err(|e| TantivyError::InternalError(e.to_string()))?;
+
+            // capture it
+            ret.insert(key.to_string(), index_reader.value().doc_freq as u64);
+
+            // No need to check error, the check at the top of the while will handle it
+            let _ = limiter.increment();
+        }
+
+        Ok(ret)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashSet;
 
     use tantivy::query::AllQuery;
 
-    use crate::test_utils::{build_test_schema, COL1_NAME, JSON_COL_NAME};
+    use crate::{
+        collectors::index_collector::collect_from_index,
+        test_utils::{build_test_schema, COL1_NAME, JSON_COL_NAME},
+    };
 
     use super::*;
 
+    #[test]
+    fn test_string_field_index_collector() {
+        let index = build_test_schema();
+        let column_cache = ColumnCache::default();
+
+        let collector = StringFieldCollector::new(COL1_NAME, usize::MAX, usize::MAX, column_cache);
+
+        let results = collect_from_index(&index.searcher, collector).expect("Should succeed");
+
+        // Two docs
+        assert_eq!(
+            results.into_iter().collect::<HashSet<_>>(),
+            [("ABC".to_string(), 1), ("DEF".to_string(), 1)]
+                .into_iter()
+                .collect::<HashSet<_>>()
+        );
+    }
+
     #[test]
     fn test_string_field_collector() {
         let index = build_test_schema();
@@ -199,6 +269,25 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_string_field_index_collector_json() {
+        let index = build_test_schema();
+        let column_cache = ColumnCache::default();
+
+        let col_name = format!("{}.{}", JSON_COL_NAME, "f1");
+        let collector = StringFieldCollector::new(&col_name, usize::MAX, usize::MAX, column_cache);
+
+        let results = collect_from_index(&index.searcher, collector).expect("Should succeed");
+
+        // Two docs
+        assert_eq!(
+            results.into_iter().collect::<HashSet<_>>(),
+            [("value".to_string(), 1), ("othervalue".to_string(), 1)]
+                .into_iter()
+                .collect::<HashSet<_>>()
+        );
+    }
+
     #[test]
     fn test_string_field_collector_json_invalid_field() {
         let index = build_test_schema();
@@ -220,6 +309,23 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_string_field_collector_index_json_invalid_field() {
+        let index = build_test_schema();
+        let column_cache = ColumnCache::default();
+
+        let col_name = format!("{}.{}", JSON_COL_NAME, "invalid");
+        let collector = StringFieldCollector::new(&col_name, usize::MAX, usize::MAX, column_cache);
+
+        let results = collect_from_index(&index.searcher, collector).expect("Should succeed");
+
+        // No results, no failure
+        assert_eq!(
+            results.into_iter().collect::<HashSet<_>>(),
+            [].into_iter().collect::<HashSet<_>>()
+        );
+    }
+
     #[test]
     fn test_string_field_collector_with_limit() {
         let index = build_test_schema();
@@ -236,4 +342,17 @@ mod tests {
         // Which doc matches first is non deterministic, just check length
         assert_eq!(results.len(), 1);
     }
+
+    #[test]
+    fn test_string_field_index_collector_with_limit() {
+        let index = build_test_schema();
+        let column_cache = ColumnCache::default();
+
+        let collector = StringFieldCollector::new(COL1_NAME, 1, usize::MAX, column_cache);
+
+        let results = collect_from_index(&index.searcher, collector).expect("Should succeed");
+
+        // Which doc matches first is non deterministic, just check length
+        assert_eq!(results.len(), 1);
+    }
 }
diff --git a/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala b/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala
index 1f9726f190..1129d3055b 100644
--- a/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala
+++ b/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala
@@ -174,6 +174,38 @@ abstract class PartKeyIndexBenchmark {
     }
   }
 
+  @Benchmark
+  @BenchmarkMode(Array(Mode.Throughput))
+  @OutputTimeUnit(TimeUnit.SECONDS)
+  @OperationsPerInvocation(8)
+  def indexNames(): Unit = {
+    cforRange ( 0 until 8 ) { i =>
+      partKeyIndex.indexNames(10000)
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Array(Mode.Throughput))
+  @OutputTimeUnit(TimeUnit.SECONDS)
+  @OperationsPerInvocation(8)
+  def indexValues(): Unit = {
+    cforRange ( 0 until 8 ) { i =>
+      partKeyIndex.indexValues("instance", 10000)
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Array(Mode.Throughput))
+  @OutputTimeUnit(TimeUnit.SECONDS)
+  @OperationsPerInvocation(8)
+  def getLabelNames(): Unit = {
+    cforRange ( 0 until 8 ) { i =>
+      val filter = Seq(ColumnFilter("_ns_", Filter.Equals(s"App-$i")),
+        ColumnFilter("_ws_", Filter.Equals("demo")))
+      partKeyIndex.labelNamesEfficient(filter, now, currentLookupTime())
+    }
+  }
+
   @Benchmark
   @BenchmarkMode(Array(Mode.Throughput))
   @OutputTimeUnit(TimeUnit.SECONDS)
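
Note on the serialized format referenced in point 3 of the description: each entry in the byte array returned by the new native indexValues is an 8-byte little-endian count, followed by a 4-byte little-endian string length, followed by the term's raw UTF-8 bytes. Below is a minimal, self-contained Scala sketch of a decoder for that layout. It mirrors the parsing added to PartKeyTantivyIndex.indexValues in the diff; the object and method names (TermRecordDecoder, decodeTermRecords) are illustrative only and are not part of this patch.

import java.nio.{ByteBuffer, ByteOrder}
import scala.collection.mutable.ArrayBuffer

object TermRecordDecoder {
  // Each record: count (8-byte LE long), string length (4-byte LE int), then UTF-8 bytes.
  def decodeTermRecords(bytes: Array[Byte]): Seq[(String, Long)] = {
    val buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    val decoded = new ArrayBuffer[(String, Long)]()
    while (buffer.hasRemaining) {
      val count = buffer.getLong
      val strLen = buffer.getInt
      val strBytes = new Array[Byte](strLen)
      buffer.get(strBytes)
      decoded += (new String(strBytes, "UTF-8") -> count)
    }
    decoded.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Hand-build one record for the term "abc" with count 2, then decode it.
    val term = "abc".getBytes("UTF-8")
    val buf = ByteBuffer.allocate(8 + 4 + term.length).order(ByteOrder.LITTLE_ENDIAN)
    buf.putLong(2L).putInt(term.length).put(term)
    println(decodeTermRecords(buf.array())) // one decoded record: (abc, 2)
  }
}

Packing all results into a single byte array keeps the JNI boundary to one array copy per call instead of one object construction per term, which is where the old path spent most of its time.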