Skip to content

Commit

Permalink
recover/merge old commits
Browse files Browse the repository at this point in the history
  • Loading branch information
alextheimer committed Jan 26, 2024
1 parent 56687c8 commit 503e508
Show file tree
Hide file tree
Showing 12 changed files with 1,725 additions and 1,995 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package filodb.coordinator.queryplanner

import scala.collection.{mutable, Seq}
import scala.collection.{Seq, mutable}
import scala.collection.mutable.ArrayBuffer

import com.typesafe.scalalogging.StrictLogging

import filodb.core.TargetSchemaProvider
import filodb.core.query.{ColumnFilter, Filter, QueryContext, RangeParams}
import filodb.core.query.{ColumnFilter, QueryContext, QueryUtils, RangeParams}
import filodb.core.query.Filter.{Equals, EqualsRegex}
import filodb.prometheus.ast.SubqueryUtils
import filodb.prometheus.ast.Vectors.PromMetricLabel
import filodb.prometheus.ast.WindowConstants
Expand Down Expand Up @@ -406,25 +405,13 @@ object LogicalPlanUtils extends StrictLogging {
columnMap.values.toSeq
}

/**
* Resolves non-Equals shard-key filters into sets of Equals filters.
* The argument shardKeyMatcher function is called only if a non-Equals filter exists.
*/
def resolveShardKeyFilters(shardKeyFilters: Seq[ColumnFilter],
shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]])
: Seq[Seq[ColumnFilter]] = {
val hasNonEqualShardKeyFilter = shardKeyFilters.exists(!_.filter.isInstanceOf[Filter.Equals])
if (hasNonEqualShardKeyFilter) shardKeyMatcher(shardKeyFilters) else Seq(shardKeyFilters)
}

/**
* Returns a set of target-schema columns iff all of:
* - all plan RawSeries share the same target-schema columns.
* - no target-schema definition changes during the query.
*/
private def sameRawSeriesTargetSchemaColumns(plan: LogicalPlan,
targetSchemaProvider: TargetSchemaProvider,
shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]],
getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]])
: Option[Seq[String]] = {
// compose a stream of Options for each RawSeries--
Expand All @@ -433,15 +420,22 @@ object LogicalPlanUtils extends StrictLogging {
.filter(_.isInstanceOf[RawSeries])
.map(_.asInstanceOf[RawSeries]).flatMap{ rs =>
val interval = LogicalPlanUtils.getSpanningIntervalSelector(rs)
val shardKeyFilters = getShardKeyFilters(rs)
.flatMap(LogicalPlanUtils.resolveShardKeyFilters(_, shardKeyMatcher))
shardKeyFilters.map{ shardKey =>
// replace any possibly-regex shard-key filters with equals-only filters, then
// use all filters to determine whether-or-not an unchanging target-schema is defined.
val filters = LogicalPlanUtils.upsertFilters(rs.filters, shardKey)
LogicalPlanUtils.getTargetSchemaIfUnchanging(targetSchemaProvider, filters, interval)
}
val rawShardKeyFilters = getShardKeyFilters(rs)
val shardKeyFilters = rawShardKeyFilters.flatMap{ filters =>
val resolvedFilters: Seq[Seq[ColumnFilter]] = filters.map { filter =>
filter.filter match {
// Take care of pipe-joined values here -- create one Equals filter per value.
case EqualsRegex(values: String) if QueryUtils.containsPipeOnlyRegex(values) =>
QueryUtils.splitAtUnescapedPipes(values).map(value => ColumnFilter(filter.column, Equals(value)))
case _ => Seq(filter)
}}
QueryUtils.combinations(resolvedFilters)
}
shardKeyFilters.map{ shardKey =>
val filters = LogicalPlanUtils.upsertFilters(rs.filters, shardKey)
LogicalPlanUtils.getTargetSchemaIfUnchanging(targetSchemaProvider, filters, interval)
}
}
if (rsTschemaOpts.isEmpty) {
return None
}
Expand All @@ -461,7 +455,6 @@ object LogicalPlanUtils extends StrictLogging {
*/
private def canPushdown(plan: CandidatePushdownPlan,
targetSchemaProvider: TargetSchemaProvider,
shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]],
nonMetricShardKeyCols: Seq[String],
getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]]): Boolean = {
val hasPushdownableClause = plan match {
Expand All @@ -488,7 +481,7 @@ object LogicalPlanUtils extends StrictLogging {
// return true
// }

val tschema = sameRawSeriesTargetSchemaColumns(plan, targetSchemaProvider, shardKeyMatcher, getShardKeyFilters)
val tschema = sameRawSeriesTargetSchemaColumns(plan, targetSchemaProvider, getShardKeyFilters)
if (tschema.isEmpty) {
return false
}
Expand All @@ -510,7 +503,6 @@ object LogicalPlanUtils extends StrictLogging {
*/
def getPushdownKeys[T](lp: LogicalPlan,
targetSchemaProvider: TargetSchemaProvider,
shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]],
nonMetricShardKeyCols: Seq[String],
getRawPushdownKeys: RawSeries => Set[T],
getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]]): Option[Set[T]] = {
Expand Down Expand Up @@ -542,13 +534,12 @@ object LogicalPlanUtils extends StrictLogging {
lhsKeys.isDefined && rhsKeys.isDefined &&
// either the lhs/rhs keys are equal, or at least one of lhs/rhs includes only scalars.
(lhsKeys.get.isEmpty || rhsKeys.get.isEmpty || lhsKeys == rhsKeys) &&
canPushdown(bj, targetSchemaProvider, shardKeyMatcher, nonMetricShardKeyCols, getShardKeyFilters)
canPushdown(bj, targetSchemaProvider, nonMetricShardKeyCols, getShardKeyFilters)
// union lhs/rhs keys, since one might be empty (if it's a scalar)
if (canPushdownBj) Some(lhsKeys.get.union(rhsKeys.get)) else None
case agg: Aggregate =>
val keys = helper(agg.vectors)
val canPushdownAgg =
canPushdown(agg, targetSchemaProvider, shardKeyMatcher, nonMetricShardKeyCols, getShardKeyFilters)
val canPushdownAgg = canPushdown(agg, targetSchemaProvider, nonMetricShardKeyCols, getShardKeyFilters)
if (keys.isDefined && canPushdownAgg) keys else None
case nl: NonLeafLogicalPlan =>
// return the set of all child keys iff all child plans can be pushdown-optimized
Expand Down
Loading

0 comments on commit 503e508

Please sign in to comment.