Skip to content

Commit

Permalink
perf(query): reduce query fanout
Browse files Browse the repository at this point in the history
  • Loading branch information
alextheimer committed Sep 29, 2023
1 parent 6cb5433 commit 60921cb
Show file tree
Hide file tree
Showing 7 changed files with 535 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import scala.collection.mutable.ArrayBuffer
import com.typesafe.scalalogging.StrictLogging

import filodb.core.TargetSchemaProvider
import filodb.core.query.{ColumnFilter, QueryContext, RangeParams}
import filodb.core.query.{ColumnFilter, Filter, QueryContext, RangeParams}
import filodb.prometheus.ast.SubqueryUtils
import filodb.prometheus.ast.Vectors.PromMetricLabel
import filodb.prometheus.ast.WindowConstants
Expand All @@ -28,17 +28,17 @@ object LogicalPlanUtils extends StrictLogging {
}

/**
* Given a LogicalPlan check if any descendent of plan is an aggregate operation
* Given a LogicalPlan check if the plan itself or any descendant of it is either an aggregate or join operation
* @param lp the LogicalPlan instance
* @return true if a descendent is an aggregate else false
* @return true if the plan or a descendant is an aggregate or join else false
*/
def hasDescendantAggregate(lp: LogicalPlan): Boolean = lp match {
def hasDescendantAggregateOrJoin(lp: LogicalPlan): Boolean = lp match {
// NOTE: lp itself is matched first, so the result is also true when lp is
// itself an Aggregate or a BinaryJoin (not only when a descendant is one).
case _: Aggregate => true
// consider this BinaryJoin example foo + on(h) bar.
// partition1 has foo{h=1}, bar{h1=2} and partition2 has foo{h=2}, bar{h1=1}
// the binary join cannot happen on a partition locally. InProcessPlanDispatcher is required.
case _: BinaryJoin => true
// any other non-leaf plan: true iff at least one child subtree matches (recursive check).
case nonLeaf: NonLeafLogicalPlan => nonLeaf.children.exists(hasDescendantAggregate(_))
case nonLeaf: NonLeafLogicalPlan => nonLeaf.children.exists(hasDescendantAggregateOrJoin(_))
// plans matching none of the cases above yield false.
case _ => false
}
/**
Expand Down Expand Up @@ -406,27 +406,42 @@ object LogicalPlanUtils extends StrictLogging {
columnMap.values.toSeq
}

/**
 * Passes shard-key filters through untouched when every one of them is a
 * Filter.Equals; otherwise delegates to shardKeyMatcher to resolve them.
 *
 * @param shardKeyFilters the shard-key filters to inspect
 * @param shardKeyMatcher resolver applied to the full filter sequence; it is invoked
 *                        only when at least one filter is not a Filter.Equals
 *                        (it is expected to return Equals-only filter sets)
 * @return Seq(shardKeyFilters) when all filters are Equals (this includes an empty
 *         input); otherwise the result of shardKeyMatcher(shardKeyFilters)
 */
def resolveShardKeyFiltersIfNotEquals(shardKeyFilters: Seq[ColumnFilter],
                                      shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]])
: Seq[Seq[ColumnFilter]] = {
  // true iff no filter needs resolving, i.e. each one is an exact-match filter.
  val everyFilterIsEquals = shardKeyFilters.forall { columnFilter =>
    columnFilter.filter match {
      case _: Filter.Equals => true
      case _ => false
    }
  }
  if (everyFilterIsEquals) {
    Seq(shardKeyFilters)
  } else {
    shardKeyMatcher(shardKeyFilters)
  }
}

