From e476181eda3f912be68d3d3552cc40537d200949 Mon Sep 17 00:00:00 2001 From: Alex Theimer Date: Tue, 30 Jan 2024 09:50:35 -0800 Subject: [PATCH] add grouping and handle split leaves --- .../queryplanner/ShardKeyRegexPlanner.scala | 78 ++++++++++++------- .../filodb/query/exec/StitchRvsExec.scala | 4 +- 2 files changed, 50 insertions(+), 32 deletions(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala index 14040f97fa..7a7897917d 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala @@ -117,7 +117,11 @@ class ShardKeyRegexPlanner(val dataset: Dataset, val result = walkLogicalPlanTree(logicalPlan, qContext) if (result.plans.size > 1) { MultiPartitionDistConcatExec(qContext, inProcessPlanDispatcher, result.plans) - } else result.plans.head + } else if (result.plans.size == 1) { + result.plans.head + } else { + EmptyResultExec(qContext, dataset.ref, inProcessPlanDispatcher) + } } } @@ -148,21 +152,6 @@ class ShardKeyRegexPlanner(val dataset: Dataset, override def walkLogicalPlanTree(logicalPlan: LogicalPlan, qContext: QueryContext, forceInProcess: Boolean = false): PlanResult = { - // Materialize with the inner planner if: - // - all plan data lies on one partition - // - all RawSeries filters are identical - val qParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams] - val shardKeyFilterGroups = LogicalPlan.getRawSeriesFilters(logicalPlan) - .map(_.filter(cf => dataset.options.nonMetricShardColumns.contains(cf.column))) - val headFilters = shardKeyFilterGroups.headOption.map(_.toSet) - // Note: unchecked .get is OK here since it will only be called for each tail element. - val hasSameShardKeyFilters = shardKeyFilterGroups.tail.forall(_.toSet == headFilters.get) - val partitions = getShardKeys(logicalPlan) - .flatMap(filters => getPartitions(logicalPlan.replaceFilters(filters), qParams)) - if (hasSameShardKeyFilters && isSinglePartition(partitions)) { - val plans = generateExec(logicalPlan, getShardKeys(logicalPlan), qContext) - return PlanResult(plans) - } logicalPlan match { case lp: ApplyMiscellaneousFunction => materializeApplyMiscellaneousFunction(qContext, lp) case lp: ApplyInstantFunction => materializeApplyInstantFunction(qContext, lp) @@ -241,16 +230,21 @@ class ShardKeyRegexPlanner(val dataset: Dataset, // maps individual partitions to the set of shard-keys they contain. val partitionsToKeys = new mutable.HashMap[String, mutable.Buffer[Seq[ColumnFilter]]]() + val partitionSplitKeys = new mutable.ArrayBuffer[Seq[ColumnFilter]] keys.foreach { key => val newLogicalPlan = logicalPlan.replaceFilters(key) // Querycontext should just have the part of query which has regex // For example for exp(sum(test{_ws_ = "demo", _ns_ =~ "App.*"})), sub queries should be // sum(test{_ws_ = "demo", _ns_ = "App-1"}), sum(test{_ws_ = "demo", _ns_ = "App-2"}) etc val newQueryParams = updateQueryParams(newLogicalPlan, queryParams) - getPartitions(newLogicalPlan, newQueryParams) + val partitions = getPartitions(newLogicalPlan, newQueryParams) .map(_.partitionName) .distinct - .foreach(part => partitionsToKeys.getOrElseUpdate(part, new mutable.ArrayBuffer).append(key)) + if (partitions.size > 1) { + partitionSplitKeys.append(key) + } else { + partitions.foreach(part => partitionsToKeys.getOrElseUpdate(part, new mutable.ArrayBuffer).append(key)) + } } // Sort each key into the same order as nonMetricShardKeys, then group keys with the same prefix. @@ -261,6 +255,7 @@ class ShardKeyRegexPlanner(val dataset: Dataset, .map(key => key.sortBy(filter => nonMetricShardKeyColToIndex.getOrElse(filter.column, 0))) .groupBy(_.dropRight(1)) .values + .flatMap(group => group.grouped(10)) // TODO(a_theimer) (partition, prefixGroups) } @@ -269,7 +264,7 @@ class ShardKeyRegexPlanner(val dataset: Dataset, partitionToKeyGroups.values.headOption.map(_.size).getOrElse(0) > 1 // Create one plan per key group. - partitionToKeyGroups.flatMap{ case (partition, keyGroups) => + val groupedPlans = partitionToKeyGroups.flatMap{ case (partition, keyGroups) => // NOTE: partition is intentionally unused; the inner planner will again determine which partitions own the data. keyGroups.map{ keys => // Create a map of key->values, then create a ColumnFilter for each key. @@ -296,6 +291,15 @@ class ShardKeyRegexPlanner(val dataset: Dataset, queryPlanner.materialize(newLogicalPlan, newQueryContext) } }.toSeq + val splitPlans = partitionSplitKeys.map{ key => + // Update the LogicalPlan with the new partition-specific filters, then materialize. + val newLogicalPlan = logicalPlan.replaceFilters(key) + val newQueryParams = updateQueryParams(newLogicalPlan, queryParams) + val newQueryContext = qContext.copy(origQueryParams = newQueryParams, plannerParams = qContext.plannerParams. + copy(skipAggregatePresent = skipAggregatePresentValue)) + queryPlanner.materialize(newLogicalPlan, newQueryContext) + } + groupedPlans ++ splitPlans } // scalastyle:on method.length @@ -438,14 +442,27 @@ class ShardKeyRegexPlanner(val dataset: Dataset, * Sub query could be across multiple partitions so concatenate using MultiPartitionDistConcatExec * */ private def materializeOthers(logicalPlan: LogicalPlan, queryContext: QueryContext): PlanResult = { + // Materialize with the inner planner if: + // - all plan data lies on one partition + // - all RawSeries filters are identical + val qParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams] + val shardKeyFilterGroups = LogicalPlan.getRawSeriesFilters(logicalPlan) + .map(_.filter(cf => dataset.options.nonMetricShardColumns.contains(cf.column))) + val headFilters = shardKeyFilterGroups.headOption.map(_.toSet) + // Note: unchecked .get is OK here since it will only be called for each tail element. + val hasSameShardKeyFilters = shardKeyFilterGroups.tail.forall(_.toSet == headFilters.get) + val partitions = getShardKeys(logicalPlan) + .flatMap(filters => getPartitions(logicalPlan.replaceFilters(filters), qParams)) + if (partitions.nonEmpty && hasSameShardKeyFilters && isSinglePartition(partitions)) { + val plans = generateExec(logicalPlan, getShardKeys(logicalPlan), queryContext) + return PlanResult(plans) + } val nonMetricShardKeyFilters = LogicalPlan.getNonMetricShardKeyFilters(logicalPlan, dataset.options.nonMetricShardColumns) - // For queries which don't have RawSeries filters like metadata and fixed scalar queries - val exec = if (nonMetricShardKeyFilters.head.isEmpty) queryPlanner.materialize(logicalPlan, queryContext) - else { - val execPlans = generateExecWithoutRegex(logicalPlan, nonMetricShardKeyFilters.head, queryContext) - if (execPlans.size == 1) - execPlans.head + val execPlans = generateExecWithoutRegex(logicalPlan, nonMetricShardKeyFilters.head, queryContext) + val exec = if (execPlans.size == 1) { + execPlans.head + } else if (execPlans.size > 1) { // TODO // here we essentially do not allow to optimize the physical plan for subqueries // as we concat the results with MultiPartitionDisConcatExec, @@ -453,11 +470,12 @@ class ShardKeyRegexPlanner(val dataset: Dataset, // max_over_time(rate(foo{_ws_ = "demo", _ns_ =~ ".*Ns", instance = "Inst-1" }[10m])[1h:1m]) // sum_over_time(foo{_ws_ = "demo", _ns_ =~ ".*Ns", instance = "Inst-1" }[5d:300s]) // would have suboptimal performance. See subquery tests in PlannerHierarchySpec - else - MultiPartitionDistConcatExec( - queryContext, inProcessPlanDispatcher, - execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]) - ) + MultiPartitionDistConcatExec( + queryContext, inProcessPlanDispatcher, + execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]) + ) + } else { + EmptyResultExec(queryContext, dataset.ref, inProcessPlanDispatcher) } PlanResult(Seq(exec)) } diff --git a/query/src/main/scala/filodb/query/exec/StitchRvsExec.scala b/query/src/main/scala/filodb/query/exec/StitchRvsExec.scala index 88e960d42d..e85bf0c0ae 100644 --- a/query/src/main/scala/filodb/query/exec/StitchRvsExec.scala +++ b/query/src/main/scala/filodb/query/exec/StitchRvsExec.scala @@ -81,9 +81,9 @@ object StitchRvsExec { // The second condition checks if these values are equal within the tolerable limits and if yes, do not // emit NaN. // TODO: Make the second check and tolerance configurable? - if (minsWithoutNan.tail.isEmpty) { + if (minsWithoutNan.size == 1) { minsWithoutNan.head - } else if (enableApproximatelyEqualCheck && + } else if (minsWithoutNan.size > 1 && enableApproximatelyEqualCheck && minsWithoutNan.map(x => (x.getDouble(1) * weight).toLong / weight).toSet.size == 1) { minsWithoutNan.head } else {