
Commit e476181

add grouping and handle split leaves
alextheimer committed Jan 30, 2024
1 parent 503e508 commit e476181
Showing 2 changed files with 50 additions and 32 deletions.
@@ -117,7 +117,11 @@ class ShardKeyRegexPlanner(val dataset: Dataset,
val result = walkLogicalPlanTree(logicalPlan, qContext)
if (result.plans.size > 1) {
MultiPartitionDistConcatExec(qContext, inProcessPlanDispatcher, result.plans)
} else result.plans.head
} else if (result.plans.size == 1) {
result.plans.head
} else {
EmptyResultExec(qContext, dataset.ref, inProcessPlanDispatcher)
}
}
}

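For illustration, here is a minimal self-contained sketch of the three-way dispatch this hunk introduces, using hypothetical stand-in types (Concat, Single, Empty) rather than FiloDB's actual ExecPlan hierarchy; the point of the change is that an empty plan list now yields an empty result instead of throwing on .head:

object DispatchSketch extends App {
  sealed trait Plan
  case class Concat(children: Seq[Plan]) extends Plan  // stand-in for MultiPartitionDistConcatExec
  case class Single(name: String) extends Plan
  case object Empty extends Plan                       // stand-in for EmptyResultExec

  def dispatch(plans: Seq[Plan]): Plan =
    if (plans.size > 1) Concat(plans)      // multiple sub-plans: concatenate across partitions
    else if (plans.size == 1) plans.head   // exactly one sub-plan: return it directly
    else Empty                             // no matching shard keys: empty result, not a .head crash

  println(dispatch(Nil))                   // Empty
  println(dispatch(Seq(Single("App-1"))))  // Single(App-1)
}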
@@ -148,21 +152,6 @@ class ShardKeyRegexPlanner(val dataset: Dataset,
override def walkLogicalPlanTree(logicalPlan: LogicalPlan,
qContext: QueryContext,
forceInProcess: Boolean = false): PlanResult = {
// Materialize with the inner planner if:
// - all plan data lies on one partition
// - all RawSeries filters are identical
val qParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
val shardKeyFilterGroups = LogicalPlan.getRawSeriesFilters(logicalPlan)
.map(_.filter(cf => dataset.options.nonMetricShardColumns.contains(cf.column)))
val headFilters = shardKeyFilterGroups.headOption.map(_.toSet)
// Note: the unchecked .get is safe here: the tail has elements only when a head exists.
val hasSameShardKeyFilters = shardKeyFilterGroups.tail.forall(_.toSet == headFilters.get)
val partitions = getShardKeys(logicalPlan)
.flatMap(filters => getPartitions(logicalPlan.replaceFilters(filters), qParams))
if (hasSameShardKeyFilters && isSinglePartition(partitions)) {
val plans = generateExec(logicalPlan, getShardKeys(logicalPlan), qContext)
return PlanResult(plans)
}
logicalPlan match {
case lp: ApplyMiscellaneousFunction => materializeApplyMiscellaneousFunction(qContext, lp)
case lp: ApplyInstantFunction => materializeApplyInstantFunction(qContext, lp)
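This single-partition shortcut is removed from walkLogicalPlanTree and re-added in materializeOthers further down. For reference, a self-contained sketch of its all-filters-identical check, with a simplified ColumnFilter stand-in rather than FiloDB's actual classes:

object SameShardKeyFilters extends App {
  case class ColumnFilter(column: String, value: String)  // simplified stand-in

  // True when every RawSeries filter group is identical
  // (vacuously true for zero or one groups).
  def hasSameShardKeyFilters(groups: Seq[Seq[ColumnFilter]]): Boolean = {
    val head = groups.headOption.map(_.toSet)
    // .get is safe: drop(1) yields elements only when a head exists.
    groups.drop(1).forall(_.toSet == head.get)
  }

  val a = Seq(ColumnFilter("_ws_", "demo"), ColumnFilter("_ns_", "App-1"))
  println(hasSameShardKeyFilters(Seq(a, a.reverse)))                          // true (order-insensitive)
  println(hasSameShardKeyFilters(Seq(a, Seq(ColumnFilter("_ws_", "demo")))))  // false
}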
@@ -241,16 +230,21 @@ class ShardKeyRegexPlanner(val dataset: Dataset,

// maps individual partitions to the set of shard-keys they contain.
val partitionsToKeys = new mutable.HashMap[String, mutable.Buffer[Seq[ColumnFilter]]]()
val partitionSplitKeys = new mutable.ArrayBuffer[Seq[ColumnFilter]]
keys.foreach { key =>
val newLogicalPlan = logicalPlan.replaceFilters(key)
// The QueryContext should contain only the part of the query that has the regex.
// For example, for exp(sum(test{_ws_ = "demo", _ns_ =~ "App.*"})), the subqueries should be
// sum(test{_ws_ = "demo", _ns_ = "App-1"}), sum(test{_ws_ = "demo", _ns_ = "App-2"}), etc.
val newQueryParams = updateQueryParams(newLogicalPlan, queryParams)
getPartitions(newLogicalPlan, newQueryParams)
val partitions = getPartitions(newLogicalPlan, newQueryParams)
.map(_.partitionName)
.distinct
.foreach(part => partitionsToKeys.getOrElseUpdate(part, new mutable.ArrayBuffer).append(key))
if (partitions.size > 1) {
partitionSplitKeys.append(key)
} else {
partitions.foreach(part => partitionsToKeys.getOrElseUpdate(part, new mutable.ArrayBuffer).append(key))
}
}

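The "split leaves" half of the commit in isolation: a key whose data spans more than one partition cannot be bucketed under a single partition, so it is set aside in partitionSplitKeys and later materialized on its own. A sketch under assumed simplified types, with getPartitions stubbed by a plain function:

object SplitKeySketch extends App {
  import scala.collection.mutable
  type Key = Seq[String]  // stand-in for a Seq[ColumnFilter] shard key

  def bucket(keys: Seq[Key], partitionsOf: Key => Seq[String])
      : (Map[String, Seq[Key]], Seq[Key]) = {
    val partitionsToKeys = new mutable.HashMap[String, mutable.Buffer[Key]]()
    val splitKeys = new mutable.ArrayBuffer[Key]
    keys.foreach { key =>
      val parts = partitionsOf(key).distinct
      if (parts.size > 1) splitKeys.append(key)  // "split leaf": spans partitions
      else parts.foreach(p =>
        partitionsToKeys.getOrElseUpdate(p, new mutable.ArrayBuffer).append(key))
    }
    (partitionsToKeys.map { case (p, ks) => p -> ks.toSeq }.toMap, splitKeys.toSeq)
  }

  // Hypothetical ownership: App-3 straddles two partitions.
  val owners = Map[Key, Seq[String]](
    Seq("demo", "App-1") -> Seq("p1"),
    Seq("demo", "App-2") -> Seq("p1"),
    Seq("demo", "App-3") -> Seq("p1", "p2"))
  println(bucket(owners.keys.toSeq, owners))  // App-1/App-2 bucketed under p1; App-3 split
}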
// Sort each key into the same order as nonMetricShardKeys, then group keys with the same prefix.
@@ -261,6 +255,7 @@
.map(key => key.sortBy(filter => nonMetricShardKeyColToIndex.getOrElse(filter.column, 0)))
.groupBy(_.dropRight(1))
.values
.flatMap(group => group.grouped(10)) // TODO(a_theimer)
(partition, prefixGroups)
}

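And the "grouping" half: after each key is sorted into shard-key column order, keys that share a prefix (every column except the last) are batched so each batch becomes one plan, and grouped(10) additionally caps batch size; the TODO suggests the cap of 10 is provisional. A sketch with plain string keys standing in for ColumnFilter sequences:

object PrefixGroupingSketch extends App {
  // Each key is an ordered shard key, e.g. Seq(ws, ns).
  val keys: Seq[Seq[String]] = (1 to 25).map(i => Seq("demo", s"App-$i"))

  val groups = keys
    .groupBy(_.dropRight(1))   // same prefix (here: same _ws_) => same bucket
    .values
    .flatMap(_.grouped(10))    // cap each bucket at 10 keys per plan
    .toSeq

  println(groups.map(_.size).sorted.reverse)  // List(10, 10, 5): 25 keys, one shared prefix
}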
@@ -269,7 +264,7 @@
partitionToKeyGroups.values.headOption.map(_.size).getOrElse(0) > 1

// Create one plan per key group.
partitionToKeyGroups.flatMap{ case (partition, keyGroups) =>
val groupedPlans = partitionToKeyGroups.flatMap{ case (partition, keyGroups) =>
// NOTE: partition is intentionally unused; the inner planner will again determine which partitions own the data.
keyGroups.map{ keys =>
// Create a map of key->values, then create a ColumnFilter for each key.
@@ -296,6 +291,15 @@
queryPlanner.materialize(newLogicalPlan, newQueryContext)
}
}.toSeq
val splitPlans = partitionSplitKeys.map{ key =>
// Update the LogicalPlan with the new partition-specific filters, then materialize.
val newLogicalPlan = logicalPlan.replaceFilters(key)
val newQueryParams = updateQueryParams(newLogicalPlan, queryParams)
val newQueryContext = qContext.copy(origQueryParams = newQueryParams, plannerParams = qContext.plannerParams.
copy(skipAggregatePresent = skipAggregatePresentValue))
queryPlanner.materialize(newLogicalPlan, newQueryContext)
}
groupedPlans ++ splitPlans
}
// scalastyle:on method.length

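Taken together, the method now emits one plan per prefix group plus one plan per split key, and returns their concatenation; each split key is materialized individually so the inner planner can resolve its cross-partition data itself. A compressed sketch of that flow, with materialization stubbed as string building:

object CombinePlansSketch extends App {
  type Key = Seq[String]

  // Stand-in for queryPlanner.materialize on a filter-substituted plan.
  def materialize(keys: Seq[Key]): String =
    keys.map(_.mkString("/")).mkString("plan(", "|", ")")

  val keyGroups: Seq[Seq[Key]] = Seq(Seq(Seq("demo", "App-1"), Seq("demo", "App-2")))
  val splitKeys: Seq[Key] = Seq(Seq("demo", "App-3"))  // spans more than one partition

  val groupedPlans = keyGroups.map(materialize)             // one plan per key group
  val splitPlans = splitKeys.map(k => materialize(Seq(k)))  // one plan per split key
  println(groupedPlans ++ splitPlans)  // List(plan(demo/App-1|demo/App-2), plan(demo/App-3))
}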
@@ -438,26 +442,40 @@ class ShardKeyRegexPlanner(val dataset: Dataset,
* A subquery could span multiple partitions, so concatenate using MultiPartitionDistConcatExec
* */
private def materializeOthers(logicalPlan: LogicalPlan, queryContext: QueryContext): PlanResult = {
// Materialize with the inner planner if:
// - all plan data lies on one partition
// - all RawSeries filters are identical
val qParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams]
val shardKeyFilterGroups = LogicalPlan.getRawSeriesFilters(logicalPlan)
.map(_.filter(cf => dataset.options.nonMetricShardColumns.contains(cf.column)))
val headFilters = shardKeyFilterGroups.headOption.map(_.toSet)
// Note: the unchecked .get is safe here: the tail has elements only when a head exists.
val hasSameShardKeyFilters = shardKeyFilterGroups.tail.forall(_.toSet == headFilters.get)
val partitions = getShardKeys(logicalPlan)
.flatMap(filters => getPartitions(logicalPlan.replaceFilters(filters), qParams))
if (partitions.nonEmpty && hasSameShardKeyFilters && isSinglePartition(partitions)) {
val plans = generateExec(logicalPlan, getShardKeys(logicalPlan), queryContext)
return PlanResult(plans)
}
val nonMetricShardKeyFilters =
LogicalPlan.getNonMetricShardKeyFilters(logicalPlan, dataset.options.nonMetricShardColumns)
// For queries which don't have RawSeries filters like metadata and fixed scalar queries
val exec = if (nonMetricShardKeyFilters.head.isEmpty) queryPlanner.materialize(logicalPlan, queryContext)
else {
val execPlans = generateExecWithoutRegex(logicalPlan, nonMetricShardKeyFilters.head, queryContext)
if (execPlans.size == 1)
execPlans.head
val execPlans = generateExecWithoutRegex(logicalPlan, nonMetricShardKeyFilters.head, queryContext)
val exec = if (execPlans.size == 1) {
execPlans.head
} else if (execPlans.size > 1) {
// TODO
// Here we essentially do not allow the physical plan to be optimized for subqueries,
// since we concatenate the results with MultiPartitionDistConcatExec.
// The queries below:
// max_over_time(rate(foo{_ws_ = "demo", _ns_ =~ ".*Ns", instance = "Inst-1" }[10m])[1h:1m])
// sum_over_time(foo{_ws_ = "demo", _ns_ =~ ".*Ns", instance = "Inst-1" }[5d:300s])
// would have suboptimal performance. See subquery tests in PlannerHierarchySpec.
else
MultiPartitionDistConcatExec(
queryContext, inProcessPlanDispatcher,
execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec])
)
MultiPartitionDistConcatExec(
queryContext, inProcessPlanDispatcher,
execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec])
)
} else {
EmptyResultExec(queryContext, dataset.ref, inProcessPlanDispatcher)
}
PlanResult(Seq(exec))
}
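Two details in this rewritten fallthrough are easy to miss: an empty plan list now yields EmptyResultExec rather than an error, and the sortWith comparator floats non-remote plans ahead of PromQlRemoteExec ones. A tiny sketch of that ordering idiom with stub plan types (not FiloDB's actual classes):

object RemoteLastSketch extends App {
  sealed trait Exec
  case class Local(n: Int) extends Exec
  case class Remote(n: Int) extends Exec  // stand-in for PromQlRemoteExec

  val plans: Seq[Exec] = Seq(Remote(1), Local(1), Remote(2), Local(2))
  // "x sorts before anything" exactly when x is not remote, so locals move to the front.
  // (Not a strict ordering, but adequate for short plan lists.)
  val sorted = plans.sortWith((x, _) => !x.isInstanceOf[Remote])
  println(sorted)  // locals first, remotes after
}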
4 changes: 2 additions & 2 deletions query/src/main/scala/filodb/query/exec/StitchRvsExec.scala
@@ -81,9 +81,9 @@ object StitchRvsExec {
// The second condition checks whether these values are equal within the tolerable limits and,
// if so, does not emit NaN.
// TODO: Make the second check and tolerance configurable?
if (minsWithoutNan.tail.isEmpty) {
if (minsWithoutNan.size == 1) {
minsWithoutNan.head
} else if (enableApproximatelyEqualCheck &&
} else if (minsWithoutNan.size > 1 && enableApproximatelyEqualCheck &&
minsWithoutNan.map(x => (x.getDouble(1) * weight).toLong / weight).toSet.size == 1) {
minsWithoutNan.head
} else {
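The size guards added here matter because both branches assume at least one element. The approximate-equality check itself quantizes each value to 1/weight precision and tests whether all quantized values collapse to one; a sketch of the idea on plain doubles, with the weight value assumed for illustration:

object ApproxEqualSketch extends App {
  val weight = 1e6  // assumed tolerance scale: values within ~1/weight compare equal

  // Truncate each value to 1/weight precision, then test all-equal via a Set.
  def approxAllEqual(xs: Seq[Double]): Boolean =
    xs.nonEmpty && xs.map(x => (x * weight).toLong).toSet.size == 1

  println(approxAllEqual(Seq(1.0000001, 1.0000004)))  // true: equal after truncation
  println(approxAllEqual(Seq(1.0, 1.1)))              // false
}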