/**
* Returns a set of target-schema columns iff all of:
* - all plan RawSeries share the same target-schema columns.
* - no target-schema definition changes during the query.
*/
private def sameRawSeriesTargetSchemaColumns(plan: LogicalPlan,
targetSchemaProvider: TargetSchemaProvider,
shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]],
getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]])
: Option[Seq[String]] = {
// compose a stream of Options for each RawSeries--
// the options contain a target-schema iff it is defined and unchanging.
val rsTschemaOpts = LogicalPlan.findLeafLogicalPlans(plan)
.filter(_.isInstanceOf[RawSeries])
.map(_.asInstanceOf[RawSeries]).flatMap{ rs =>
val shardKeyFilters = getShardKeyFilters(rs)
val interval = LogicalPlanUtils.getSpanningIntervalSelector(rs)
shardKeyFilters.map{ shardKey =>
val filters = LogicalPlanUtils.upsertFilters(rs.filters, shardKey)
LogicalPlanUtils.getTargetSchemaIfUnchanging(targetSchemaProvider, filters, interval)
val interval = LogicalPlanUtils.getSpanningIntervalSelector(rs)
val shardKeyFilters = getShardKeyFilters(rs)
.flatMap(LogicalPlanUtils.resolveShardKeyFiltersIfNotEquals(_, shardKeyMatcher))
shardKeyFilters.map{ shardKey =>
// replace any possibly-regex shard-key filters with equals-only filters, then
// use all filters to determine whether-or-not an unchanging target-schema is defined.
val filters = LogicalPlanUtils.upsertFilters(rs.filters, shardKey)
LogicalPlanUtils.getTargetSchemaIfUnchanging(targetSchemaProvider, filters, interval)
}
}
}
if (rsTschemaOpts.isEmpty) {
return None
}
Expand All @@ -446,6 +461,7 @@ object LogicalPlanUtils extends StrictLogging {
*/
private def canPushdown(plan: CandidatePushdownPlan,
targetSchemaProvider: TargetSchemaProvider,
shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]],
nonMetricShardKeyCols: Seq[String],
getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]]): Boolean = {
val hasPushdownableClause = plan match {
Expand All @@ -471,7 +487,7 @@ object LogicalPlanUtils extends StrictLogging {
// return true
// }

val tschema = sameRawSeriesTargetSchemaColumns(plan, targetSchemaProvider, getShardKeyFilters)
val tschema = sameRawSeriesTargetSchemaColumns(plan, targetSchemaProvider, shardKeyMatcher, getShardKeyFilters)
if (tschema.isEmpty) {
return false
}
Expand All @@ -493,6 +509,7 @@ object LogicalPlanUtils extends StrictLogging {
*/
def getPushdownKeys[T](lp: LogicalPlan,
targetSchemaProvider: TargetSchemaProvider,
shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]],
nonMetricShardKeyCols: Seq[String],
getRawPushdownKeys: RawSeries => Set[T],
getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]]): Option[Set[T]] = {
Expand Down Expand Up @@ -524,12 +541,13 @@ object LogicalPlanUtils extends StrictLogging {
lhsKeys.isDefined && rhsKeys.isDefined &&
// either the lhs/rhs keys are equal, or at least one of lhs/rhs includes only scalars.
(lhsKeys.get.isEmpty || rhsKeys.get.isEmpty || lhsKeys == rhsKeys) &&
canPushdown(bj, targetSchemaProvider, nonMetricShardKeyCols, getShardKeyFilters)
canPushdown(bj, targetSchemaProvider, shardKeyMatcher, nonMetricShardKeyCols, getShardKeyFilters)
// union lhs/rhs keys, since one might be empty (if it's a scalar)
if (canPushdownBj) Some(lhsKeys.get.union(rhsKeys.get)) else None
case agg: Aggregate =>
val keys = helper(agg.vectors)
val canPushdownAgg = canPushdown(agg, targetSchemaProvider, nonMetricShardKeyCols, getShardKeyFilters)
val canPushdownAgg =
canPushdown(agg, targetSchemaProvider, shardKeyMatcher, nonMetricShardKeyCols, getShardKeyFilters)
if (keys.isDefined && canPushdownAgg) keys else None
case nl: NonLeafLogicalPlan =>
// return the set of all child keys iff all child plans can be pushdown-optimized
Expand Down
Loading

0 comments on commit 60921cb

Please sign in to comment.