perf(query): reduce regex query fanout (#1704)
Groups sets of shard keys into individual plans before they are dispatched remotely or executed locally. Keys are combined into a single plan with pipe-concatenated regex filters (e.g. filter1=~"foo|bar|baz"). This reduces the total count of executed plans and, with it, the total per-query overhead. (A short sketch of the idea follows the file summary below.)
alextheimer authored Feb 9, 2024
1 parent 9659b2a commit 8f7933f
Showing 18 changed files with 2,560 additions and 1,998 deletions.
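
To illustrate the grouping described in the commit message, here is a minimal sketch (not part of the diff; the label name _ns_ and its values are made up, while ColumnFilter, Equals, and EqualsRegex are the filodb.core.query types touched below):

import filodb.core.query.ColumnFilter
import filodb.core.query.Filter.{Equals, EqualsRegex}

// Three shard keys that previously fanned out into three separate plans...
val perKeyFilters = Seq(
  ColumnFilter("_ns_", Equals("foo")),
  ColumnFilter("_ns_", Equals("bar")),
  ColumnFilter("_ns_", Equals("baz")))

// ...are grouped into one plan whose filter is the pipe-concatenated regex _ns_=~"foo|bar|baz".
val grouped = ColumnFilter("_ns_", EqualsRegex(
  perKeyFilters.collect { case ColumnFilter(_, Equals(value)) => value.toString }.mkString("|")))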
@@ -97,8 +97,10 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
// Routes are created according to offset but logical plan should have time without offset.
// Offset logic is handled in ExecPlan
localPlanner.materialize(
- copyLogicalPlanWithUpdatedTimeRange(rootLogicalPlan, TimeRange(timeRange.startMs + offsetMs.max,
-   timeRange.endMs + offsetMs.min)), qContext)
+ copyLogicalPlanWithUpdatedSeconds(rootLogicalPlan,
+   (timeRange.startMs + offsetMs.max) / 1000,
+   (timeRange.endMs + offsetMs.min) / 1000),
+   qContext)
}
case route: RemoteRoute =>
val timeRange = route.timeRange.get
@@ -6,7 +6,8 @@ import scala.collection.mutable.ArrayBuffer
import com.typesafe.scalalogging.StrictLogging

import filodb.core.TargetSchemaProvider
- import filodb.core.query.{ColumnFilter, QueryContext, RangeParams}
+ import filodb.core.query.{ColumnFilter, QueryContext, QueryUtils, RangeParams}
+ import filodb.core.query.Filter.{Equals, EqualsRegex}
import filodb.prometheus.ast.SubqueryUtils
import filodb.prometheus.ast.Vectors.PromMetricLabel
import filodb.prometheus.ast.WindowConstants
@@ -28,17 +29,17 @@ object LogicalPlanUtils extends StrictLogging {
}

/**
- * Given a LogicalPlan check if any descendent of plan is an aggregate operation
+ * Given a LogicalPlan check if any descendent of plan is either an aggregate or join operation
* @param lp the LogicalPlan instance
- * @return true if a descendent is an aggregate else false
+ * @return true if a descendent is an aggregate or join else false
*/
- def hasDescendantAggregate(lp: LogicalPlan): Boolean = lp match {
+ def hasDescendantAggregateOrJoin(lp: LogicalPlan): Boolean = lp match {
case _: Aggregate => true
// consider this BinaryJoin example foo + on(h) + bar.
// partition1 has foo{h=1}, bar{h1=2} and partition2 has foo{h=2}, bar{h1=1}
// the binary join cannot happen on a partition locally. InProcessPlanDispatcher is required.
case _: BinaryJoin => true
- case nonLeaf: NonLeafLogicalPlan => nonLeaf.children.exists(hasDescendantAggregate(_))
+ case nonLeaf: NonLeafLogicalPlan => nonLeaf.children.exists(hasDescendantAggregateOrJoin(_))
case _ => false
}
/**
@@ -87,11 +88,22 @@
// scalastyle:on cyclomatic.complexity

/**
- * Used to change start and end time(TimeRange) of LogicalPlan
+ * Used to change start and end time (with 'second' precision) of LogicalPlan
* NOTE: Plan should be PeriodicSeriesPlan
*/
- def copyLogicalPlanWithUpdatedTimeRange(logicalPlan: LogicalPlan,
-   timeRange: TimeRange): LogicalPlan = {
+ def copyLogicalPlanWithUpdatedSeconds(logicalPlan: LogicalPlan,
+   startSec: Long,
+   endSec: Long): LogicalPlan = {
+   // Snap the range endpoints to the previous second.
+   // Plans sometimes require (correctly) that their children share the same start/stop times,
+   // but some plans do not support millisecond precision. A recursive time-range update might
+   // cause two child plan time-ranges to mismatch if the range is not second-aligned.
+   // Given that millisecond precision is never necessary, this method accepts timestamps in
+   // 'seconds' to avoid the hassle.
+   val timeRange = TimeRange(
+     1000 * startSec,
+     1000 * endSec
+   )
logicalPlan match {
case lp: PeriodicSeriesPlan => copyWithUpdatedTimeRange(lp, timeRange)
case lp: RawSeriesLikePlan => copyNonPeriodicWithUpdatedTimeRange(lp, timeRange)
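
As a side note (not part of the diff), a hedged sketch of how a caller can adapt an existing millisecond range to the new second-precision signature; the wrapper helper and timestamps here are hypothetical:

import filodb.query.LogicalPlan

// Hypothetical helper: derive the second-precision arguments by integer division,
// mirroring the call sites updated elsewhere in this commit.
def copyWithMsRange(plan: LogicalPlan, startMs: Long, endMs: Long): LogicalPlan =
  LogicalPlanUtils.copyLogicalPlanWithUpdatedSeconds(plan, startMs / 1000, endMs / 1000)

// e.g. copyWithMsRange(plan, 1700000000123L, 1700003600999L) yields a plan whose range
// is snapped to [1700000000000 ms, 1700003600000 ms].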
@@ -411,18 +423,32 @@
* - all plan RawSeries share the same target-schema columns.
* - no target-schema definition changes during the query.
*/
- private def sameRawSeriesTargetSchemaColumns(plan: LogicalPlan,
-   targetSchemaProvider: TargetSchemaProvider,
-   getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]])
+ def sameRawSeriesTargetSchemaColumns(plan: LogicalPlan,
+   targetSchemaProvider: TargetSchemaProvider,
+   getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]])
: Option[Seq[String]] = {
// compose a stream of Options for each RawSeries--
// the options contain a target-schema iff it is defined and unchanging.
val rsTschemaOpts = LogicalPlan.findLeafLogicalPlans(plan)
.filter(_.isInstanceOf[RawSeries])
.map(_.asInstanceOf[RawSeries]).flatMap{ rs =>
- val shardKeyFilters = getShardKeyFilters(rs)
- val interval = LogicalPlanUtils.getSpanningIntervalSelector(rs)
- shardKeyFilters.map{ shardKey =>
+ val interval = LogicalPlanUtils.getSpanningIntervalSelector(rs)
+ val rawShardKeyFilters = getShardKeyFilters(rs)
+ // The filters might contain pipe-concatenated EqualsRegex values.
+ // Convert these into sets of single-valued Equals filters.
+ val resolvedShardKeyFilters = rawShardKeyFilters.flatMap { filters =>
+   val equalsFilters: Seq[Seq[ColumnFilter]] = filters.map { filter =>
+     filter.filter match {
+       case EqualsRegex(values: String) if QueryUtils.containsPipeOnlyRegex(values) =>
+         QueryUtils.splitAtUnescapedPipes(values).map(value => ColumnFilter(filter.column, Equals(value)))
+       case _ => Seq(filter)
+     }
+   }
+   // E.g. foo{key1=~"baz|bat",key2=~"bar|bak"} would give the following combos:
+   // [[baz,bar], [baz,bak], [bat,bar], [bat,bak]]
+   QueryUtils.combinations(equalsFilters)
+ }.map(_.toSet).distinct.map(_.toSeq) // make sure keys are distinct
+ resolvedShardKeyFilters.map{ shardKey =>
val filters = LogicalPlanUtils.upsertFilters(rs.filters, shardKey)
LogicalPlanUtils.getTargetSchemaIfUnchanging(targetSchemaProvider, filters, interval)
}
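
For illustration (not part of the diff), a standalone sketch of the expansion above, using the QueryUtils helpers this commit references; the label names and values mirror the comment in the new code, and the helper return types are assumed:

import filodb.core.query.{ColumnFilter, QueryUtils}
import filodb.core.query.Filter.{Equals, EqualsRegex}

// foo{key1=~"baz|bat", key2=~"bar|bak"} arrives as two pipe-only EqualsRegex filters.
val filters = Seq(
  ColumnFilter("key1", EqualsRegex("baz|bat")),
  ColumnFilter("key2", EqualsRegex("bar|bak")))

// Split each pipe-only regex back into single-valued Equals filters per column...
val equalsFilters: Seq[Seq[ColumnFilter]] = filters.map {
  case ColumnFilter(col, EqualsRegex(values: String)) if QueryUtils.containsPipeOnlyRegex(values) =>
    QueryUtils.splitAtUnescapedPipes(values).map(v => ColumnFilter(col, Equals(v)))
  case other => Seq(other)
}

// ...then QueryUtils.combinations takes the cross-product, yielding the four shard keys
// [key1=baz,key2=bar], [key1=baz,key2=bak], [key1=bat,key2=bar], [key1=bat,key2=bak].
val shardKeys = QueryUtils.combinations(equalsFilters)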
@@ -141,8 +141,9 @@ import filodb.query.exec._
// ScalarFixedDoublePlan accept times in seconds. Thus cases like sum(rate(foo{}[longtime])) or vector(0)
// this mismatch in end time for LHS and RHS causes the Binary join object creation to fail and even plan
// materialize to fail.
- copyLogicalPlanWithUpdatedTimeRange(periodicSeriesPlan,
-   TimeRange(periodicSeriesPlan.startMs, (latestDownsampleTimestampFn + offsetMillis.min) / 1000 * 1000))
+ copyLogicalPlanWithUpdatedSeconds(periodicSeriesPlan,
+   periodicSeriesPlan.startMs / 1000,
+   (latestDownsampleTimestampFn + offsetMillis.min) / 1000)
}
logger.debug("materializing against downsample cluster:: {}", qContext.origQueryParams)
downsampleClusterPlanner.materialize(downsampleLp, qContext)
@@ -155,13 +156,14 @@ import filodb.query.exec._
val lastDownsampleInstant = periodicSeriesPlan.startMs + numStepsInDownsample * periodicSeriesPlan.stepMs
val firstInstantInRaw = lastDownsampleInstant + periodicSeriesPlan.stepMs

- val downsampleLp = copyLogicalPlanWithUpdatedTimeRange(periodicSeriesPlan,
-   TimeRange(periodicSeriesPlan.startMs, lastDownsampleInstant))
+ val downsampleLp = copyLogicalPlanWithUpdatedSeconds(periodicSeriesPlan,
+   periodicSeriesPlan.startMs / 1000, lastDownsampleInstant / 1000)
val downsampleEp = downsampleClusterPlanner.materialize(downsampleLp, qContext)
logger.debug("materializing against downsample cluster:: {}", qContext.origQueryParams)

- val rawLp = copyLogicalPlanWithUpdatedTimeRange(periodicSeriesPlan, TimeRange(firstInstantInRaw,
-   periodicSeriesPlan.endMs))
+ val rawLp = copyLogicalPlanWithUpdatedSeconds(periodicSeriesPlan,
+   firstInstantInRaw / 1000, periodicSeriesPlan.endMs / 1000)

val rawEp = rawClusterPlanner.materialize(rawLp, qContext)
StitchRvsExec(qContext, stitchDispatcher, rvRangeFromPlan(periodicSeriesPlan),
Seq(rawEp, downsampleEp))
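
A hedged numeric sketch of the split above (illustrative values only, not from the diff): the downsample plan covers everything up to the last downsample-resident instant, the raw plan starts one step later, and both ranges are passed in whole seconds.

val startMs              = 1700000000000L   // assumed query start
val stepMs               = 60000L           // assumed step
val numStepsInDownsample = 100L             // assumed number of steps answerable from downsampled data
val lastDownsampleInstant = startMs + numStepsInDownsample * stepMs   // 1700006000000 ms
val firstInstantInRaw     = lastDownsampleInstant + stepMs            // 1700006060000 ms
// downsample plan range: [startMs / 1000, lastDownsampleInstant / 1000] = [1700000000, 1700006000] s
// raw plan range:        [firstInstantInRaw / 1000, endMs / 1000]       = [1700006060, ...] s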