diff --git a/influxdb3_cache/src/last_cache/cache.rs b/influxdb3_cache/src/last_cache/cache.rs index e92c9ce6a5e..8b133eca67e 100644 --- a/influxdb3_cache/src/last_cache/cache.rs +++ b/influxdb3_cache/src/last_cache/cache.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{BTreeSet, HashMap, HashSet, VecDeque}, ops::Deref, sync::Arc, time::{Duration, Instant}, @@ -17,11 +17,6 @@ use arrow::{ }, error::ArrowError, }; -use datafusion::{ - logical_expr::{expr::InList, BinaryExpr, Operator}, - prelude::Expr, - scalar::ScalarValue, -}; use indexmap::{IndexMap, IndexSet}; use influxdb3_catalog::catalog::{ColumnDefinition, TableDefinition, TIME_COLUMN_NAME}; use influxdb3_id::{ColumnId, TableId}; @@ -56,8 +51,6 @@ pub(crate) struct LastCache { /// map preserves the order of the elements, thereby maintaining the order of the keys in /// the cache. pub(crate) key_column_ids: Arc>, - /// The key columns for this cache, by their names - pub(crate) key_column_name_to_ids: Arc, ColumnId>>, /// The value columns for this cache pub(crate) value_columns: ValueColumnType, /// The Arrow Schema for the table that this cache is associated with @@ -86,6 +79,7 @@ pub struct CreateLastCacheArgs { /// The default cache time-to-live (TTL) is 4 hours pub(crate) const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(60 * 60 * 4); +/// The time to live (TTL) for entries in the cache #[derive(Debug, Clone, Copy, Deserialize)] pub struct LastCacheTtl(Duration); @@ -115,6 +109,7 @@ impl Default for LastCacheTtl { } } +/// Specifies the key column configuration for a new [`LastCache`] #[derive(Debug, Default, Clone)] pub enum LastCacheKeyColumnsArg { /// Use the series key columns in their order as the last cache key columns @@ -133,6 +128,7 @@ impl From>> for LastCacheKeyColumnsArg { } } +/// Specifies the value column configuration for a new [`LastCache`] #[derive(Debug, Default, Clone)] pub enum LastCacheValueColumnsArg { /// Use all non-key columns as value columns when initialized, and add new field columns that @@ -156,6 +152,10 @@ impl From>> for LastCacheValueColumnsArg { impl LastCache { /// Create a new [`LastCache`] + /// + /// This uses the provided `TableDefinition` to build an Arrow schema for the cache. This will + /// validate the given arguments and can error if there are invalid columns specified, or if an + /// incompatible column is used as a key to the cache.
pub(crate) fn new( CreateLastCacheArgs { table_def, @@ -260,7 +260,6 @@ impl LastCache { count, ttl: ttl.into(), key_column_ids: Arc::new(key_column_ids), - key_column_name_to_ids: Arc::new(key_column_name_to_ids), value_columns: match value_columns { LastCacheValueColumnsArg::AcceptNew => ValueColumnType::AcceptNew { seen }, LastCacheValueColumnsArg::Explicit(_) => ValueColumnType::Explicit { @@ -340,32 +339,36 @@ impl LastCache { /// This will panic if the internal cache state's keys are out-of-order with respect to the /// order of the `key_columns` on this [`LastCache`] pub(crate) fn push(&mut self, row: &Row, table_def: Arc) { - let accept_new_fields = self.accept_new_fields(); - let mut target = &mut self.state; - let mut key_iter = self.key_column_ids.iter().peekable(); - while let (Some(col_id), peek) = (key_iter.next(), key_iter.peek()) { - if target.is_init() { - *target = LastCacheState::Key(LastCacheKey { - column_id: *col_id, - value_map: Default::default(), - }); - } + let mut values = Vec::with_capacity(self.key_column_ids.len()); + for id in self.key_column_ids.iter() { let Some(value) = row .fields .iter() - .find(|f| f.id == *col_id) + .find(|f| &f.id == id) .map(|f| KeyValue::from(&f.value)) else { // ignore the row if it does not contain all key columns return; }; + values.push(value); + } + let accept_new_fields = self.accept_new_fields(); + let mut target = &mut self.state; + let mut iter = self.key_column_ids.iter().zip(values).peekable(); + while let (Some((col_id, value)), peek) = (iter.next(), iter.peek()) { + if target.is_init() { + *target = LastCacheState::Key(LastCacheKey { + column_id: *col_id, + value_map: Default::default(), + }); + } let cache_key = target.as_key_mut().unwrap(); assert_eq!( &cache_key.column_id, col_id, "key columns must match cache key order" ); target = cache_key.value_map.entry(value).or_insert_with(|| { - if let Some(next_col_id) = peek { + if let Some((next_col_id, _)) = peek { LastCacheState::Key(LastCacheKey { column_id: **next_col_id, value_map: Default::default(), @@ -411,14 +414,14 @@ impl LastCache { pub(crate) fn to_record_batches( &self, table_def: Arc, - predicates: &[Predicate], + predicates: &IndexMap, ) -> Result, ArrowError> { // map the provided predicates on to the key columns // there may not be predicates provided for each key column, hence the Option let predicates: Vec> = self .key_column_ids .iter() - .map(|col_id| predicates.iter().find(|p| p.column_id == *col_id)) + .map(|id| predicates.get(id)) .collect(); let mut caches = vec![ExtendedLastCacheState { @@ -465,78 +468,6 @@ impl LastCache { .collect() } - /// Convert a set of DataFusion filter [`Expr`]s into [`Predicate`]s - /// - /// This only handles binary expressions, e.g., `foo = 'bar'`, and will use the `key_columns` - /// to filter out expressions that do not match key columns in the cache. - pub(crate) fn convert_filter_exprs(&self, exprs: &[Expr]) -> Vec { - exprs - .iter() - .filter_map(|expr| { - match expr { - Expr::BinaryExpr(BinaryExpr { left, op, right }) => { - let col_id = if let Expr::Column(c) = left.as_ref() { - self.key_column_name_to_ids.get(c.name()).copied()? 
- } else { - return None; - }; - let value = match right.as_ref() { - Expr::Literal(ScalarValue::Utf8(Some(v))) => { - KeyValue::String(v.to_owned()) - } - Expr::Literal(ScalarValue::Boolean(Some(v))) => KeyValue::Bool(*v), - // TODO: handle integer types that can be casted up to i64/u64: - Expr::Literal(ScalarValue::Int64(Some(v))) => KeyValue::Int(*v), - Expr::Literal(ScalarValue::UInt64(Some(v))) => KeyValue::UInt(*v), - _ => return None, - }; - match op { - Operator::Eq => Some(Predicate::new_eq(col_id, value)), - Operator::NotEq => Some(Predicate::new_not_eq(col_id, value)), - _ => None, - } - } - Expr::InList(InList { - expr, - list, - negated, - }) => { - let col_id = if let Expr::Column(c) = expr.as_ref() { - self.key_column_name_to_ids.get(c.name()).copied()? - } else { - return None; - }; - let values: Vec = list - .iter() - .filter_map(|e| match e { - Expr::Literal(ScalarValue::Utf8(Some(v))) => { - Some(KeyValue::String(v.to_owned())) - } - Expr::Literal(ScalarValue::Boolean(Some(v))) => { - Some(KeyValue::Bool(*v)) - } - // TODO: handle integer types that can be casted up to i64/u64: - Expr::Literal(ScalarValue::Int64(Some(v))) => { - Some(KeyValue::Int(*v)) - } - Expr::Literal(ScalarValue::UInt64(Some(v))) => { - Some(KeyValue::UInt(*v)) - } - _ => None, - }) - .collect(); - if *negated { - Some(Predicate::new_not_in(col_id, values)) - } else { - Some(Predicate::new_in(col_id, values)) - } - } - _ => None, - } - }) - .collect() - } - /// Remove expired values from the internal cache state pub(crate) fn remove_expired(&mut self) { self.state.remove_expired(); @@ -663,50 +594,52 @@ impl ExtendedLastCacheState<'_> { } /// A predicate used for evaluating key column values in the cache on query +/// +/// Can be either an inclusive or exclusive set. `BTreeSet` is used so that +/// the predicate values are ordered and displayed in sorted order in +/// query `EXPLAIN` plans.
#[derive(Debug, Clone)] -pub struct Predicate { - /// The left-hand-side of the predicate as a valid `ColumnId` - column_id: ColumnId, - /// The right-hand-side of the predicate - kind: PredicateKind, +pub(crate) enum Predicate { + In(BTreeSet), + NotIn(BTreeSet), } -impl Predicate { - pub(crate) fn new_eq(column_id: ColumnId, value: KeyValue) -> Self { - Self { - column_id, - kind: PredicateKind::Eq(value), +impl std::fmt::Display for Predicate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Predicate::In(_) => write!(f, "IN (")?, + Predicate::NotIn(_) => write!(f, "NOT IN (")?, } - } - - pub(crate) fn new_not_eq(column_id: ColumnId, value: KeyValue) -> Self { - Self { - column_id, - kind: PredicateKind::NotEq(value), + let mut values = self.values(); + while let Some(v) = values.next() { + write!(f, "{v}")?; + if values.size_hint().0 > 0 { + write!(f, ",")?; + } } - } - pub(crate) fn new_in(column_id: ColumnId, values: Vec) -> Self { - Self { - column_id, - kind: PredicateKind::In(values), - } + write!(f, ")") } +} - pub(crate) fn new_not_in(column_id: ColumnId, values: Vec) -> Self { - Self { - column_id, - kind: PredicateKind::NotIn(values), +impl Predicate { + fn values(&self) -> impl Iterator { + match self { + Predicate::In(btree_set) => btree_set.iter(), + Predicate::NotIn(btree_set) => btree_set.iter(), } } } -#[derive(Debug, Clone)] -pub(crate) enum PredicateKind { - Eq(KeyValue), - NotEq(KeyValue), - In(Vec), - NotIn(Vec), +#[cfg(test)] +impl Predicate { + pub(crate) fn new_in(values: impl IntoIterator) -> Self { + Self::In(values.into_iter().collect()) + } + + pub(crate) fn new_not_in(values: impl IntoIterator) -> Self { + Self::NotIn(values.into_iter().collect()) + } } /// Represents the hierarchical last cache structure @@ -777,37 +710,16 @@ struct LastCacheKey { impl LastCacheKey { /// Evaluate the provided [`Predicate`] by using its value to lookup in this [`LastCacheKey`]'s /// value map. - /// - /// # Panics - /// - /// This assumes that a predicate for this [`LastCacheKey`]'s column was provided, and will panic - /// otherwise. 
fn evaluate_predicate<'a: 'b, 'b>( &'a self, predicate: &'b Predicate, ) -> Vec<(&'a LastCacheState, &'b KeyValue)> { - if predicate.column_id != self.column_id { - panic!( - "attempted to evaluate unexpected predicate with key {} for column with id {}", - predicate.column_id, self.column_id - ); - } - match &predicate.kind { - PredicateKind::Eq(val) => self - .value_map - .get(val) - .map(|s| vec![(s, val)]) - .unwrap_or_default(), - PredicateKind::NotEq(val) => self - .value_map - .iter() - .filter_map(|(v, s)| (v != val).then_some((s, v))) - .collect(), - PredicateKind::In(vals) => vals + match predicate { + Predicate::In(vals) => vals .iter() .filter_map(|v| self.value_map.get(v).map(|s| (s, v))) .collect(), - PredicateKind::NotIn(vals) => self + Predicate::NotIn(vals) => self .value_map .iter() .filter_map(|(v, s)| (!vals.contains(v)).then_some((s, v))) @@ -828,7 +740,7 @@ impl LastCacheKey { } /// A value for a key column in a [`LastCache`] -#[derive(Debug, Clone, Eq, PartialEq, Hash)] +#[derive(Debug, Clone, Eq, PartialEq, Hash, PartialOrd, Ord)] pub(crate) enum KeyValue { String(String), Int(i64), @@ -836,6 +748,17 @@ pub(crate) enum KeyValue { Bool(bool), } +impl std::fmt::Display for KeyValue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + KeyValue::String(s) => write!(f, "'{s}'"), + KeyValue::Int(i) => write!(f, "{i}i"), + KeyValue::UInt(u) => write!(f, "{u}u"), + KeyValue::Bool(b) => write!(f, "{b}"), + } + } +} + #[cfg(test)] impl KeyValue { pub(crate) fn string(s: impl Into) -> Self { diff --git a/influxdb3_cache/src/last_cache/mod.rs b/influxdb3_cache/src/last_cache/mod.rs index 01fce2c8f0a..ddeb3ceac0a 100644 --- a/influxdb3_cache/src/last_cache/mod.rs +++ b/influxdb3_cache/src/last_cache/mod.rs @@ -8,7 +8,7 @@ mod provider; pub use provider::LastCacheProvider; mod table_function; use schema::InfluxColumnType; -pub use table_function::LastCacheFunction; +pub use table_function::{LastCacheFunction, LAST_CACHE_UDTF_NAME}; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -44,8 +44,11 @@ impl Error { mod tests { use std::{cmp::Ordering, sync::Arc, thread, time::Duration}; + use arrow::array::AsArray; use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; use bimap::BiHashMap; + use datafusion::prelude::SessionContext; + use indexmap::IndexMap; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, TableDefinition}; use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId}; use influxdb3_wal::{LastCacheDefinition, LastCacheSize}; @@ -56,13 +59,19 @@ mod tests { KeyValue, LastCache, LastCacheKeyColumnsArg, LastCacheValueColumnsArg, Predicate, DEFAULT_CACHE_TTL, }, - CreateLastCacheArgs, LastCacheProvider, + CreateLastCacheArgs, LastCacheFunction, LastCacheProvider, LAST_CACHE_UDTF_NAME, }, test_helpers::{column_ids_for_names, TestWriter}, }; use super::LastCacheTtl; + fn predicates( + preds: impl IntoIterator, + ) -> IndexMap { + preds.into_iter().collect() + } + #[test] fn pick_up_latest_write() { let writer = TestWriter::new(); @@ -88,11 +97,11 @@ mod tests { cache.push(row, Arc::clone(&table_def)); } - let predicates = &[Predicate::new_eq(col_id, KeyValue::string("a"))]; + let predicates = predicates([(col_id, Predicate::new_in([KeyValue::string("a")]))]); // Check what is in the last cache: let batch = cache - .to_record_batches(Arc::clone(&table_def), predicates) + .to_record_batches(Arc::clone(&table_def), &predicates) .unwrap(); assert_batches_eq!( @@ -112,7 +121,7 @@ mod tests { cache.push(row, 
Arc::clone(&table_def)); } - let batch = cache.to_record_batches(table_def, predicates).unwrap(); + let batch = cache.to_record_batches(table_def, &predicates).unwrap(); assert_batches_eq!( [ @@ -177,17 +186,17 @@ mod tests { } struct TestCase<'a> { - predicates: &'a [Predicate], + predicates: IndexMap, expected: &'a [&'a str], } let test_cases = [ // Predicate including both key columns only produces value columns from the cache TestCase { - predicates: &[ - Predicate::new_eq(region_col_id, KeyValue::string("us")), - Predicate::new_eq(host_col_id, KeyValue::string("c")), - ], + predicates: predicates([ + (region_col_id, Predicate::new_in([KeyValue::string("us")])), + (host_col_id, Predicate::new_in([KeyValue::string("c")])), + ]), expected: &[ "+--------+------+-----------------------------+-------+", "| region | host | time | usage |", @@ -199,7 +208,10 @@ mod tests { // Predicate on only region key column will have host column outputted in addition to // the value columns: TestCase { - predicates: &[Predicate::new_eq(region_col_id, KeyValue::string("us"))], + predicates: predicates([( + region_col_id, + Predicate::new_in([KeyValue::string("us")]), + )]), expected: &[ "+--------+------+-----------------------------+-------+", "| region | host | time | usage |", @@ -212,7 +224,10 @@ mod tests { }, // Similar to previous, with a different region predicate: TestCase { - predicates: &[Predicate::new_eq(region_col_id, KeyValue::string("ca"))], + predicates: predicates([( + region_col_id, + Predicate::new_in([KeyValue::string("ca")]), + )]), expected: &[ "+--------+------+-----------------------------+-------+", "| region | host | time | usage |", @@ -226,7 +241,7 @@ mod tests { // Predicate on only host key column will have region column outputted in addition to // the value columns: TestCase { - predicates: &[Predicate::new_eq(host_col_id, KeyValue::string("a"))], + predicates: predicates([(host_col_id, Predicate::new_in([KeyValue::string("a")]))]), expected: &[ "+--------+------+-----------------------------+-------+", "| region | host | time | usage |", @@ -238,7 +253,7 @@ mod tests { // Omitting all key columns from the predicate will have all key columns included in // the query result: TestCase { - predicates: &[], + predicates: predicates([]), expected: &[ "+--------+------+-----------------------------+-------+", "| region | host | time | usage |", @@ -255,10 +270,10 @@ mod tests { // Using a non-existent key column as a predicate has no effect: // TODO: should this be an error? 
TestCase { - predicates: &[Predicate::new_eq( + predicates: predicates([( ColumnId::new(), - KeyValue::string("12345"), - )], + Predicate::new_in([KeyValue::string("12345")]), + )]), expected: &[ "+--------+------+-----------------------------+-------+", "| region | host | time | usage |", @@ -274,31 +289,37 @@ mod tests { }, // Using a non existent key column value yields empty result set: TestCase { - predicates: &[Predicate::new_eq(region_col_id, KeyValue::string("eu"))], + predicates: predicates([( + region_col_id, + Predicate::new_in([KeyValue::string("eu")]), + )]), expected: &["++", "++"], }, // Using an invalid combination of key column values yields an empty result set: TestCase { - predicates: &[ - Predicate::new_eq(region_col_id, KeyValue::string("ca")), - Predicate::new_eq(host_col_id, KeyValue::string("a")), - ], + predicates: predicates([ + (region_col_id, Predicate::new_in([KeyValue::string("ca")])), + (host_col_id, Predicate::new_in([KeyValue::string("a")])), + ]), expected: &["++", "++"], }, // Using a non-existent key column value (for host column) also yields empty result set: TestCase { - predicates: &[Predicate::new_eq(host_col_id, KeyValue::string("g"))], + predicates: predicates([(host_col_id, Predicate::new_in([KeyValue::string("g")]))]), expected: &["++", "++"], }, // Using an incorrect type for a key column value in predicate also yields empty result // set. TODO: should this be an error? TestCase { - predicates: &[Predicate::new_eq(host_col_id, KeyValue::Bool(true))], + predicates: predicates([(host_col_id, Predicate::new_in([KeyValue::Bool(true)]))]), expected: &["++", "++"], }, - // Using a != predicate + // Using a NOT IN predicate TestCase { - predicates: &[Predicate::new_not_eq(region_col_id, KeyValue::string("us"))], + predicates: predicates([( + region_col_id, + Predicate::new_not_in([KeyValue::string("us")]), + )]), expected: &[ "+--------+------+-----------------------------+-------+", "| region | host | time | usage |", @@ -311,10 +332,10 @@ mod tests { }, // Using an IN predicate: TestCase { - predicates: &[Predicate::new_in( + predicates: predicates([( host_col_id, - vec![KeyValue::string("a"), KeyValue::string("b")], - )], + Predicate::new_in([KeyValue::string("a"), KeyValue::string("b")]), + )]), expected: &[ "+--------+------+-----------------------------+-------+", "| region | host | time | usage |", @@ -326,10 +347,10 @@ mod tests { }, // Using a NOT IN predicate: TestCase { - predicates: &[Predicate::new_not_in( + predicates: predicates([( host_col_id, - vec![KeyValue::string("a"), KeyValue::string("b")], - )], + Predicate::new_not_in([KeyValue::string("a"), KeyValue::string("b")]), + )]), expected: &[ "+--------+------+-----------------------------+-------+", "| region | host | time | usage |", @@ -345,7 +366,7 @@ mod tests { for t in test_cases { let batches = cache - .to_record_batches(Arc::clone(&table_def), t.predicates) + .to_record_batches(Arc::clone(&table_def), &t.predicates) .unwrap(); assert_batches_sorted_eq!(t.expected, &batches); @@ -408,16 +429,16 @@ mod tests { } struct TestCase<'a> { - predicates: &'a [Predicate], + predicates: IndexMap, expected: &'a [&'a str], } let test_cases = [ TestCase { - predicates: &[ - Predicate::new_eq(region_col_id, KeyValue::string("us")), - Predicate::new_eq(host_col_id, KeyValue::string("a")), - ], + predicates: predicates([ + (region_col_id, Predicate::new_in([KeyValue::string("us")])), + (host_col_id, Predicate::new_in([KeyValue::string("a")])), + ]), expected: &[ 
"+--------+------+--------------------------------+-------+", "| region | host | time | usage |", @@ -430,7 +451,10 @@ mod tests { ], }, TestCase { - predicates: &[Predicate::new_eq(region_col_id, KeyValue::string("us"))], + predicates: predicates([( + region_col_id, + Predicate::new_in([KeyValue::string("us")]), + )]), expected: &[ "+--------+------+--------------------------------+-------+", "| region | host | time | usage |", @@ -447,7 +471,7 @@ mod tests { ], }, TestCase { - predicates: &[Predicate::new_eq(host_col_id, KeyValue::string("a"))], + predicates: predicates([(host_col_id, Predicate::new_in([KeyValue::string("a")]))]), expected: &[ "+--------+------+--------------------------------+-------+", "| region | host | time | usage |", @@ -460,7 +484,7 @@ mod tests { ], }, TestCase { - predicates: &[Predicate::new_eq(host_col_id, KeyValue::string("b"))], + predicates: predicates([(host_col_id, Predicate::new_in([KeyValue::string("b")]))]), expected: &[ "+--------+------+--------------------------------+-------+", "| region | host | time | usage |", @@ -473,7 +497,7 @@ mod tests { ], }, TestCase { - predicates: &[], + predicates: predicates([]), expected: &[ "+--------+------+--------------------------------+-------+", "| region | host | time | usage |", @@ -493,7 +517,7 @@ mod tests { for t in test_cases { let batches = cache - .to_record_batches(Arc::clone(&table_def), t.predicates) + .to_record_batches(Arc::clone(&table_def), &t.predicates) .unwrap(); assert_batches_sorted_eq!(t.expected, &batches); } @@ -536,15 +560,13 @@ mod tests { } // Check the cache for values: - let predicates = &[ - Predicate::new_eq(region_col_id, KeyValue::string("us")), - Predicate::new_eq(host_col_id, KeyValue::string("a")), - ]; + let p = predicates([ + (region_col_id, Predicate::new_in([KeyValue::string("us")])), + (host_col_id, Predicate::new_in([KeyValue::string("a")])), + ]); // Check what is in the last cache: - let batches = cache - .to_record_batches(Arc::clone(&table_def), predicates) - .unwrap(); + let batches = cache.to_record_batches(Arc::clone(&table_def), &p).unwrap(); assert_batches_sorted_eq!( [ @@ -561,9 +583,7 @@ mod tests { thread::sleep(Duration::from_millis(1000)); // Check what is in the last cache: - let batches = cache - .to_record_batches(Arc::clone(&table_def), predicates) - .unwrap(); + let batches = cache.to_record_batches(Arc::clone(&table_def), &p).unwrap(); // The cache is completely empty after the TTL evicted data, so it will give back nothing: assert_batches_sorted_eq!( @@ -583,12 +603,10 @@ mod tests { } // Check the cache for values: - let predicates = &[Predicate::new_eq(host_col_id, KeyValue::string("a"))]; + let p = predicates([(host_col_id, Predicate::new_in([KeyValue::string("a")]))]); // Check what is in the last cache: - let batches = cache - .to_record_batches(Arc::clone(&table_def), predicates) - .unwrap(); + let batches = cache.to_record_batches(Arc::clone(&table_def), &p).unwrap(); assert_batches_sorted_eq!( [ @@ -645,14 +663,14 @@ mod tests { } struct TestCase<'a> { - predicates: &'a [Predicate], + predicates: IndexMap, expected: &'a [&'a str], } let test_cases = [ // No predicates gives everything: TestCase { - predicates: &[], + predicates: predicates([]), expected: &[ "+--------------+--------+-------------+-----------+---------+-----------------------------+", "| component_id | active | type | loc | reading | time |", @@ -668,7 +686,9 @@ mod tests { }, // Predicates on tag key column work as expected: TestCase { - predicates: 
&[Predicate::new_eq(component_id_col_id, KeyValue::string("333"))], + predicates: predicates([ + (component_id_col_id, Predicate::new_in([KeyValue::string("333")])) + ]), expected: &[ "+--------------+--------+--------+------+---------+-----------------------------+", "| component_id | active | type | loc | reading | time |", @@ -679,7 +699,9 @@ mod tests { }, // Predicate on a non-string field key: TestCase { - predicates: &[Predicate::new_eq(active_col_id, KeyValue::Bool(false))], + predicates: predicates([ + (active_col_id, Predicate::new_in([KeyValue::Bool(false)])) + ]), expected: &[ "+--------------+--------+-------------+---------+---------+-----------------------------+", "| component_id | active | type | loc | reading | time |", @@ -691,7 +713,9 @@ mod tests { }, // Predicate on a string field key: TestCase { - predicates: &[Predicate::new_eq(type_col_id, KeyValue::string("camera"))], + predicates: predicates([ + (type_col_id, Predicate::new_in([KeyValue::string("camera")])) + ]), expected: &[ "+--------------+--------+--------+-----------+---------+-----------------------------+", "| component_id | active | type | loc | reading | time |", @@ -706,7 +730,7 @@ mod tests { for t in test_cases { let batches = cache - .to_record_batches(Arc::clone(&table_def), t.predicates) + .to_record_batches(Arc::clone(&table_def), &t.predicates) .unwrap(); assert_batches_sorted_eq!(t.expected, &batches); } @@ -748,14 +772,14 @@ mod tests { } struct TestCase<'a> { - predicates: &'a [Predicate], + predicates: IndexMap, expected: &'a [&'a str], } let test_cases = [ // No predicates yields everything in the cache TestCase { - predicates: &[], + predicates: predicates([]), expected: &[ "+-------+--------+-------+-------+-----------------------------+", "| state | county | farm | speed | time |", @@ -771,7 +795,10 @@ mod tests { }, // Predicate on state column, which is part of the series key: TestCase { - predicates: &[Predicate::new_eq(state_col_id, KeyValue::string("ca"))], + predicates: predicates([( + state_col_id, + Predicate::new_in([KeyValue::string("ca")]), + )]), expected: &[ "+-------+--------+-------+-------+-----------------------------+", "| state | county | farm | speed | time |", @@ -787,7 +814,10 @@ mod tests { }, // Predicate on county column, which is part of the series key: TestCase { - predicates: &[Predicate::new_eq(county_col_id, KeyValue::string("napa"))], + predicates: predicates([( + county_col_id, + Predicate::new_in([KeyValue::string("napa")]), + )]), expected: &[ "+-------+--------+-------+-------+-----------------------------+", "| state | county | farm | speed | time |", @@ -799,7 +829,10 @@ mod tests { }, // Predicate on farm column, which is part of the series key: TestCase { - predicates: &[Predicate::new_eq(farm_col_id, KeyValue::string("30-01"))], + predicates: predicates([( + farm_col_id, + Predicate::new_in([KeyValue::string("30-01")]), + )]), expected: &[ "+-------+--------+-------+-------+-----------------------------+", "| state | county | farm | speed | time |", @@ -810,11 +843,14 @@ mod tests { }, // Predicate on all series key columns: TestCase { - predicates: &[ - Predicate::new_eq(state_col_id, KeyValue::string("ca")), - Predicate::new_eq(county_col_id, KeyValue::string("nevada")), - Predicate::new_eq(farm_col_id, KeyValue::string("40-01")), - ], + predicates: predicates([ + (state_col_id, Predicate::new_in([KeyValue::string("ca")])), + ( + county_col_id, + Predicate::new_in([KeyValue::string("nevada")]), + ), + (farm_col_id, 
Predicate::new_in([KeyValue::string("40-01")])), + ]), expected: &[ "+-------+--------+-------+-------+-----------------------------+", "| state | county | farm | speed | time |", @@ -827,7 +863,7 @@ mod tests { for t in test_cases { let batches = cache - .to_record_batches(Arc::clone(&table_def), t.predicates) + .to_record_batches(Arc::clone(&table_def), &t.predicates) .unwrap(); assert_batches_sorted_eq!(t.expected, &batches); @@ -870,7 +906,7 @@ mod tests { cache.push(row, Arc::clone(&table_def)); } - let batches = cache.to_record_batches(table_def, &[]).unwrap(); + let batches = cache.to_record_batches(table_def, &predicates([])).unwrap(); assert_batches_sorted_eq!( [ @@ -925,14 +961,17 @@ mod tests { } struct TestCase<'a> { - predicates: &'a [Predicate], + predicates: IndexMap, expected: &'a [&'a str], } let test_cases = [ // Cache that has values in the zone columns should produce them: TestCase { - predicates: &[Predicate::new_eq(game_id_col_id, KeyValue::string("4"))], + predicates: predicates([( + game_id_col_id, + Predicate::new_in([KeyValue::string("4")]), + )]), expected: &[ "+---------+-----------+-----------------------------+------+------+", "| game_id | player | time | type | zone |", @@ -943,7 +982,10 @@ mod tests { }, // Cache that does not have a zone column will produce it with nulls: TestCase { - predicates: &[Predicate::new_eq(game_id_col_id, KeyValue::string("1"))], + predicates: predicates([( + game_id_col_id, + Predicate::new_in([KeyValue::string("1")]), + )]), expected: &[ "+---------+-----------+-----------------------------+------+------+", "| game_id | player | time | type | zone |", @@ -954,7 +996,7 @@ mod tests { }, // Pulling from multiple caches will fill in with nulls: TestCase { - predicates: &[], + predicates: predicates([]), expected: &[ "+---------+-----------+-----------------------------+------+------+", "| game_id | player | time | type | zone |", @@ -970,7 +1012,7 @@ mod tests { for t in test_cases { let batches = cache - .to_record_batches(Arc::clone(&table_def), t.predicates) + .to_record_batches(Arc::clone(&table_def), &t.predicates) .unwrap(); assert_batches_sorted_eq!(t.expected, &batches); @@ -1028,14 +1070,14 @@ mod tests { } struct TestCase<'a> { - predicates: &'a [Predicate], + predicates: IndexMap, expected: &'a [&'a str], } let test_cases = [ // Can query on specific key column values: TestCase { - predicates: &[Predicate::new_eq(t1_col_id, KeyValue::string("a"))], + predicates: predicates([(t1_col_id, Predicate::new_in([KeyValue::string("a")]))]), expected: &[ "+----+-----+-----+-----+-----+--------------------------------+", "| t1 | f1 | f2 | f3 | f4 | time |", @@ -1045,7 +1087,7 @@ mod tests { ], }, TestCase { - predicates: &[Predicate::new_eq(t1_col_id, KeyValue::string("b"))], + predicates: predicates([(t1_col_id, Predicate::new_in([KeyValue::string("b")]))]), expected: &[ "+----+------+----+------+------+--------------------------------+", "| t1 | f1 | f2 | f3 | f4 | time |", @@ -1055,7 +1097,7 @@ mod tests { ], }, TestCase { - predicates: &[Predicate::new_eq(t1_col_id, KeyValue::string("c"))], + predicates: predicates([(t1_col_id, Predicate::new_in([KeyValue::string("c")]))]), expected: &[ "+----+-------+-------+-------+----+--------------------------------+", "| t1 | f1 | f2 | f3 | f4 | time |", @@ -1066,7 +1108,7 @@ mod tests { }, // Can query accross key column values: TestCase { - predicates: &[], + predicates: predicates([]), expected: &[ "+----+-------+-------+-------+------+--------------------------------+", "| t1 | f1 | 
f2 | f3 | f4 | time |", "+----+-------+-------+-------+------+--------------------------------+", ], }, ]; for t in test_cases { let batches = cache - .to_record_batches(Arc::clone(&table_def), t.predicates) + .to_record_batches(Arc::clone(&table_def), &t.predicates) .unwrap(); assert_batches_sorted_eq!(t.expected, &batches); @@ -1325,4 +1367,267 @@ mod tests { }); insta::assert_json_snapshot!(caches); } + + /// This test sets up a [`LastCacheProvider`], creates a [`LastCache`] using the `region` and + /// `host` columns as keys, and then writes row data containing several unique combinations of + /// the key columns to the cache. It then sets up a DataFusion [`SessionContext`], registers + /// the [`LastCacheFunction`] as a UDTF, and runs a series of test cases to verify queries made + /// using the function. + /// + /// The purpose of this is to verify that the predicate pushdown by the UDTF [`TableProvider`] + /// is working. + /// + /// Each test case verifies both the `RecordBatch` output and the output of the `EXPLAIN` + /// for a given query. The `EXPLAIN` contains a line for the `LastCacheExec`, which will list + /// out any predicates that were pushed down from the provided SQL query to the cache. + #[tokio::test] + async fn datafusion_udtf_predicate_conversion() { + let writer = TestWriter::new(); + let _ = writer.write_lp_to_write_batch("cpu,region=us-east,host=a usage=99,temp=88", 0); + + // create a last cache provider so we can use it to create our UDTF provider: + let db_schema = writer.db_schema(); + let table_def = db_schema.table_definition("cpu").unwrap(); + let provider = LastCacheProvider::new_from_catalog(writer.catalog()).unwrap(); + provider + .create_cache( + db_schema.id, + None, + CreateLastCacheArgs { + table_def, + count: LastCacheSize::default(), + ttl: LastCacheTtl::default(), + key_columns: LastCacheKeyColumnsArg::SeriesKey, + value_columns: LastCacheValueColumnsArg::AcceptNew, + }, + ) + .unwrap(); + + // make some writes into the cache: + let write_batch = writer.write_lp_to_write_batch( + "\ + cpu,region=us-east,host=a usage=77,temp=66\n\ + cpu,region=us-east,host=b usage=77,temp=66\n\ + cpu,region=us-west,host=c usage=77,temp=66\n\ + cpu,region=us-west,host=d usage=77,temp=66\n\ + cpu,region=ca-east,host=e usage=77,temp=66\n\ + cpu,region=ca-cent,host=f usage=77,temp=66\n\ + cpu,region=ca-west,host=g usage=77,temp=66\n\ + cpu,region=ca-west,host=h usage=77,temp=66\n\ + cpu,region=eu-cent,host=i usage=77,temp=66\n\ + cpu,region=eu-cent,host=j usage=77,temp=66\n\ + cpu,region=eu-west,host=k usage=77,temp=66\n\ + cpu,region=eu-west,host=l usage=77,temp=66\n\ + ", + 1_000, + ); + let wal_contents = influxdb3_wal::create::wal_contents( + (0, 1, 0), + [influxdb3_wal::create::write_batch_op(write_batch)], + ); + provider.write_wal_contents_to_cache(&wal_contents); + + let ctx = SessionContext::new(); + let last_cache_fn = LastCacheFunction::new(db_schema.id, Arc::clone(&provider)); + ctx.register_udtf(LAST_CACHE_UDTF_NAME, Arc::new(last_cache_fn)); + + struct TestCase<'a> { + /// A short description of the test + _desc: &'a str, + /// A SQL expression to evaluate using the DataFusion session context; it should be of + /// the form: + /// ```sql + /// SELECT * FROM last_cache('cpu') ... + /// ``` + sql: &'a str, + /// Expected record batch output + expected: &'a [&'a str], + /// Expected EXPLAIN output contains this. + /// + /// For checking the `LastCacheExec` portion of the EXPLAIN output for the given `sql` + /// query.
A "contains" is used instead of matching the whole EXPLAIN output to prevent + /// flakiness from upstream changes to other parts of the query plan. + explain_contains: &'a str, + } + + let test_cases = [ + TestCase { + _desc: "no predicates", + sql: "SELECT * FROM last_cache('cpu')", + expected: &[ + "+---------+------+------+-----------------------------+-------+", + "| region | host | temp | time | usage |", + "+---------+------+------+-----------------------------+-------+", + "| ca-cent | f | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| ca-east | e | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| ca-west | g | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| ca-west | h | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| eu-cent | i | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| eu-cent | j | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| eu-west | k | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| eu-west | l | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-east | a | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-east | b | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-west | c | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-west | d | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "+---------+------+------+-----------------------------+-------+", + ], + explain_contains: + "LastCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[12]", + }, + TestCase { + _desc: "eq predicate on region", + sql: "SELECT * FROM last_cache('cpu') WHERE region = 'us-east'", + expected: &[ + "+---------+------+------+-----------------------------+-------+", + "| region | host | temp | time | usage |", + "+---------+------+------+-----------------------------+-------+", + "| us-east | a | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-east | b | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "+---------+------+------+-----------------------------+-------+", + ], + explain_contains: "LastCacheExec: predicates=[[region@0 IN ('us-east')]] inner=MemoryExec: partitions=1, partition_sizes=[2]", + }, + TestCase { + _desc: "not eq predicate on region", + sql: "SELECT * FROM last_cache('cpu') WHERE region != 'us-east'", + expected: &[ + "+---------+------+------+-----------------------------+-------+", + "| region | host | temp | time | usage |", + "+---------+------+------+-----------------------------+-------+", + "| ca-cent | f | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| ca-east | e | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| ca-west | g | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| ca-west | h | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| eu-cent | i | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| eu-cent | j | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| eu-west | k | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| eu-west | l | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-west | c | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-west | d | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "+---------+------+------+-----------------------------+-------+", + ], + explain_contains: "LastCacheExec: predicates=[[region@0 NOT IN ('us-east')]] inner=MemoryExec: partitions=1, partition_sizes=[10]", + }, + TestCase { + _desc: "double eq predicate on region", + sql: "SELECT * FROM last_cache('cpu') \ + WHERE region = 'us-east' \ + OR region = 'us-west'", + expected: &[ + "+---------+------+------+-----------------------------+-------+", + "| region | host | temp | time | usage |",
"+---------+------+------+-----------------------------+-------+", + "| us-east | a | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-east | b | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-west | c | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-west | d | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "+---------+------+------+-----------------------------+-------+", + ], + explain_contains: "LastCacheExec: predicates=[[region@0 IN ('us-east','us-west')]] inner=MemoryExec: partitions=1, partition_sizes=[4]", + }, + TestCase { + _desc: "triple eq predicate on region", + sql: "SELECT * FROM last_cache('cpu') \ + WHERE region = 'us-east' \ + OR region = 'us-west' \ + OR region = 'ca-west'", + expected: &[ + "+---------+------+------+-----------------------------+-------+", + "| region | host | temp | time | usage |", + "+---------+------+------+-----------------------------+-------+", + "| ca-west | g | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| ca-west | h | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-east | a | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-east | b | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-west | c | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-west | d | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "+---------+------+------+-----------------------------+-------+", + ], + explain_contains: "LastCacheExec: predicates=[[region@0 IN ('ca-west','us-east','us-west')]] inner=MemoryExec: partitions=1, partition_sizes=[6]", + }, + TestCase { + _desc: "eq predicate on region AND eq predicate on host", + sql: "SELECT * FROM last_cache('cpu') \ + WHERE (region = 'us-east' OR region = 'us-west') \ + AND (host = 'a' OR host = 'c')", + expected: &[ + "+---------+------+------+-----------------------------+-------+", + "| region | host | temp | time | usage |", + "+---------+------+------+-----------------------------+-------+", + "| us-east | a | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-west | c | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "+---------+------+------+-----------------------------+-------+", + ], + explain_contains: "LastCacheExec: predicates=[[region@0 IN ('us-east','us-west')], [host@1 IN ('a','c')]] inner=MemoryExec: partitions=1, partition_sizes=[2]", + }, + TestCase { + _desc: "in predicate on region", + sql: "SELECT * FROM last_cache('cpu') \ + WHERE region IN ('ca-east', 'ca-west')", + expected: &[ + "+---------+------+------+-----------------------------+-------+", + "| region | host | temp | time | usage |", + "+---------+------+------+-----------------------------+-------+", + "| ca-east | e | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| ca-west | g | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| ca-west | h | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "+---------+------+------+-----------------------------+-------+", + ], + explain_contains: "LastCacheExec: predicates=[[region@0 IN ('ca-east','ca-west')]] inner=MemoryExec: partitions=1, partition_sizes=[3]", + }, + TestCase { + _desc: "not in predicate on region", + sql: "SELECT * FROM last_cache('cpu') \ + WHERE region NOT IN ('ca-east', 'ca-west')", + expected: &[ + "+---------+------+------+-----------------------------+-------+", + "| region | host | temp | time | usage |", + "+---------+------+------+-----------------------------+-------+", + "| ca-cent | f | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| eu-cent | i | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| eu-cent | j | 66.0 | 
1970-01-01T00:00:00.000001Z | 77.0 |", + "| eu-west | k | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| eu-west | l | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-east | a | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-east | b | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-west | c | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "| us-west | d | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |", + "+---------+------+------+-----------------------------+-------+", + ], + explain_contains: "LastCacheExec: predicates=[[region@0 NOT IN ('ca-east','ca-west')]] inner=MemoryExec: partitions=1, partition_sizes=[9]", + }, + ]; + + for tc in test_cases { + // do the query: + let results = ctx.sql(tc.sql).await.unwrap().collect().await.unwrap(); + println!("test case: {}", tc._desc); + // check the result: + assert_batches_sorted_eq!(tc.expected, &results); + let explain = ctx + .sql(format!("EXPLAIN {sql}", sql = tc.sql).as_str()) + .await + .unwrap() + .collect() + .await + .unwrap() + .pop() + .unwrap(); + assert!( + explain + .column_by_name("plan") + .unwrap() + .as_string::() + .iter() + .any(|plan| plan.is_some_and(|plan| plan.contains(tc.explain_contains))), + "explain plan did not contain the expression:\n\n\ + {expected}\n\n\ + instead, the output was:\n\n\ + {actual:#?}", + expected = tc.explain_contains, + actual = explain.column_by_name("plan").unwrap().as_string::(), + ); + } + } } diff --git a/influxdb3_cache/src/last_cache/provider.rs b/influxdb3_cache/src/last_cache/provider.rs index 0cf4273b772..57b74c5fbda 100644 --- a/influxdb3_cache/src/last_cache/provider.rs +++ b/influxdb3_cache/src/last_cache/provider.rs @@ -9,7 +9,7 @@ use observability_deps::tracing::debug; use parking_lot::RwLock; use super::{ - cache::{LastCache, LastCacheValueColumnsArg, Predicate}, + cache::{LastCache, LastCacheValueColumnsArg}, CreateLastCacheArgs, Error, }; @@ -341,7 +341,6 @@ impl LastCacheProvider { db_id: DbId, table_id: TableId, cache_name: Option<&str>, - predicates: &[Predicate], ) -> Option, ArrowError>> { let table_def = self .catalog @@ -362,7 +361,7 @@ impl LastCacheProvider { None } }) - .map(|lc| lc.to_record_batches(table_def, predicates)) + .map(|lc| lc.to_record_batches(table_def, &Default::default())) } /// Returns the total number of caches contained in the provider diff --git a/influxdb3_cache/src/last_cache/table_function.rs b/influxdb3_cache/src/last_cache/table_function.rs index f7f0b974d00..49121649c9c 100644 --- a/influxdb3_cache/src/last_cache/table_function.rs +++ b/influxdb3_cache/src/last_cache/table_function.rs @@ -1,28 +1,49 @@ use std::{any::Any, sync::Arc}; -use arrow::datatypes::SchemaRef; +use arrow::{array::RecordBatch, datatypes::SchemaRef}; use async_trait::async_trait; use datafusion::{ catalog::{Session, TableProvider}, - common::plan_err, + common::{internal_err, plan_err, DFSchema}, datasource::{function::TableFunctionImpl, TableType}, error::DataFusionError, + execution::context::ExecutionProps, logical_expr::TableProviderFilterPushDown, - physical_plan::{memory::MemoryExec, ExecutionPlan}, + physical_expr::{ + create_physical_expr, + utils::{Guarantee, LiteralGuarantee}, + }, + physical_plan::{memory::MemoryExec, DisplayAs, DisplayFormatType, ExecutionPlan}, prelude::Expr, scalar::ScalarValue, }; +use indexmap::{IndexMap, IndexSet}; use influxdb3_catalog::catalog::TableDefinition; -use influxdb3_id::DbId; +use influxdb3_id::{ColumnId, DbId}; +use schema::{InfluxColumnType, InfluxFieldType}; -use super::LastCacheProvider; +use 
super::{ + cache::{KeyValue, Predicate}, + LastCacheProvider, +}; + +/// The name of the function that is called to query the last cache +pub const LAST_CACHE_UDTF_NAME: &str = "last_cache"; + +/// Implementor of the [`TableProvider`] trait that is produced with a call to the +/// [`LastCacheFunction`] #[derive(Debug)] struct LastCacheFunctionProvider { + /// The database ID that the query calling the cache is associated with db_id: DbId, + /// The table definition that the cache being called is associated with table_def: Arc, + /// The name of the cache cache_name: Arc, + /// Reference to the cache's schema schema: SchemaRef, + /// Forwarded reference of the [`LastCacheProvider`], which is used to get the `LastCache` + /// for the query using the `db_id` and `table_def`. provider: Arc, } @@ -55,19 +76,32 @@ impl TableProvider for LastCacheFunctionProvider { _limit: Option, ) -> Result, DataFusionError> { let read = self.provider.cache_map.read(); - let batches = if let Some(cache) = read + let (predicates, batches) = if let Some(cache) = read .get(&self.db_id) .and_then(|db| db.get(&self.table_def.table_id)) .and_then(|tbl| tbl.get(&self.cache_name)) { - let predicates = cache.convert_filter_exprs(filters); - cache.to_record_batches(Arc::clone(&self.table_def), &predicates)? + let predicates = convert_filter_exprs( + self.table_def.as_ref(), + cache.key_column_ids.as_ref(), + Arc::clone(&self.schema), + filters, + )?; + let batches = cache.to_record_batches(Arc::clone(&self.table_def), &predicates)?; + ((!predicates.is_empty()).then_some(predicates), batches) } else { // If there is no cache, it means that it was removed, in which case, we just return // an empty set of record batches. - vec![] + (None, vec![]) }; - let mut exec = MemoryExec::try_new(&[batches], self.schema(), projection.cloned())?; + drop(read); + let mut exec = LastCacheExec::try_new( + predicates, + Arc::clone(&self.table_def), + &[batches], + self.schema(), + projection.cloned(), + )?; let show_sizes = ctx.config_options().explain.show_sizes; exec = exec.with_show_sizes(show_sizes); @@ -76,6 +110,181 @@ } } +/// Convert the given list of filter expressions `filters` to a map of [`ColumnId`] to [`Predicate`] +/// +/// The resulting map is an [`IndexMap`] to ensure consistent ordering of entries in the map, which +/// makes testing the filter conversions easier via `EXPLAIN` query plans. +fn convert_filter_exprs( + table_def: &TableDefinition, + cache_key_column_ids: &IndexSet, + cache_schema: SchemaRef, + filters: &[Expr], +) -> Result, DataFusionError> { + let mut predicate_map: IndexMap> = IndexMap::new(); + + // used by `create_physical_expr` in the loop below: + let schema: DFSchema = cache_schema.try_into()?; + let props = ExecutionProps::new(); + + // The set of `filters` that are passed in from DataFusion varies: 1) based on how they are + // defined in the query, and 2) based on some decisions that DataFusion makes when parsing the + // query into the `Expr` syntax tree.
For example, the predicate: + // + // WHERE foo IN ('bar', 'baz') + // + // instead of being expressed as an `InList`, would be simplified to the following `Expr` tree: + // + // [ + // BinaryExpr { + // left: BinaryExpr { left: "foo", op: Eq, right: "bar" }, + // op: Or, + // right: BinaryExpr { left: "foo", op: Eq, right: "baz" } + // } + // ] + // + // while the predicate: + // + // WHERE foo = 'bar' OR foo = 'baz' OR foo = 'bop' OR foo = 'bla' + // + // instead of being expressed as a tree of `BinaryExpr`s, is expressed as an `InList` with four + // entries: + // + // [ + // InList { col: "foo", values: ["bar", "baz", "bop", "bla"], negated: false } + // ] + // + // Instead of handling all the combinations of `Expr`s that may be passed by the caller of + // `TableProvider::scan`, we can use the cache's schema to convert each `Expr` to a `PhysicalExpr` + // and analyze it using DataFusion's `LiteralGuarantee`. + // + // This will distill the provided set of `Expr`s down to either an IN list or a NOT IN list, + // which we can convert to the `Predicate` type for the last cache. + // + // Special handling is needed for the case where multiple literal guarantees are encountered for + // a given column. This would happen for clauses split with an AND conjunction. From the tests + // run thus far, this happens when a query contains a WHERE clause, e.g., + // + // WHERE a != 'foo' AND a != 'bar' + // + // or, + // + // WHERE a NOT IN ('foo', 'bar') + // + // which DataFusion simplifies to the previous clause that uses an AND binary expression. + + for expr in filters { + let physical_expr = create_physical_expr(expr, &schema, &props)?; + let literal_guarantees = LiteralGuarantee::analyze(&physical_expr); + for LiteralGuarantee { + column, + guarantee, + literals, + } in literal_guarantees + { + let Some(column_def) = table_def.column_definition(column.name()) else { + return plan_err!( + "invalid column name in filter expression: {}", + column.name() + ); + }; + // do not handle predicates on non-key columns, let datafusion do that: + if !cache_key_column_ids.contains(&column_def.id) { + continue; + } + // convert the literal values from the query into `KeyValue`s for the last cache + // predicate, and also validate that the literal type is compatible with the column + // being predicated. + let value_set = literals + .into_iter() + .map(|literal| match (literal, column_def.data_type) { + ( + ScalarValue::Boolean(Some(b)), + InfluxColumnType::Field(InfluxFieldType::Boolean), + ) => Ok(KeyValue::Bool(b)), + ( + ScalarValue::Int64(Some(i)), + InfluxColumnType::Field(InfluxFieldType::Integer), + ) => Ok(KeyValue::Int(i)), + ( + ScalarValue::UInt64(Some(u)), + InfluxColumnType::Field(InfluxFieldType::UInteger), + ) => Ok(KeyValue::UInt(u)), + ( + ScalarValue::Utf8(Some(s)) + | ScalarValue::Utf8View(Some(s)) + | ScalarValue::LargeUtf8(Some(s)), + InfluxColumnType::Tag | InfluxColumnType::Field(InfluxFieldType::String), + ) => Ok(KeyValue::String(s)), + // TODO: handle Dictionary here?
+ (other_literal, column_data_type) => { + plan_err!( + "incompatible literal applied in predicate to column, \ + column: {}, \ + literal: {other_literal}, \ + column type: {column_data_type}", + column.name() + ) + } + }) + .collect::>()?; + let mut predicate = match guarantee { + Guarantee::In => Predicate::In(value_set), + Guarantee::NotIn => Predicate::NotIn(value_set), + }; + // place the predicate into the map, handling the case for a column already encountered + predicate_map + .entry(column_def.id) + .and_modify(|e| { + if let Some(existing) = e { + match (existing, &mut predicate) { + // if we encounter an IN predicate on a column for which we already have + // an IN guarantee, we take their intersection, i.e., + // + // a IN (1, 2) AND a IN (2, 3) + // + // becomes + // + // a IN (2) + (Predicate::In(ref mut existing_set), Predicate::In(new_set)) => { + *existing_set = + existing_set.intersection(new_set).cloned().collect(); + // if the result is empty, just remove the predicate + if existing_set.is_empty() { + e.take(); + } + } + // if we encounter a NOT IN predicate on a column for which we already + // have a NOT IN guarantee, we extend the two, i.e., + // + // a NOT IN (1, 2) AND a NOT IN (3, 4) + // + // becomes + // + // a NOT IN (1, 2, 3, 4) + (Predicate::NotIn(existing_set), Predicate::NotIn(new_set)) => { + existing_set.append(new_set) + } + // for non-matching predicate types, we just remove by taking the + // Option. We will let DataFusion handle the predicate at a higher + // filter level in this case... + _ => { + e.take(); + } + } + } + }) + .or_insert_with(|| Some(predicate)); + } + } + + Ok(predicate_map + .into_iter() + .filter_map(|(column_id, predicate)| predicate.map(|predicate| (column_id, predicate))) + .collect()) +} + +/// Implementor of the [`TableFunctionImpl`] trait, to be registered as a user-defined table +/// function in the DataFusion `SessionContext`. #[derive(Debug)] pub struct LastCacheFunction { db_id: DbId, @@ -127,3 +336,117 @@ impl TableFunctionImpl for LastCacheFunction { })) } } + +/// Custom implementor of the [`ExecutionPlan`] trait for use by the last cache +/// +/// Wraps a [`MemoryExec`] from DataFusion which it relies on for the actual implementation of the +/// [`ExecutionPlan`] trait. The additional functionality provided by this type is that it tracks +/// the predicates that are pushed down to the underlying cache during query planning/execution. +/// +/// # Example +/// +/// For a query that does not provide any predicates, or one that does provide predicates, but they +/// do not get pushed down, the `EXPLAIN` for said query will contain a line for the `LastCacheExec` +/// with no predicates, as well as the info emitted for the inner `MemoryExec`, e.g., +/// +/// ```text +/// LastCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[12] +/// ``` +/// +/// For queries that do have predicates that get pushed down, the output will include them, e.g., +/// +/// ```text +/// LastCacheExec: predicates=[[region@0 IN ('us-east','us-west')]] inner=[...]
+/// ``` +#[derive(Debug)] +struct LastCacheExec { + inner: MemoryExec, + table_def: Arc, + predicates: Option>, +} + +impl LastCacheExec { + fn try_new( + predicates: Option>, + table_def: Arc, + partitions: &[Vec], + cache_schema: SchemaRef, + projection: Option>, + ) -> Result { + Ok(Self { + inner: MemoryExec::try_new(partitions, cache_schema, projection)?, + table_def, + predicates, + }) + } + + fn with_show_sizes(self, show_sizes: bool) -> Self { + Self { + inner: self.inner.with_show_sizes(show_sizes), + ..self + } + } +} + +impl DisplayAs for LastCacheExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "LastCacheExec:")?; + if let Some(predicates) = self.predicates.as_ref() { + write!(f, " predicates=[")?; + let mut p_iter = predicates.iter(); + while let Some((col_id, predicate)) = p_iter.next() { + let col_name = self.table_def.column_id_to_name(col_id).unwrap_or_default(); + write!(f, "[{col_name}@{col_id} {predicate}]")?; + if p_iter.size_hint().0 > 0 { + write!(f, ", ")?; + } + } + write!(f, "]")?; + } + write!(f, " inner=")?; + self.inner.fmt_as(t, f) + } + } + } +} + +impl ExecutionPlan for LastCacheExec { + fn name(&self) -> &str { + "LastCacheExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + self.inner.properties() + } + + fn children(&self) -> Vec<&Arc> { + self.inner.children() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> datafusion::error::Result> { + // (copied from MemoryExec): + // MemoryExec has no children + if children.is_empty() { + Ok(self) + } else { + internal_err!("Children cannot be replaced in {self:?}") + } + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> datafusion::error::Result { + self.inner.execute(partition, context) + } +} diff --git a/influxdb3_server/src/query_executor.rs b/influxdb3_server/src/query_executor.rs index 1c56e22d2c1..536908fd421 100644 --- a/influxdb3_server/src/query_executor.rs +++ b/influxdb3_server/src/query_executor.rs @@ -19,7 +19,7 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::Expr; use datafusion_util::config::DEFAULT_SCHEMA; use datafusion_util::MemoryStream; -use influxdb3_cache::last_cache::LastCacheFunction; +use influxdb3_cache::last_cache::{LastCacheFunction, LAST_CACHE_UDTF_NAME}; use influxdb3_cache::meta_cache::{MetaCacheFunction, META_CACHE_UDTF_NAME}; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; use influxdb3_sys_events::SysEventStore; @@ -491,8 +491,6 @@ impl QueryNamespace for Database { } } -const LAST_CACHE_UDTF_NAME: &str = "last_cache"; - impl CatalogProvider for Database { fn as_any(&self) -> &dyn Any { self as &dyn Any diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 87305fb2d08..60ed2be62f1 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -974,7 +974,7 @@ mod tests { ]; let actual = wbuf .last_cache_provider() - .get_cache_record_batches(db_id, tbl_id, None, &[]) + .get_cache_record_batches(db_id, tbl_id, None) .unwrap() .unwrap(); assert_batches_eq!(&expected, &actual);
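Reviewer note: the AND-merge rules that `convert_filter_exprs` applies when several `LiteralGuarantee`s land on the same key column (IN ∧ IN takes the intersection, NOT IN ∧ NOT IN takes the union, mixed kinds are dropped) can be exercised in isolation. Below is a minimal, standalone sketch of those rules; the `Predicate` enum and `combine` helper are illustrative stand-ins over `&str` values, not the crate's actual types:

```rust
use std::collections::BTreeSet;

/// Stand-in for the cache's predicate type (illustrative only).
#[derive(Debug, PartialEq)]
enum Predicate {
    In(BTreeSet<&'static str>),
    NotIn(BTreeSet<&'static str>),
}

/// Combine two guarantees on the same column, mirroring the `and_modify`
/// logic in `convert_filter_exprs`: IN ∧ IN intersects, NOT IN ∧ NOT IN
/// unions, and anything else is dropped (returns `None`).
fn combine(a: Predicate, b: Predicate) -> Option<Predicate> {
    match (a, b) {
        (Predicate::In(x), Predicate::In(y)) => {
            let merged: BTreeSet<_> = x.intersection(&y).copied().collect();
            // an empty intersection is unsatisfiable; drop the cache-side
            // predicate entirely, as the diff does with `e.take()`
            (!merged.is_empty()).then_some(Predicate::In(merged))
        }
        (Predicate::NotIn(mut x), Predicate::NotIn(y)) => {
            // union of the excluded values
            x.extend(y);
            Some(Predicate::NotIn(x))
        }
        // mixed IN/NOT IN: leave the filter for DataFusion to evaluate
        _ => None,
    }
}

fn main() {
    // a IN ('1','2') AND a IN ('2','3')  =>  a IN ('2')
    assert_eq!(
        combine(
            Predicate::In(BTreeSet::from(["1", "2"])),
            Predicate::In(BTreeSet::from(["2", "3"])),
        ),
        Some(Predicate::In(BTreeSet::from(["2"])))
    );
    // a NOT IN ('1','2') AND a NOT IN ('3','4')  =>  a NOT IN ('1','2','3','4')
    assert_eq!(
        combine(
            Predicate::NotIn(BTreeSet::from(["1", "2"])),
            Predicate::NotIn(BTreeSet::from(["3", "4"])),
        ),
        Some(Predicate::NotIn(BTreeSet::from(["1", "2", "3", "4"])))
    );
}
```

As the in-code comments in the diff note, dropping a pushed-down predicate only costs efficiency, not correctness: DataFusion still applies the original filter at a higher level above the `LastCacheExec`.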