Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Improve performance for Tantivy indexValues call #1867

Merged
merged 3 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package filodb.core.memstore

import java.io.File
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

Expand All @@ -22,7 +23,7 @@ import filodb.core.memstore.PartKeyIndexRaw.{bytesRefToUnsafeOffset, ignoreIndex
import filodb.core.metadata.{PartitionSchema, Schemas}
import filodb.core.metadata.Column.ColumnType.{MapColumn, StringColumn}
import filodb.core.query.{ColumnFilter, Filter}
import filodb.memory.format.UnsafeUtils
import filodb.memory.format.{UnsafeUtils, ZeroCopyUTF8String}

object PartKeyTantivyIndex {
def startMemoryProfiling(): Unit = {
Expand Down Expand Up @@ -155,15 +156,46 @@ class PartKeyTantivyIndex(ref: DatasetRef,
}

override def indexNames(limit: Int): Seq[String] = {
TantivyNativeMethods.indexNames(indexHandle).filterNot {
decodeStringArray(TantivyNativeMethods.indexNames(indexHandle)).filterNot {
n => ignoreIndexNames.contains(n) || n.startsWith(FACET_FIELD_PREFIX)
}
}

override def indexValues(fieldName: String, topK: Int): Seq[TermInfo] = {
val results = TantivyNativeMethods.indexValues(indexHandle, fieldName, topK)

results.toSeq
val buffer = ByteBuffer.wrap(results)
buffer.order(ByteOrder.LITTLE_ENDIAN)

val parsedResults = new ArrayBuffer[TermInfo]()

while (buffer.hasRemaining) {
val count = buffer.getLong
val strLen = buffer.getInt
val strBytes = new Array[Byte](strLen)
buffer.get(strBytes)

parsedResults += TermInfo(ZeroCopyUTF8String.apply(strBytes), count.toInt)
}

parsedResults
}

private def decodeStringArray(arr: Array[Byte]): Seq[String] = {
val buffer = ByteBuffer.wrap(arr)
buffer.order(ByteOrder.LITTLE_ENDIAN)

val parsedResults = new ArrayBuffer[String]()

while (buffer.hasRemaining) {
val strLen = buffer.getInt
val strBytes = new Array[Byte](strLen)
buffer.get(strBytes)

parsedResults += new String(strBytes, StandardCharsets.UTF_8)
}

parsedResults
}

override def labelNamesEfficient(colFilters: Seq[ColumnFilter], startTime: Long, endTime: Long): Seq[String] = {
Expand All @@ -176,7 +208,7 @@ class PartKeyTantivyIndex(ref: DatasetRef,

labelValuesQueryLatency.record(System.nanoTime() - start)

results.toSeq
decodeStringArray(results)
}

override def labelValuesEfficient(colFilters: Seq[ColumnFilter], startTime: Long, endTime: Long,
Expand All @@ -189,7 +221,7 @@ class PartKeyTantivyIndex(ref: DatasetRef,

labelValuesQueryLatency.record(System.nanoTime() - start)

results.toSeq
decodeStringArray(results)
}

override def addPartKey(partKeyOnHeapBytes: Array[Byte], partId: Int, startTime: Long, endTime: Long,
Expand Down Expand Up @@ -645,19 +677,19 @@ protected object TantivyNativeMethods {

// Get the list of unique indexed field names
@native
def indexNames(handle: Long): Array[String]
def indexNames(handle: Long): Array[Byte]

// Get the list of unique values for a field
@native
def indexValues(handle: Long, fieldName: String, topK: Int): Array[TermInfo]
def indexValues(handle: Long, fieldName: String, topK: Int): Array[Byte]

// Get the list of unique indexed field names
@native
def labelNames(handle: Long, query: Array[Byte], limit: Int, start: Long, end: Long): Array[String]
def labelNames(handle: Long, query: Array[Byte], limit: Int, start: Long, end: Long): Array[Byte]

// Get the list of unique values for a field
@native
def labelValues(handle: Long, query: Array[Byte], colName: String, limit: Int, start: Long, end: Long): Array[String]
def labelValues(handle: Long, query: Array[Byte], colName: String, limit: Int, start: Long, end: Long): Array[Byte]

// Get the list of part IDs given a query
@native
Expand Down
172 changes: 99 additions & 73 deletions core/src/rust/filodb_core/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ use std::sync::atomic::Ordering;

use hashbrown::HashSet;
use jni::{
objects::{JByteArray, JClass, JIntArray, JObject, JString, JValue},
sys::{jbyteArray, jint, jintArray, jlong, jlongArray, jobjectArray},
objects::{JByteArray, JClass, JIntArray, JObject, JString},
sys::{jbyteArray, jint, jintArray, jlong, jlongArray},
JNIEnv,
};
use tantivy::{collector::FacetCollector, schema::FieldType};
use tantivy_utils::collectors::part_key_record_collector::PartKeyRecordCollector;
use tantivy::schema::FieldType;
use tantivy_utils::collectors::part_id_collector::PartIdCollector;
use tantivy_utils::collectors::string_field_collector::StringFieldCollector;
use tantivy_utils::collectors::time_collector::TimeCollector;
use tantivy_utils::collectors::time_range_filter::TimeRangeFilter;
use tantivy_utils::collectors::{
limited_collector::UnlimitedCollector, part_id_collector::PartIdCollector,
index_collector::collect_from_index, part_key_record_collector::PartKeyRecordCollector,
};
use tantivy_utils::collectors::{
part_key_collector::PartKeyCollector, part_key_record_collector::PartKeyRecord,
Expand All @@ -29,9 +29,6 @@ use crate::{
state::IndexHandle,
};

const TERM_INFO_CLASS: &str = "filodb/core/memstore/TermInfo";
const UTF8STR_CLASS: &str = "filodb/memory/format/ZeroCopyUTF8String";

#[no_mangle]
pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_indexRamBytes(
mut env: JNIEnv,
Expand Down Expand Up @@ -147,21 +144,28 @@ fn fetch_label_names(
query: FiloDBQuery,
handle: &IndexHandle,
results: &mut HashSet<String>,
limit: i32,
start: i64,
end: i64,
) -> JavaResult<()> {
// Get LABEL_NAMES facet
let mut collector = FacetCollector::for_field(facet_field_name(field_constants::LABEL_LIST));
collector.add_facet("/");
let collector = UnlimitedCollector::new(collector);

let filter_collector =
TimeRangeFilter::new(&collector, start, end, handle.column_cache.clone());
let field = facet_field_name(field_constants::LABEL_LIST);
let collector = StringFieldCollector::new(
&field,
limit as usize,
usize::MAX,
handle.column_cache.clone(),
);

let query_results = if matches!(query, FiloDBQuery::All) {
collect_from_index(&handle.searcher(), collector)?
} else {
let filter_collector =
TimeRangeFilter::new(&collector, start, end, handle.column_cache.clone());
handle.execute_cachable_query(query, filter_collector)?
};

let query_results = handle.execute_cachable_query(query, filter_collector)?;
let query_results: Vec<_> = query_results.get("/").collect();
for (facet, _count) in query_results {
results.insert(facet.to_path()[0].to_string());
results.insert(facet.to_string());
}

Ok(())
Expand All @@ -176,7 +180,7 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_labe
limit: jint,
start: jlong,
end: jlong,
) -> jobjectArray {
) -> jbyteArray {
jni_exec(&mut env, |env| {
let handle = IndexHandle::get_ref_from_handle(handle);

Expand All @@ -185,25 +189,41 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_labe
let query_bytes = env.get_byte_array(&query)?;

let query = FiloDBQuery::Complex(query_bytes.into_boxed_slice().into());
fetch_label_names(query, handle, &mut results, start, end)?;
fetch_label_names(query, handle, &mut results, limit, start, end)?;

let len = std::cmp::min(results.len(), limit as usize);
encode_string_array(env, results)
})
}

let java_ret = env.new_object_array(len as i32, "java/lang/String", JObject::null())?;
for (i, item) in results.drain().take(len).enumerate() {
env.set_object_array_element(&java_ret, i as i32, env.new_string(item)?)?;
}
fn encode_string_array(env: &mut JNIEnv, arr: HashSet<String>) -> JavaResult<jbyteArray> {
let len: usize = arr
.iter()
.map(|s| std::mem::size_of::<u32>() + s.len())
.sum();

Ok(java_ret.into_raw())
})
let mut serialzied_bytes = Vec::with_capacity(len);
for s in arr.iter() {
serialzied_bytes.extend((s.len() as i32).to_le_bytes());
serialzied_bytes.extend(s.as_bytes());
}

let java_ret = env.new_byte_array(len as i32)?;
let bytes_ptr = serialzied_bytes.as_ptr() as *const i8;
let bytes_ptr = unsafe { std::slice::from_raw_parts(bytes_ptr, len) };

env.set_byte_array_region(&java_ret, 0, bytes_ptr)?;

Ok(java_ret.into_raw())
}

const LABEL_NAMES_AND_VALUES_DEFAULT_LIMIT: i32 = 100;

#[no_mangle]
pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_indexNames(
mut env: JNIEnv,
_class: JClass,
handle: jlong,
) -> jobjectArray {
) -> jbyteArray {
jni_exec(&mut env, |env| {
let handle = IndexHandle::get_ref_from_handle(handle);

Expand All @@ -223,15 +243,16 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_inde
}

let query = FiloDBQuery::All;
fetch_label_names(query, handle, &mut results, 0, i64::MAX)?;

let java_ret =
env.new_object_array(results.len() as i32, "java/lang/String", JObject::null())?;
for (i, item) in results.drain().enumerate() {
env.set_object_array_element(&java_ret, i as i32, env.new_string(item)?)?;
}
fetch_label_names(
query,
handle,
&mut results,
LABEL_NAMES_AND_VALUES_DEFAULT_LIMIT,
0,
i64::MAX,
)?;

Ok(java_ret.into_raw())
encode_string_array(env, results)
})
}

Expand Down Expand Up @@ -260,9 +281,14 @@ fn query_label_values(

let collector =
StringFieldCollector::new(&field, limit, term_limit, handle.column_cache.clone());
let filter_collector =
TimeRangeFilter::new(&collector, start, end, handle.column_cache.clone());
Ok(handle.execute_cachable_query(query, filter_collector)?)

if matches!(query, FiloDBQuery::All) {
Ok(collect_from_index(&handle.searcher(), collector)?)
} else {
let filter_collector =
TimeRangeFilter::new(&collector, start, end, handle.column_cache.clone());
Ok(handle.execute_cachable_query(query, filter_collector)?)
}
} else {
// Invalid field, no values
Ok(vec![])
Expand All @@ -279,7 +305,7 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_labe
top_k: jint,
start: jlong,
end: jlong,
) -> jobjectArray {
) -> jbyteArray {
jni_exec(&mut env, |env| {
let handle = IndexHandle::get_ref_from_handle(handle);

Expand All @@ -293,14 +319,23 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_labe

let results = query_label_values(query, handle, field, top_k, usize::MAX, start, end)?;

let results_len = std::cmp::min(top_k, results.len());
let java_ret =
env.new_object_array(results_len as i32, "java/lang/String", JObject::null())?;
for (i, (value, _)) in results.into_iter().take(top_k).enumerate() {
let term_str = env.new_string(&value)?;
env.set_object_array_element(&java_ret, i as i32, &term_str)?;
let len: usize = results
.iter()
.map(|(s, _)| std::mem::size_of::<u32>() + s.len())
.sum();

let mut serialzied_bytes = Vec::with_capacity(len);
for (s, _) in results.iter() {
serialzied_bytes.extend((s.len() as i32).to_le_bytes());
serialzied_bytes.extend(s.as_bytes());
}

let java_ret = env.new_byte_array(len as i32)?;
let bytes_ptr = serialzied_bytes.as_ptr() as *const i8;
let bytes_ptr = unsafe { std::slice::from_raw_parts(bytes_ptr, len) };

env.set_byte_array_region(&java_ret, 0, bytes_ptr)?;

Ok(java_ret.into_raw())
})
}
Expand All @@ -312,7 +347,7 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_inde
handle: jlong,
field: JString,
top_k: jint,
) -> jobjectArray {
) -> jbyteArray {
jni_exec(&mut env, |env| {
let handle = IndexHandle::get_ref_from_handle(handle);

Expand All @@ -331,34 +366,25 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_inde
i64::MAX,
)?;

let results_len = std::cmp::min(top_k, results.len());
let java_ret =
env.new_object_array(results_len as i32, TERM_INFO_CLASS, JObject::null())?;
for (i, (value, count)) in results.into_iter().take(top_k).enumerate() {
let len = value.as_bytes().len();
let term_bytes = env.new_byte_array(len as i32)?;
let bytes_ptr = value.as_bytes().as_ptr() as *const i8;
let bytes_ptr = unsafe { std::slice::from_raw_parts(bytes_ptr, len) };

env.set_byte_array_region(&term_bytes, 0, bytes_ptr)?;

let term_str = env
.call_static_method(
UTF8STR_CLASS,
"apply",
"([B)Lfilodb/memory/format/ZeroCopyUTF8String;",
&[JValue::Object(&term_bytes)],
)?
.l()?;

let term_info_obj = env.new_object(
TERM_INFO_CLASS,
"(Lfilodb/memory/format/ZeroCopyUTF8String;I)V",
&[JValue::Object(&term_str), JValue::Int(count as i32)],
)?;
env.set_object_array_element(&java_ret, i as i32, &term_info_obj)?;
// String length, plus count, plus string data
let results_len: usize = results
.iter()
.take(top_k)
.map(|(value, _)| value.len() + std::mem::size_of::<i32>() + std::mem::size_of::<i64>())
.sum();
let mut serialzied_bytes = Vec::with_capacity(results_len);
for (value, count) in results.into_iter().take(top_k) {
serialzied_bytes.extend(count.to_le_bytes());
serialzied_bytes.extend((value.len() as i32).to_le_bytes());
serialzied_bytes.extend(value.as_bytes());
}

let java_ret = env.new_byte_array(results_len as i32)?;
let bytes_ptr = serialzied_bytes.as_ptr() as *const i8;
let bytes_ptr = unsafe { std::slice::from_raw_parts(bytes_ptr, results_len) };

env.set_byte_array_region(&java_ret, 0, bytes_ptr)?;

Ok(java_ret.into_raw())
})
}
Expand Down
1 change: 1 addition & 0 deletions core/src/rust/tantivy_utils/src/collectors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Common collectors

pub mod column_cache;
pub mod index_collector;
pub mod limited_collector;
pub mod part_id_collector;
pub mod part_key_collector;
Expand Down
Loading
Loading