From 1726f74156f19fbb118f2c3d079091413d715121 Mon Sep 17 00:00:00 2001
From: Brian Yu <82572411+Boyuan-Yu@users.noreply.github.com>
Date: Thu, 12 Dec 2024 16:15:39 -0800
Subject: [PATCH] feat(query): delete validation for "split" queries that contain binary joins with offsets (#1910)

Co-authored-by: Brian-Yu
---
 .../queryplanner/MultiPartitionPlanner.scala |  27 ----
 .../MultiPartitionPlannerSpec.scala          |  14 +--
 .../queryplanner/PlannerHierarchySpec.scala  | 118 +++++++++++-------
 3 files changed, 71 insertions(+), 88 deletions(-)

diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala
index 94a56d8269..e57b112663 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala
@@ -567,30 +567,6 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
     }}
   }
 
-  /**
-   * Throws a BadQueryException if any of the following conditions hold:
-   * (1) the plan spans more than one non-metric shard key prefix.
-   * (2) the plan contains at least one BinaryJoin, and any of its BinaryJoins contain an offset.
-   * @param splitLeafPlan must contain leaf plans that individually span multiple partitions.
-   */
-  private def validateSplitLeafPlan(splitLeafPlan: LogicalPlan): Unit = {
-    val baseErrorMessage = "This query contains selectors that individually read data from multiple partitions. " +
-      "This is likely because a selector's data was migrated between partitions. "
-    if (hasBinaryJoin(splitLeafPlan) && getOffsetMillis(splitLeafPlan).exists(_ > 0)) {
-      throw new BadQueryException( baseErrorMessage +
-        "These \"split\" queries cannot contain binary joins with offsets."
-      )
-    }
-    lazy val hasMoreThanOneNonMetricShardKey =
-      LogicalPlanUtils.resolvePipeConcatenatedShardKeyFilters(splitLeafPlan, dataset.options.nonMetricShardColumns)
-        .filter(_.nonEmpty).distinct.size > 1
-    if (hasMoreThanOneNonMetricShardKey) {
-      throw new BadQueryException( baseErrorMessage +
-        "These \"split\" queries are not supported if they contain multiple non-metric shard keys."
-      )
-    }
-  }
-
   /**
    * Materializes a LogicalPlan with leaves that individually span multiple partitions.
    * All "split-leaf" plans will fail to materialize (throw a BadQueryException) if they
@@ -601,9 +577,6 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
   //scalastyle:off method.length
   private def materializeSplitLeafPlan(logicalPlan: LogicalPlan,
                                        qContext: QueryContext): PlanResult = {
-    // TODO: Reassess this validate, we should also support binary joins in split leaf as long as they are within
-    // the limits of max range of data exported
-    validateSplitLeafPlan(logicalPlan)
     val qParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
     // get a mapping of assignments to time-ranges to query
     val lookbackMs = getLookBackMillis(logicalPlan).max
diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala
index 57897657c0..f40712935b 100644
--- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala
@@ -15,7 +15,7 @@ import filodb.prometheus.ast.TimeStepParams
 import filodb.prometheus.parse.Parser
 import filodb.query.BinaryOperator.{ADD, LAND}
 import filodb.query.InstantFunctionId.Ln
-import filodb.query.{BadQueryException, LabelCardinality, LogicalPlan, PlanValidationSpec, SeriesKeysByFilters, TsCardinalities}
+import filodb.query.{LabelCardinality, LogicalPlan, PlanValidationSpec, SeriesKeysByFilters, TsCardinalities}
 import filodb.query.exec._
 
 class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValidationSpec{
@@ -1712,18 +1712,6 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
     validatePlan(execPlan2, expectedPlanWithRemoteExec1)
 
-    val query4 = "sum(rate(test{job = \"app\"}[10m])) + sum(rate(bar{job = \"app\"}[5m] offset 5m))"
-    val lp4 = Parser.queryRangeToLogicalPlan(query4, TimeStepParams(2000, stepSecs, 10000))
-
-    val promQlQueryParams4 = PromQlQueryParams(query4, 1000, 100, 10000)
-    intercept[BadQueryException] {
-      // Expecting to see Exception when we use BinaryJoin with offsets, technically this too should not be a big deal
-      // as we need to identify the right window, however this was not supported even before the change and it is ok to
-      // leave it unaddressed in the first phase as its just Binary joins with offsets
-      engine.materialize(lp4, QueryContext(origQueryParams = promQlQueryParams4, plannerParams =
-        PlannerParams(processMultiPartition = true)))
-    }
-
     // Planner with period of uncertainty should still generate steps that are aligned with start and step,
     // that is should be snapped correctly
diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala
index 13c2089511..f3ddc5a6e1 100644
--- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala
@@ -15,9 +15,11 @@ import filodb.core.query.Filter.{Equals, EqualsRegex}
 import filodb.prometheus.ast.{TimeStepParams, WindowConstants}
 import filodb.prometheus.parse.Parser
 import filodb.prometheus.parse.Parser.Antlr
-import filodb.query.{BadQueryException, IntervalSelector, LabelCardinality, PlanValidationSpec, RawSeries}
+import filodb.query.{IntervalSelector, LabelCardinality, PlanValidationSpec, RawSeries}
 import 
filodb.query.exec._ +import java.util.concurrent.TimeUnit + // scalastyle:off line.size.limit // scalastyle:off number.of.methods class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationSpec { @@ -2766,48 +2768,64 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS } } - it ("should fail to materialize unsupported split-partition queries with binary joins") { - val startSec = 123 - val stepSec = 456 - val endSec = 789 - val queries = Seq( - // aggregate - """sum(foo{job="app"} + bar{job="app2"})""", - """sum(foo{job="app"} + (bar{job="app"} + baz{job="app2"}))""", - """sum(foo{job="app"} offset 1h + bar{job="app"})""", - """sum(foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h))""", - // instant - """sgn(foo{job="app"} + bar{job="app2"})""", - """exp(foo{job="app"} + (bar{job="app"} + baz{job="app2"}))""", - """ln(foo{job="app"} offset 1h + bar{job="app"})""", - """log2(foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h))""", - // binary join - """foo{job="app"} + bar{job="app2"}""", - """foo{job="app"} + (bar{job="app"} + baz{job="app2"})""", - """foo{job="app"} offset 1h + bar{job="app"}""", - """foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h)""", - // scalar vector join - """123 + (foo{job="app"} + bar{job="app2"})""", - """123 + (foo{job="app"} + (bar{job="app"} + baz{job="app2"}))""", - """123 + (foo{job="app"} offset 1h + bar{job="app"})""", - """123 + (foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h))""", - // scalar - """scalar(foo{job="app"} + bar{job="app2"})""", - """scalar(foo{job="app"} + (bar{job="app"} + baz{job="app2"}))""", - """scalar(foo{job="app"} offset 1h + bar{job="app"})""", - """scalar(foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h))""", - // absent - """absent(foo{job="app"} + bar{job="app2"})""", - """absent(foo{job="app"} + (bar{job="app"} + baz{job="app2"}))""", - """absent(foo{job="app"} offset 1h + bar{job="app"})""", - """absent(foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h))""", + + it ("should materialize split-partition queries with lookback and offset binary joins correctly"){ + val startSec = 0 + val stepSec = 3 + val endSec = 9999 + val splitSec = 5000 + + case class Test(query: String, expected: String = "") + val tests = Seq( + Test(query = """sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m]))""", + expected = """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m])),0,3,5000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 
days,true,3000),CachingConfig(true,2048))) + |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(5001,3,6803)) + |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |----T~PeriodicSamplesMapper(start=5001000, step=3000, end=6803000, window=Some(1800000), functionId=Some(Rate), rawSource=false, offsetMs=None) + |-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],6803,1,6803,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],6803,1,6803,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m])),6804,3,9999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))""".stripMargin ), + Test(query = 
"""sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[10m] offset 30m))""", + expected = """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[10m] offset 30m)),0,3,6800,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(6801,3,7403)) + |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |----T~PeriodicSamplesMapper(start=6801000, step=3000, end=7403000, window=Some(600000), functionId=Some(Rate), rawSource=false, offsetMs=Some(1800000)) + |-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3003s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3003s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), 
queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[10m] offset 30m)),7404,3,9999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))""".stripMargin ), + Test(query = """sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m] offset 20m)) + sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m]))""", + expected = """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m] offset 20m)) + sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m])),0,3,6200,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(6201,3,8003)) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + 
|-----T~PeriodicSamplesMapper(start=6201000, step=3000, end=8003000, window=Some(1800000), functionId=Some(Rate), rawSource=false, offsetMs=Some(1200000)) + |------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[4803s],8003,1,8003,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[4803s],8003,1,8003,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(6201,3,8003)) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=6201000, step=3000, end=8003000, window=Some(1800000), functionId=Some(Rate), rawSource=false, offsetMs=None) + |------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],8003,1,8003,None,false), 
PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],8003,1,8003,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m] offset 20m)) + sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m])),8004,3,9999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))""".stripMargin ) ) + val partitionLocationProvider = new PartitionLocationProvider { override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = { - val midTime = (timeRange.startMs + timeRange.endMs) / 2 - List(PartitionAssignment("remote0", "remote0-url", TimeRange(timeRange.startMs, midTime)), - PartitionAssignment("remote1", "remote1-url", TimeRange(midTime, timeRange.endMs))) + val splitMs = 1000 * splitSec + List(PartitionAssignment("remote0", "remote0-url", TimeRange(timeRange.startMs, splitMs)), + PartitionAssignment("remote1", "remote1-url", TimeRange(splitMs + 1, timeRange.endMs))) } override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], @@ -2816,15 +2834,19 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS } val engine = new MultiPartitionPlanner( partitionLocationProvider, singlePartitionPlanner, "local", - MetricsTestData.timeseriesDataset, queryConfig - ) - for (query <- queries) { - val lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(startSec, stepSec, endSec)) - val promQlQueryParams = PromQlQueryParams(query, startSec, stepSec, 
endSec)
-      assertThrows[BadQueryException] {
-        engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams,
-          plannerParams = PlannerParams(processMultiPartition = true)))
-      }
+      MetricsTestData.timeseriesDataset, queryConfig.copy(routingConfig = queryConfig.routingConfig.copy(
+        supportRemoteRawExport = true,
+        maxRemoteRawExportTimeRange = Duration(3, TimeUnit.DAYS),
+        periodOfUncertaintyMs = 3000)))
+
+    for (test <- tests) {
+      val lp = Parser.queryRangeToLogicalPlan(test.query, TimeStepParams(startSec, stepSec, endSec))
+      val promQlQueryParams = PromQlQueryParams(test.query, startSec, stepSec, endSec)
+      val execPlan = engine.materialize(lp,
+        QueryContext(origQueryParams = promQlQueryParams,
+          plannerParams = PlannerParams(processMultiPartition = true))
+      )
+      validatePlan(execPlan, test.expected)
     }
   }
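
Reviewer note (not part of the patch): the sketch below condenses the pattern the new PlannerHierarchySpec test uses, to show the knobs that let split-leaf queries with binary joins and offsets materialize instead of being rejected with BadQueryException. It assumes the spec's fixtures (partitionLocationProvider, singlePartitionPlanner, queryConfig, MetricsTestData) are in scope, and that Duration here is scala.concurrent.duration.Duration; the variable names are illustrative.

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

// Enable remote raw export on the routing config, as the new test does, so the
// MultiPartitionPlanner can stitch split-leaf windows across partition assignments.
val splitAwareEngine = new MultiPartitionPlanner(
  partitionLocationProvider, singlePartitionPlanner, "local",
  MetricsTestData.timeseriesDataset,
  queryConfig.copy(routingConfig = queryConfig.routingConfig.copy(
    supportRemoteRawExport = true,
    maxRemoteRawExportTimeRange = Duration(3, TimeUnit.DAYS),
    periodOfUncertaintyMs = 3000)))

// A binary join with an offset over a split time range, taken from the new test cases.
val query = """sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m] offset 20m)) + sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m]))"""
val lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(0, 3, 9999))
val execPlan = splitAwareEngine.materialize(lp,
  QueryContext(origQueryParams = PromQlQueryParams(query, 0, 3, 9999),
    plannerParams = PlannerParams(processMultiPartition = true)))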