fix(core): Improve performance for Tantivy indexValues call
indexValues was falling way behind Lucene for a few reasons:

1. We were copying results directly into Java objects, which incurred a lot of JNI back-and-forth overhead (see the framing sketch below)
2. When querying the entire index we were walking documents instead of the inverted index, which increased
   the number of items to process
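
To make reason 1 concrete, here is a minimal sketch of the framing this change adopts (inferred from the diff below, not code from this commit; serialize/parse are illustrative names): each result is written as a little-endian i64 count, then a little-endian i32 string length, then the raw UTF-8 bytes, so the whole result set crosses JNI as one flat byte array.

// Sketch only, mirroring the byte format used in reader.rs below.
fn serialize(results: &[(String, u64)]) -> Vec<u8> {
    let mut out = Vec::new();
    for (value, count) in results {
        out.extend((*count as i64).to_le_bytes()); // term count
        out.extend((value.len() as i32).to_le_bytes()); // string length in bytes
        out.extend(value.as_bytes()); // raw UTF-8 payload
    }
    out
}

// Matching decoder; in the commit the real decoding happens in Scala (PartKeyTantivyIndex.indexValues).
fn parse(bytes: &[u8]) -> Vec<(String, u64)> {
    let mut out = Vec::new();
    let mut pos = 0;
    while pos < bytes.len() {
        let count = i64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap());
        pos += 8;
        let len = i32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
        pos += 4;
        let value = String::from_utf8(bytes[pos..pos + len].to_vec()).unwrap();
        pos += len;
        out.push((value, count as u64));
    }
    out
}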

This PR does a few things:

1. Add perf benchmarks for the missing functions
2. Add a new IndexCollector trait that can be used to walk the index instead of docs (see the term-dictionary sketch below)
3. Replace the JNI object usage in indexValues with byte-serialized data
4. Glue all these optimizations together.

With this, Tantivy is still a bit behind Lucene for this path, but it's almost 100x faster
than before.
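
For reason 2, a hedged sketch of what walking the index means here (unique_values is an illustrative name; the real implementation is StringFieldCollector::collect_over_index below, which also handles JSON-field prefixes and result limits): read each term and its doc_freq straight out of every segment's term dictionary instead of scanning matching documents.

use tantivy::{schema::Field, Searcher, TantivyError};

// Sketch only: enumerate unique values for a field from the term dictionary.
fn unique_values(searcher: &Searcher, field: Field) -> Result<Vec<(String, u64)>, TantivyError> {
    let mut out = Vec::new();
    for segment_reader in searcher.segment_readers() {
        // The inverted index exposes the segment's term dictionary directly
        let inverted = segment_reader.inverted_index(field)?;
        let mut stream = inverted.terms().stream()?;
        while stream.advance() {
            let term = std::str::from_utf8(stream.key())
                .map_err(|e| TantivyError::InternalError(e.to_string()))?;
            // doc_freq = number of documents containing this term
            out.push((term.to_string(), stream.value().doc_freq as u64));
        }
    }
    Ok(out)
}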
rfairfax committed Oct 7, 2024
1 parent ebee7ae commit be5ee93
Showing 6 changed files with 247 additions and 42 deletions.
21 changes: 18 additions & 3 deletions core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala
@@ -22,7 +22,7 @@ import filodb.core.memstore.PartKeyIndexRaw.{bytesRefToUnsafeOffset, ignoreIndex
 import filodb.core.metadata.{PartitionSchema, Schemas}
 import filodb.core.metadata.Column.ColumnType.{MapColumn, StringColumn}
 import filodb.core.query.{ColumnFilter, Filter}
-import filodb.memory.format.UnsafeUtils
+import filodb.memory.format.{UnsafeUtils, ZeroCopyUTF8String}
 
 object PartKeyTantivyIndex {
   def startMemoryProfiling(): Unit = {
@@ -163,7 +163,22 @@ class PartKeyTantivyIndex(ref: DatasetRef,
   override def indexValues(fieldName: String, topK: Int): Seq[TermInfo] = {
     val results = TantivyNativeMethods.indexValues(indexHandle, fieldName, topK)
 
-    results.toSeq
+    val buffer = ByteBuffer.wrap(results)
+    buffer.order(ByteOrder.LITTLE_ENDIAN)
+
+    val parsedResults = new ArrayBuffer[TermInfo]()
+
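+    // Each record: count (i64, little-endian), string length (i32, little-endian), then UTF-8 bytes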
+    while (buffer.hasRemaining) {
+      val count = buffer.getLong
+      val strLen = buffer.getInt
+      val strBytes = new Array[Byte](strLen)
+      buffer.get(strBytes)
+
+      parsedResults += TermInfo(ZeroCopyUTF8String.apply(strBytes), count.toInt)
+    }
+
+    parsedResults
   }
 
   override def labelNamesEfficient(colFilters: Seq[ColumnFilter], startTime: Long, endTime: Long): Seq[String] = {
@@ -649,7 +664,7 @@ protected object TantivyNativeMethods {
 
   // Get the list of unique values for a field
   @native
-  def indexValues(handle: Long, fieldName: String, topK: Int): Array[TermInfo]
+  def indexValues(handle: Long, fieldName: String, topK: Int): Array[Byte]
 
   // Get the list of unique indexed field names
   @native
65 changes: 30 additions & 35 deletions core/src/rust/filodb_core/src/reader.rs
@@ -4,15 +4,17 @@ use std::sync::atomic::Ordering;
 
 use hashbrown::HashSet;
 use jni::{
-    objects::{JByteArray, JClass, JIntArray, JObject, JString, JValue},
+    objects::{JByteArray, JClass, JIntArray, JObject, JString},
     sys::{jbyteArray, jint, jintArray, jlong, jlongArray, jobjectArray},
     JNIEnv,
 };
 use tantivy::{collector::FacetCollector, schema::FieldType};
-use tantivy_utils::collectors::part_key_record_collector::PartKeyRecordCollector;
 use tantivy_utils::collectors::string_field_collector::StringFieldCollector;
 use tantivy_utils::collectors::time_collector::TimeCollector;
 use tantivy_utils::collectors::time_range_filter::TimeRangeFilter;
+use tantivy_utils::collectors::{
+    index_collector::collect_from_index, part_key_record_collector::PartKeyRecordCollector,
+};
 use tantivy_utils::collectors::{
     limited_collector::UnlimitedCollector, part_id_collector::PartIdCollector,
 };
@@ -29,9 +31,6 @@ use crate::{
     state::IndexHandle,
 };
 
-const TERM_INFO_CLASS: &str = "filodb/core/memstore/TermInfo";
-const UTF8STR_CLASS: &str = "filodb/memory/format/ZeroCopyUTF8String";
-
 #[no_mangle]
 pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_indexRamBytes(
     mut env: JNIEnv,
@@ -260,9 +259,14 @@ fn query_label_values(
 
     let collector =
         StringFieldCollector::new(&field, limit, term_limit, handle.column_cache.clone());
-    let filter_collector =
-        TimeRangeFilter::new(&collector, start, end, handle.column_cache.clone());
-    Ok(handle.execute_cachable_query(query, filter_collector)?)
+
+    if matches!(query, FiloDBQuery::All) {
+        Ok(collect_from_index(&handle.searcher(), collector)?)
+    } else {
+        let filter_collector =
+            TimeRangeFilter::new(&collector, start, end, handle.column_cache.clone());
+        Ok(handle.execute_cachable_query(query, filter_collector)?)
+    }
 } else {
     // Invalid field, no values
     Ok(vec![])
@@ -312,7 +316,7 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_inde
     handle: jlong,
     field: JString,
     top_k: jint,
-) -> jobjectArray {
+) -> jbyteArray {
     jni_exec(&mut env, |env| {
         let handle = IndexHandle::get_ref_from_handle(handle);
 
@@ -331,34 +335,25 @@
             i64::MAX,
         )?;
 
-        let results_len = std::cmp::min(top_k, results.len());
-        let java_ret =
-            env.new_object_array(results_len as i32, TERM_INFO_CLASS, JObject::null())?;
-        for (i, (value, count)) in results.into_iter().take(top_k).enumerate() {
-            let len = value.as_bytes().len();
-            let term_bytes = env.new_byte_array(len as i32)?;
-            let bytes_ptr = value.as_bytes().as_ptr() as *const i8;
-            let bytes_ptr = unsafe { std::slice::from_raw_parts(bytes_ptr, len) };
-
-            env.set_byte_array_region(&term_bytes, 0, bytes_ptr)?;
-
-            let term_str = env
-                .call_static_method(
-                    UTF8STR_CLASS,
-                    "apply",
-                    "([B)Lfilodb/memory/format/ZeroCopyUTF8String;",
-                    &[JValue::Object(&term_bytes)],
-                )?
-                .l()?;
-
-            let term_info_obj = env.new_object(
-                TERM_INFO_CLASS,
-                "(Lfilodb/memory/format/ZeroCopyUTF8String;I)V",
-                &[JValue::Object(&term_str), JValue::Int(count as i32)],
-            )?;
-            env.set_object_array_element(&java_ret, i as i32, &term_info_obj)?;
+        // String length, plus count, plus string data
+        let results_len: usize = results
+            .iter()
+            .take(top_k)
+            .map(|(value, _)| value.len() + std::mem::size_of::<i32>() + std::mem::size_of::<i64>())
+            .sum();
+        let mut serialized_bytes = Vec::with_capacity(results_len);
+        for (value, count) in results.into_iter().take(top_k) {
+            serialized_bytes.extend(count.to_le_bytes());
+            serialized_bytes.extend((value.len() as i32).to_le_bytes());
+            serialized_bytes.extend(value.as_bytes());
         }
 
+        let java_ret = env.new_byte_array(results_len as i32)?;
+        let bytes_ptr = serialized_bytes.as_ptr() as *const i8;
+        let bytes_ptr = unsafe { std::slice::from_raw_parts(bytes_ptr, results_len) };
+
+        env.set_byte_array_region(&java_ret, 0, bytes_ptr)?;
+
         Ok(java_ret.into_raw())
     })
 }
1 change: 1 addition & 0 deletions core/src/rust/tantivy_utils/src/collectors.rs
@@ -1,6 +1,7 @@
 //! Common collectors
 pub mod column_cache;
+pub mod index_collector;
 pub mod limited_collector;
 pub mod part_id_collector;
 pub mod part_key_collector;
43 changes: 43 additions & 0 deletions core/src/rust/tantivy_utils/src/collectors/index_collector.rs
@@ -0,0 +1,43 @@
+//! Collector that can run over an entire index without a query
+//!
+use tantivy::{collector::SegmentCollector, Searcher, SegmentReader, TantivyError};
+
+use super::limited_collector::{LimitCounter, LimitedCollector, LimitedSegmentCollector};
+
+/// Index Segment collector
+pub trait IndexCollector: LimitedCollector
+where
+    Self::Child: LimitedSegmentCollector,
+{
+    /// Collect data across an entire index segment
+    fn collect_over_index(
+        &self,
+        reader: &SegmentReader,
+        limiter: &mut LimitCounter,
+    ) -> Result<<Self::Child as SegmentCollector>::Fruit, TantivyError>;
+}
+
+pub fn collect_from_index<C>(searcher: &Searcher, collector: C) -> Result<C::Fruit, TantivyError>
+where
+    C: IndexCollector,
+    C::Child: LimitedSegmentCollector,
+{
+    let segment_readers = searcher.segment_readers();
+    let mut fruits: Vec<<C::Child as SegmentCollector>::Fruit> =
+        Vec::with_capacity(segment_readers.len());
+
+    let mut limiter = LimitCounter::new(collector.limit());
+
+    for segment_reader in segment_readers.iter() {
+        let results = collector.collect_over_index(segment_reader, &mut limiter)?;
+
+        fruits.push(results);
+
+        if limiter.at_limit() {
+            break;
+        }
+    }
+
+    collector.merge_fruits(fruits)
+}
127 changes: 123 additions & 4 deletions core/src/rust/tantivy_utils/src/collectors/string_field_collector.rs
@@ -1,18 +1,23 @@
 //! Collector for string values from a document
+use core::str;
 use std::collections::hash_map::Entry;
 
 use hashbrown::HashMap;
 use nohash_hasher::IntMap;
 use tantivy::{
     collector::{Collector, SegmentCollector},
     columnar::StrColumn,
     TantivyError,
 };
 
 use crate::collectors::column_cache::ColumnCache;
 
-use super::limited_collector::{
-    LimitCounterOptionExt, LimitResult, LimitedCollector, LimitedSegmentCollector,
+use super::{
+    index_collector::IndexCollector,
+    limited_collector::{
+        LimitCounter, LimitCounterOptionExt, LimitResult, LimitedCollector, LimitedSegmentCollector,
+    },
 };
 
 pub struct StringFieldCollector<'a> {
@@ -66,7 +71,7 @@ impl<'a> Collector for StringFieldCollector<'a> {
         &self,
         segment_fruits: Vec<HashMap<String, u64>>,
     ) -> tantivy::Result<Vec<(String, u64)>> {
-        let mut results = HashMap::new();
+        let mut results = HashMap::with_capacity(self.limit);
 
         for mut map in segment_fruits.into_iter() {
             for (value, count) in map.drain() {
@@ -144,16 +149,81 @@ impl SegmentCollector for StringFieldSegmentCollector {
     }
 }
 
+impl<'a> IndexCollector for StringFieldCollector<'a> {
+    fn collect_over_index(
+        &self,
+        reader: &tantivy::SegmentReader,
+        limiter: &mut LimitCounter,
+    ) -> Result<<Self::Child as SegmentCollector>::Fruit, tantivy::TantivyError> {
+        let Some((field, prefix)) = reader.schema().find_field(self.field) else {
+            return Err(TantivyError::FieldNotFound(self.field.to_string()));
+        };
+
+        let mut ret = HashMap::with_capacity(self.limit);
+
+        if limiter.at_limit() {
+            return Ok(ret);
+        }
+
+        let index_reader = reader.inverted_index(field)?;
+        let mut index_reader = index_reader.terms().range();
+        if !prefix.is_empty() {
+            // Only look at prefix range
+            index_reader = index_reader.ge(format!("{}\0", prefix));
+            index_reader = index_reader.lt(format!("{}\u{001}", prefix));
+        }
+        let mut index_reader = index_reader.into_stream()?;
+        while !limiter.at_limit() && index_reader.advance() {
+            let mut key_bytes = index_reader.key();
+            if !prefix.is_empty() {
+                // Skip prefix
+                key_bytes = &key_bytes[prefix.len() + 2..];
+            }
+            let key = str::from_utf8(key_bytes)
+                .map_err(|e| TantivyError::InternalError(e.to_string()))?;
+
+            // capture it
+            ret.insert(key.to_string(), index_reader.value().doc_freq as u64);
+
+            // No need to check error, the check at the top of the while will handle it
+            let _ = limiter.increment();
+        }
+
+        Ok(ret)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashSet;
 
     use tantivy::query::AllQuery;
 
-    use crate::test_utils::{build_test_schema, COL1_NAME, JSON_COL_NAME};
+    use crate::{
+        collectors::index_collector::collect_from_index,
+        test_utils::{build_test_schema, COL1_NAME, JSON_COL_NAME},
+    };
 
     use super::*;
 
+    #[test]
+    fn test_string_field_index_collector() {
+        let index = build_test_schema();
+        let column_cache = ColumnCache::default();
+
+        let collector = StringFieldCollector::new(COL1_NAME, usize::MAX, usize::MAX, column_cache);
+
+        let results = collect_from_index(&index.searcher, collector).expect("Should succeed");
+
+        // Two docs
+        assert_eq!(
+            results.into_iter().collect::<HashSet<_>>(),
+            [("ABC".to_string(), 1), ("DEF".to_string(), 1)]
+                .into_iter()
+                .collect::<HashSet<_>>()
+        );
+    }
+
     #[test]
     fn test_string_field_collector() {
         let index = build_test_schema();
@@ -199,6 +269,25 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_string_field_index_collector_json() {
+        let index = build_test_schema();
+        let column_cache = ColumnCache::default();
+
+        let col_name = format!("{}.{}", JSON_COL_NAME, "f1");
+        let collector = StringFieldCollector::new(&col_name, usize::MAX, usize::MAX, column_cache);
+
+        let results = collect_from_index(&index.searcher, collector).expect("Should succeed");
+
+        // Two docs
+        assert_eq!(
+            results.into_iter().collect::<HashSet<_>>(),
+            [("value".to_string(), 1), ("othervalue".to_string(), 1)]
+                .into_iter()
+                .collect::<HashSet<_>>()
+        );
+    }
+
     #[test]
     fn test_string_field_collector_json_invalid_field() {
         let index = build_test_schema();
@@ -220,6 +309,23 @@
         );
     }
 
+    #[test]
+    fn test_string_field_collector_index_json_invalid_field() {
+        let index = build_test_schema();
+        let column_cache = ColumnCache::default();
+
+        let col_name = format!("{}.{}", JSON_COL_NAME, "invalid");
+        let collector = StringFieldCollector::new(&col_name, usize::MAX, usize::MAX, column_cache);
+
+        let results = collect_from_index(&index.searcher, collector).expect("Should succeed");
+
+        // No results, no failure
+        assert_eq!(
+            results.into_iter().collect::<HashSet<_>>(),
+            [].into_iter().collect::<HashSet<_>>()
+        );
+    }
+
     #[test]
     fn test_string_field_collector_with_limit() {
         let index = build_test_schema();
@@ -236,4 +342,17 @@
         // Which doc matches first is non-deterministic, just check length
         assert_eq!(results.len(), 1);
     }
+
+    #[test]
+    fn test_string_field_index_collector_with_limit() {
+        let index = build_test_schema();
+        let column_cache = ColumnCache::default();
+
+        let collector = StringFieldCollector::new(COL1_NAME, 1, usize::MAX, column_cache);
+
+        let results = collect_from_index(&index.searcher, collector).expect("Should succeed");
+
+        // Which doc matches first is non-deterministic, just check length
+        assert_eq!(results.len(), 1);
+    }
 }