Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(query): reduce regex query fanout #1644

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import scala.collection.mutable.ArrayBuffer
import com.typesafe.scalalogging.StrictLogging

import filodb.core.TargetSchemaProvider
import filodb.core.query.{ColumnFilter, QueryContext, RangeParams}
import filodb.core.query.{ColumnFilter, Filter, QueryContext, RangeParams}
import filodb.prometheus.ast.SubqueryUtils
import filodb.prometheus.ast.Vectors.PromMetricLabel
import filodb.prometheus.ast.WindowConstants
Expand All @@ -28,17 +28,17 @@ object LogicalPlanUtils extends StrictLogging {
}

/**
* Given a LogicalPlan check if any descendent of plan is an aggregate operation
* Given a LogicalPlan check if any descendent of plan is either an aggregate or join operation
* @param lp the LogicalPlan instance
* @return true if a descendent is an aggregate else false
* @return true if a descendent is an aggregate or join else false
*/
def hasDescendantAggregate(lp: LogicalPlan): Boolean = lp match {
def hasDescendantAggregateOrJoin(lp: LogicalPlan): Boolean = lp match {
case _: Aggregate => true
// consider this BinaryJoin example foo + on(h) + bar.
// partition1 has foo{h=1}, bar{h1=2} and partition2 has foo{h=2}, bar{h1=1}
// the binary join cannot happen on a partition locally. InProcessPlanDispatcher is required.
case _: BinaryJoin => true
case nonLeaf: NonLeafLogicalPlan => nonLeaf.children.exists(hasDescendantAggregate(_))
case nonLeaf: NonLeafLogicalPlan => nonLeaf.children.exists(hasDescendantAggregateOrJoin(_))
case _ => false
}
/**
Expand Down Expand Up @@ -406,27 +406,42 @@ object LogicalPlanUtils extends StrictLogging {
columnMap.values.toSeq
}

/**
* Resolves non-Equals shard-key filters into sets of Equals filters.
* The argument shardKeyMatcher function is called only if a non-Equals filter exists.
*/
def resolveShardKeyFiltersIfNotEquals(shardKeyFilters: Seq[ColumnFilter],
shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]])
: Seq[Seq[ColumnFilter]] = {
val hasNonEqualShardKeyFilter = shardKeyFilters.exists(!_.filter.isInstanceOf[Filter.Equals])
if (hasNonEqualShardKeyFilter) shardKeyMatcher(shardKeyFilters) else Seq(shardKeyFilters)
}

/**
* Returns a set of target-schema columns iff all of:
* - all plan RawSeries share the same target-schema columns.
* - no target-schema definition changes during the query.
*/
private def sameRawSeriesTargetSchemaColumns(plan: LogicalPlan,
targetSchemaProvider: TargetSchemaProvider,
shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]],
getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]])
: Option[Seq[String]] = {
// compose a stream of Options for each RawSeries--
// the options contain a target-schema iff it is defined and unchanging.
val rsTschemaOpts = LogicalPlan.findLeafLogicalPlans(plan)
.filter(_.isInstanceOf[RawSeries])
.map(_.asInstanceOf[RawSeries]).flatMap{ rs =>
val shardKeyFilters = getShardKeyFilters(rs)
val interval = LogicalPlanUtils.getSpanningIntervalSelector(rs)
shardKeyFilters.map{ shardKey =>
val filters = LogicalPlanUtils.upsertFilters(rs.filters, shardKey)
LogicalPlanUtils.getTargetSchemaIfUnchanging(targetSchemaProvider, filters, interval)
val interval = LogicalPlanUtils.getSpanningIntervalSelector(rs)
val shardKeyFilters = getShardKeyFilters(rs)
.flatMap(LogicalPlanUtils.resolveShardKeyFiltersIfNotEquals(_, shardKeyMatcher))
shardKeyFilters.map{ shardKey =>
// replace any possibly-regex shard-key filters with equals-only filters, then
// use all filters to determine whether-or-not an unchanging target-schema is defined.
val filters = LogicalPlanUtils.upsertFilters(rs.filters, shardKey)
LogicalPlanUtils.getTargetSchemaIfUnchanging(targetSchemaProvider, filters, interval)
}
}
}
if (rsTschemaOpts.isEmpty) {
return None
}
Expand All @@ -446,6 +461,7 @@ object LogicalPlanUtils extends StrictLogging {
*/
private def canPushdown(plan: CandidatePushdownPlan,
targetSchemaProvider: TargetSchemaProvider,
shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]],
nonMetricShardKeyCols: Seq[String],
getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]]): Boolean = {
val hasPushdownableClause = plan match {
Expand All @@ -471,7 +487,7 @@ object LogicalPlanUtils extends StrictLogging {
// return true
// }

val tschema = sameRawSeriesTargetSchemaColumns(plan, targetSchemaProvider, getShardKeyFilters)
val tschema = sameRawSeriesTargetSchemaColumns(plan, targetSchemaProvider, shardKeyMatcher, getShardKeyFilters)
if (tschema.isEmpty) {
return false
}
Expand All @@ -493,6 +509,7 @@ object LogicalPlanUtils extends StrictLogging {
*/
def getPushdownKeys[T](lp: LogicalPlan,
targetSchemaProvider: TargetSchemaProvider,
shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]],
nonMetricShardKeyCols: Seq[String],
getRawPushdownKeys: RawSeries => Set[T],
getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]]): Option[Set[T]] = {
Expand Down Expand Up @@ -524,12 +541,13 @@ object LogicalPlanUtils extends StrictLogging {
lhsKeys.isDefined && rhsKeys.isDefined &&
// either the lhs/rhs keys are equal, or at least one of lhs/rhs includes only scalars.
(lhsKeys.get.isEmpty || rhsKeys.get.isEmpty || lhsKeys == rhsKeys) &&
canPushdown(bj, targetSchemaProvider, nonMetricShardKeyCols, getShardKeyFilters)
canPushdown(bj, targetSchemaProvider, shardKeyMatcher, nonMetricShardKeyCols, getShardKeyFilters)
// union lhs/rhs keys, since one might be empty (if it's a scalar)
if (canPushdownBj) Some(lhsKeys.get.union(rhsKeys.get)) else None
case agg: Aggregate =>
val keys = helper(agg.vectors)
val canPushdownAgg = canPushdown(agg, targetSchemaProvider, nonMetricShardKeyCols, getShardKeyFilters)
val canPushdownAgg =
canPushdown(agg, targetSchemaProvider, shardKeyMatcher, nonMetricShardKeyCols, getShardKeyFilters)
if (keys.isDefined && canPushdownAgg) keys else None
case nl: NonLeafLogicalPlan =>
// return the set of all child keys iff all child plans can be pushdown-optimized
Expand Down
Loading
Loading