Skip to content

Commit

Permalink
feat: LastCacheExec to track predicate pushdown in last cache queries
Browse files Browse the repository at this point in the history
The Predicate type was refactored to only support an IN or NOT IN type of
clause, to align with DataFusion's LiteralGuarantee analyzer.

This caused some churn in the last cache tests but they remain unchanged
in their assertions.

Added a test to check predicate handling from datafusion.

Updated code docs and comments to explain the changes.
  • Loading branch information
hiltontj committed Dec 6, 2024
1 parent 9b87cd7 commit 2588235
Show file tree
Hide file tree
Showing 6 changed files with 803 additions and 255 deletions.
227 changes: 75 additions & 152 deletions influxdb3_cache/src/last_cache/cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{HashMap, HashSet, VecDeque},
collections::{BTreeSet, HashMap, HashSet, VecDeque},
ops::Deref,
sync::Arc,
time::{Duration, Instant},
Expand All @@ -17,11 +17,6 @@ use arrow::{
},
error::ArrowError,
};
use datafusion::{
logical_expr::{expr::InList, BinaryExpr, Operator},
prelude::Expr,
scalar::ScalarValue,
};
use indexmap::{IndexMap, IndexSet};
use influxdb3_catalog::catalog::{ColumnDefinition, TableDefinition, TIME_COLUMN_NAME};
use influxdb3_id::{ColumnId, TableId};
Expand Down Expand Up @@ -56,8 +51,6 @@ pub(crate) struct LastCache {
/// map preserves the order of the elements, thereby maintaining the order of the keys in
/// the cache.
pub(crate) key_column_ids: Arc<IndexSet<ColumnId>>,
/// The key columns for this cache, by their names
pub(crate) key_column_name_to_ids: Arc<HashMap<Arc<str>, ColumnId>>,
/// The value columns for this cache
pub(crate) value_columns: ValueColumnType,
/// The Arrow Schema for the table that this cache is associated with
Expand Down Expand Up @@ -86,6 +79,7 @@ pub struct CreateLastCacheArgs {
/// The default cache time-to-live (TTL) is 4 hours
pub(crate) const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(60 * 60 * 4);

/// The time to live (TTL) for entries in the cache
#[derive(Debug, Clone, Copy, Deserialize)]
pub struct LastCacheTtl(Duration);

Expand Down Expand Up @@ -115,6 +109,7 @@ impl Default for LastCacheTtl {
}
}

/// Specifies the key column configuration for a new [`LastCache`]
#[derive(Debug, Default, Clone)]
pub enum LastCacheKeyColumnsArg {
/// Use the series key columns in their order as the last cache key columns
Expand All @@ -133,6 +128,7 @@ impl From<Option<Vec<ColumnId>>> for LastCacheKeyColumnsArg {
}
}

/// Specifies the value column configuration for a new [`LastCache`]
#[derive(Debug, Default, Clone)]
pub enum LastCacheValueColumnsArg {
/// Use all non-key columns as value columns when initialized, and add new field columns that
Expand All @@ -156,6 +152,10 @@ impl From<Option<Vec<ColumnId>>> for LastCacheValueColumnsArg {

impl LastCache {
/// Create a new [`LastCache`]
///
/// This uses the provided `TableDefinition` to build an arrow schema for the cache. This will
/// validate the given arguments and can error if there are invalid columns specified, or if a
/// non-compatible column is used as a key to the cache.
pub(crate) fn new(
CreateLastCacheArgs {
table_def,
Expand Down Expand Up @@ -260,7 +260,6 @@ impl LastCache {
count,
ttl: ttl.into(),
key_column_ids: Arc::new(key_column_ids),
key_column_name_to_ids: Arc::new(key_column_name_to_ids),
value_columns: match value_columns {
LastCacheValueColumnsArg::AcceptNew => ValueColumnType::AcceptNew { seen },
LastCacheValueColumnsArg::Explicit(_) => ValueColumnType::Explicit {
Expand Down Expand Up @@ -340,32 +339,36 @@ impl LastCache {
/// This will panic if the internal cache state's keys are out-of-order with respect to the
/// order of the `key_columns` on this [`LastCache`]
pub(crate) fn push(&mut self, row: &Row, table_def: Arc<TableDefinition>) {
let accept_new_fields = self.accept_new_fields();
let mut target = &mut self.state;
let mut key_iter = self.key_column_ids.iter().peekable();
while let (Some(col_id), peek) = (key_iter.next(), key_iter.peek()) {
if target.is_init() {
*target = LastCacheState::Key(LastCacheKey {
column_id: *col_id,
value_map: Default::default(),
});
}
let mut values = Vec::with_capacity(self.key_column_ids.len());
for id in self.key_column_ids.iter() {
let Some(value) = row
.fields
.iter()
.find(|f| f.id == *col_id)
.find(|f| &f.id == id)
.map(|f| KeyValue::from(&f.value))
else {
// ignore the row if it does not contain all key columns
return;
};
values.push(value);
}
let accept_new_fields = self.accept_new_fields();
let mut target = &mut self.state;
let mut iter = self.key_column_ids.iter().zip(values).peekable();
while let (Some((col_id, value)), peek) = (iter.next(), iter.peek()) {
if target.is_init() {
*target = LastCacheState::Key(LastCacheKey {
column_id: *col_id,
value_map: Default::default(),
});
}
let cache_key = target.as_key_mut().unwrap();
assert_eq!(
&cache_key.column_id, col_id,
"key columns must match cache key order"
);
target = cache_key.value_map.entry(value).or_insert_with(|| {
if let Some(next_col_id) = peek {
if let Some((next_col_id, _)) = peek {
LastCacheState::Key(LastCacheKey {
column_id: **next_col_id,
value_map: Default::default(),
Expand Down Expand Up @@ -411,14 +414,14 @@ impl LastCache {
pub(crate) fn to_record_batches(
&self,
table_def: Arc<TableDefinition>,
predicates: &[Predicate],
predicates: &IndexMap<ColumnId, Predicate>,
) -> Result<Vec<RecordBatch>, ArrowError> {
// map the provided predicates on to the key columns
// there may not be predicates provided for each key column, hence the Option
let predicates: Vec<Option<&Predicate>> = self
.key_column_ids
.iter()
.map(|col_id| predicates.iter().find(|p| p.column_id == *col_id))
.map(|id| predicates.get(id))
.collect();

let mut caches = vec![ExtendedLastCacheState {
Expand Down Expand Up @@ -465,78 +468,6 @@ impl LastCache {
.collect()
}

/// Convert a set of DataFusion filter [`Expr`]s into [`Predicate`]s
///
/// This only handles binary expressions, e.g., `foo = 'bar'`, and will use the `key_columns`
/// to filter out expressions that do not match key columns in the cache.
pub(crate) fn convert_filter_exprs(&self, exprs: &[Expr]) -> Vec<Predicate> {
exprs
.iter()
.filter_map(|expr| {
match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let col_id = if let Expr::Column(c) = left.as_ref() {
self.key_column_name_to_ids.get(c.name()).copied()?
} else {
return None;
};
let value = match right.as_ref() {
Expr::Literal(ScalarValue::Utf8(Some(v))) => {
KeyValue::String(v.to_owned())
}
Expr::Literal(ScalarValue::Boolean(Some(v))) => KeyValue::Bool(*v),
// TODO: handle integer types that can be casted up to i64/u64:
Expr::Literal(ScalarValue::Int64(Some(v))) => KeyValue::Int(*v),
Expr::Literal(ScalarValue::UInt64(Some(v))) => KeyValue::UInt(*v),
_ => return None,
};
match op {
Operator::Eq => Some(Predicate::new_eq(col_id, value)),
Operator::NotEq => Some(Predicate::new_not_eq(col_id, value)),
_ => None,
}
}
Expr::InList(InList {
expr,
list,
negated,
}) => {
let col_id = if let Expr::Column(c) = expr.as_ref() {
self.key_column_name_to_ids.get(c.name()).copied()?
} else {
return None;
};
let values: Vec<KeyValue> = list
.iter()
.filter_map(|e| match e {
Expr::Literal(ScalarValue::Utf8(Some(v))) => {
Some(KeyValue::String(v.to_owned()))
}
Expr::Literal(ScalarValue::Boolean(Some(v))) => {
Some(KeyValue::Bool(*v))
}
// TODO: handle integer types that can be casted up to i64/u64:
Expr::Literal(ScalarValue::Int64(Some(v))) => {
Some(KeyValue::Int(*v))
}
Expr::Literal(ScalarValue::UInt64(Some(v))) => {
Some(KeyValue::UInt(*v))
}
_ => None,
})
.collect();
if *negated {
Some(Predicate::new_not_in(col_id, values))
} else {
Some(Predicate::new_in(col_id, values))
}
}
_ => None,
}
})
.collect()
}

/// Remove expired values from the internal cache state
pub(crate) fn remove_expired(&mut self) {
self.state.remove_expired();
Expand Down Expand Up @@ -663,50 +594,52 @@ impl ExtendedLastCacheState<'_> {
}

/// A predicate used for evaluating key column values in the cache on query
///
/// Can be either an inclusive set or an exclusive set. `BTreeSet` is used so
/// that the predicate values are ordered, and are therefore displayed in
/// sorted order in query `EXPLAIN` plans.
#[derive(Debug, Clone)]
pub struct Predicate {
/// The left-hand-side of the predicate as a valid `ColumnId`
column_id: ColumnId,
/// The right-hand-side of the predicate
kind: PredicateKind,
pub(crate) enum Predicate {
/// Inclusive set: selects the cache entries whose key value is a member of the set
In(BTreeSet<KeyValue>),
/// Exclusive set: selects the cache entries whose key value is not a member of the set
NotIn(BTreeSet<KeyValue>),
}

impl Predicate {
pub(crate) fn new_eq(column_id: ColumnId, value: KeyValue) -> Self {
Self {
column_id,
kind: PredicateKind::Eq(value),
impl std::fmt::Display for Predicate {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Predicate::In(_) => write!(f, "IN (")?,
Predicate::NotIn(_) => write!(f, "NOT IN (")?,
}
}

pub(crate) fn new_not_eq(column_id: ColumnId, value: KeyValue) -> Self {
Self {
column_id,
kind: PredicateKind::NotEq(value),
let mut values = self.values();
while let Some(v) = values.next() {
write!(f, "{v}")?;
if values.size_hint().0 > 0 {
write!(f, ",")?;
}
}
}

pub(crate) fn new_in(column_id: ColumnId, values: Vec<KeyValue>) -> Self {
Self {
column_id,
kind: PredicateKind::In(values),
}
write!(f, ")")
}
}

pub(crate) fn new_not_in(column_id: ColumnId, values: Vec<KeyValue>) -> Self {
Self {
column_id,
kind: PredicateKind::NotIn(values),
impl Predicate {
fn values(&self) -> impl Iterator<Item = &KeyValue> {
match self {
Predicate::In(btree_set) => btree_set.iter(),
Predicate::NotIn(btree_set) => btree_set.iter(),
}
}
}

#[derive(Debug, Clone)]
pub(crate) enum PredicateKind {
Eq(KeyValue),
NotEq(KeyValue),
In(Vec<KeyValue>),
NotIn(Vec<KeyValue>),
#[cfg(test)]
impl Predicate {
    /// Build an inclusive (`IN`) predicate from the given values
    pub(crate) fn new_in(values: impl IntoIterator<Item = KeyValue>) -> Self {
        let mut set = BTreeSet::new();
        set.extend(values);
        Predicate::In(set)
    }

    /// Build an exclusive (`NOT IN`) predicate from the given values
    pub(crate) fn new_not_in(values: impl IntoIterator<Item = KeyValue>) -> Self {
        let mut set = BTreeSet::new();
        set.extend(values);
        Predicate::NotIn(set)
    }
}

/// Represents the hierarchical last cache structure
Expand Down Expand Up @@ -777,37 +710,16 @@ struct LastCacheKey {
impl LastCacheKey {
/// Evaluate the provided [`Predicate`] by using its value to lookup in this [`LastCacheKey`]'s
/// value map.
///
/// # Panics
///
/// This assumes that a predicate for this [`LastCacheKey`]'s column was provided, and will panic
/// otherwise.
fn evaluate_predicate<'a: 'b, 'b>(
&'a self,
predicate: &'b Predicate,
) -> Vec<(&'a LastCacheState, &'b KeyValue)> {
if predicate.column_id != self.column_id {
panic!(
"attempted to evaluate unexpected predicate with key {} for column with id {}",
predicate.column_id, self.column_id
);
}
match &predicate.kind {
PredicateKind::Eq(val) => self
.value_map
.get(val)
.map(|s| vec![(s, val)])
.unwrap_or_default(),
PredicateKind::NotEq(val) => self
.value_map
.iter()
.filter_map(|(v, s)| (v != val).then_some((s, v)))
.collect(),
PredicateKind::In(vals) => vals
match predicate {
Predicate::In(vals) => vals
.iter()
.filter_map(|v| self.value_map.get(v).map(|s| (s, v)))
.collect(),
PredicateKind::NotIn(vals) => self
Predicate::NotIn(vals) => self
.value_map
.iter()
.filter_map(|(v, s)| (!vals.contains(v)).then_some((s, v)))
Expand All @@ -828,14 +740,25 @@ impl LastCacheKey {
}

/// A value for a key column in a [`LastCache`]
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[derive(Debug, Clone, Eq, PartialEq, Hash, PartialOrd, Ord)]
pub(crate) enum KeyValue {
/// A string key value
String(String),
/// A signed 64-bit integer key value
Int(i64),
/// An unsigned 64-bit integer key value
UInt(u64),
/// A boolean key value
Bool(bool),
}

impl std::fmt::Display for KeyValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
KeyValue::String(s) => write!(f, "'{s}'"),
KeyValue::Int(i) => write!(f, "{i}i"),
KeyValue::UInt(u) => write!(f, "{u}u"),
KeyValue::Bool(b) => write!(f, "{b}"),
}
}
}

#[cfg(test)]
impl KeyValue {
pub(crate) fn string(s: impl Into<String>) -> Self {
Expand Down
Loading

0 comments on commit 2588235

Please sign in to comment.