From 3f2f281986e9278e37ccd63cd1a91ebf992b808a Mon Sep 17 00:00:00 2001 From: Amol Nayak Date: Thu, 14 Dec 2023 14:10:48 -0800 Subject: [PATCH] WIP --- .../queryplanner/MultiPartitionPlanner.scala | 16 +- .../MultiPartitionPlannerSpec.scala | 304 +++++++----------- 2 files changed, 131 insertions(+), 189 deletions(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala index 8193fc6c88..d4599b7edb 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala @@ -66,7 +66,6 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider val datasetMetricColumn: String = dataset.options.metricColumn - override def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = { // Pseudo code for the materialize // @@ -234,7 +233,8 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider _: PeriodicSeries => materializePlanHandleSplitLeaf(logicalPlan, qContext) case raw: RawSeries => val params = qContext.origQueryParams.asInstanceOf[PromQlQueryParams] - if(getPartitions(raw, params).tail.nonEmpty) + if(getPartitions(raw, params).tail.nonEmpty + && queryConfig.supportRemoteRawExport) this.walkLogicalPlanTree( raw.copy(supportsRemoteDataCall = true), qContext, @@ -455,10 +455,11 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider // First partition doesn't need its start snapped to a periodic step, so deal with it separately. 
val headRange = { val partRange = assignments.head.timeRange - if (range.startMs <= partRange.startMs) + if (range.startMs <= partRange.endMs) { + // At least some part of the query ends up in this first partition Some(TimeRange(math.max(range.startMs, partRange.startMs), math.min(partRange.endMs + offsetMs, range.endMs))) - else + } else None } // Snap remaining range starts to a step (if a step is provided). @@ -558,6 +559,9 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider val execPlans = if (assignmentRanges.isEmpty) { // Assignment ranges empty means we cant run this query fully on one partition and needs // remote raw export Check if the total time of raw export is within the limits, if not return Empty result + // While it may seem we don't tune the lookback of the leaf raw queries to exactly what we need from each + // partition, in reality it doesn't matter as despite a longer lookback, the actual data exported will be at most + // what that partition contains. 
val (startTime, endTime) = (qParams.startSecs, qParams.endSecs) val totalExpectedRawExport = (endTime - startTime) + lookbackMs + offsetMs if (queryConfig.supportRemoteRawExport && @@ -570,7 +574,7 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider s"Remote raw export is supported and the $totalExpectedRawExport ms" + s" is greater than the max allowed raw export duration of ${queryConfig.maxRemoteRawExportTimeRange}") } else { - logger.warn("Remote raw export not supported") + logger.warn("Remote raw export not enabled") } Seq(EmptyResultExec(qContext, dataset.ref, inProcessPlanDispatcher)) } @@ -701,8 +705,6 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider def materializeMetadataQueryPlan(lp: MetadataQueryPlan, qContext: QueryContext): PlanResult = { val queryParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams] - - // LabelCardinality is a special case, here the partitions to send this query to is not the authorized partition // but the actual one where data resides, similar to how non metadata plans work, however, getting label cardinality // is a metadata operation and shares common components with other metadata endpoints. 
diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala index db164061a3..51911e424d 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala @@ -35,7 +35,8 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida private val config = ConfigFactory.load("application_test.conf") .getConfig("filodb.query").withFallback(routingConfig) - private val queryConfig = QueryConfig(config).copy(plannerSelector = Some("plannerSelector")) + private val queryConfig = QueryConfig(config) + .copy(plannerSelector = Some("plannerSelector"), supportRemoteRawExport = true) val localPlanner = new SingleClusterPlanner(dataset, schemas, mapperRef, earliestRetainedTimestampFn = 0, queryConfig, "raw", StaticSpreadProvider(SpreadChange(0, 1))) @@ -99,35 +100,16 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams = PlannerParams(processMultiPartition = true))) + // Notice how the gap between 2999 and 3300 is filled by pulling raw data from both partitions by enabling supportRemoteRawExport + val expectedPlanTree = s"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |-E~PromQlRemoteExec(PromQlQueryParams(test{job = "app"},1000,100,2999,None,false), 
PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |-T~PeriodicSamplesMapper(start=2999000, step=100000, end=3299000, window=None, functionId=None, rawSource=false, offsetMs=None) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[300s],2999,1,2999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[299s],3299,1,3299,None,false), 
PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |-E~PromQlRemoteExec(PromQlQueryParams(test{job = "app"},3300,100,10000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds))""".stripMargin - val stitchRvsExec = execPlan.asInstanceOf[StitchRvsExec] - stitchRvsExec.children.size shouldEqual (2) - stitchRvsExec.children(0).isInstanceOf[PromQlRemoteExec] shouldEqual true - stitchRvsExec.children(1).isInstanceOf[PromQlRemoteExec] shouldEqual true - - val remoteExec1 = stitchRvsExec.children(0).asInstanceOf[PromQlRemoteExec] - val queryParams1 = remoteExec1.queryContext.origQueryParams.asInstanceOf[PromQlQueryParams] - queryParams1.startSecs shouldEqual startSeconds - queryParams1.endSecs shouldEqual (localPartitionStart - 1) - queryParams1.stepSecs shouldEqual step - remoteExec1.queryContext.plannerParams.processFailure shouldEqual true - remoteExec1.queryContext.plannerParams.processMultiPartition 
shouldEqual false - remoteExec1.queryEndpoint shouldEqual "remote-url" - - // expectedStarMs ends up to be 3 400 000, which does not look right to me, it is supposed to be 3 000 000 - // kpetrov, 12/02/21 - val expectedStartMs = ((startSeconds*1000) to (endSeconds*1000) by (step*1000)).find { instant => - instant - lookbackMs > (localPartitionStart * 1000) - }.get - - val remoteExec2 = stitchRvsExec.children(1).asInstanceOf[PromQlRemoteExec] - val queryParams2 = remoteExec2.queryContext.origQueryParams.asInstanceOf[PromQlQueryParams] - queryParams2.startSecs shouldEqual (expectedStartMs / 1000) - queryParams2.endSecs shouldEqual endSeconds - queryParams2.stepSecs shouldEqual step - remoteExec2.queryContext.plannerParams.processFailure shouldEqual true - remoteExec2.queryContext.plannerParams.processMultiPartition shouldEqual false - remoteExec2.queryEndpoint shouldEqual "remote-url2" + validatePlan(execPlan, expectedPlanTree) } @@ -857,45 +839,22 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida val promQlQueryParams = PromQlQueryParams("test{job = \"app\"}", startSeconds, step, endSeconds) val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams = PlannerParams(processMultiPartition = true))) - val stitchRvsExec = execPlan.asInstanceOf[StitchRvsExec] - stitchRvsExec.children.size shouldEqual (3) - stitchRvsExec.children(0).isInstanceOf[PromQlRemoteExec] shouldEqual (true) - stitchRvsExec.children(1).isInstanceOf[PromQlRemoteExec] shouldEqual (true) - stitchRvsExec.children(2).isInstanceOf[PromQlRemoteExec] shouldEqual (true) - val remoteExec1 = stitchRvsExec.children(0).asInstanceOf[PromQlRemoteExec] - val queryParams1 = remoteExec1.queryContext.origQueryParams.asInstanceOf[PromQlQueryParams] - queryParams1.startSecs shouldEqual startSeconds - queryParams1.endSecs shouldEqual 3999 - queryParams1.stepSecs shouldEqual step - remoteExec1.queryContext.plannerParams.processFailure 
shouldEqual true - remoteExec1.queryContext.plannerParams.processMultiPartition shouldEqual false - remoteExec1.queryEndpoint shouldEqual "remote-url1" - val remoteExec2 = stitchRvsExec.children(1).asInstanceOf[PromQlRemoteExec] - - val expectedStartMs1 = ((startSeconds*1000) to (endSeconds*1000) by (step*1000)).find { instant => - instant - lookbackMs > (secondPartitionStart * 1000) - }.get - - val expectedStartMs2 = ((startSeconds*1000) to (endSeconds*1000) by (step*1000)).find { instant => - instant - lookbackMs > (thirdPartitionStart * 1000) - }.get - - val queryParams2 = remoteExec2.queryContext.origQueryParams.asInstanceOf[PromQlQueryParams] - queryParams2.startSecs shouldEqual expectedStartMs1 / 1000 - queryParams2.endSecs shouldEqual 6999 - queryParams2.stepSecs shouldEqual step - remoteExec2.queryContext.plannerParams.processFailure shouldEqual true - remoteExec2.queryContext.plannerParams.processMultiPartition shouldEqual false - remoteExec2.queryEndpoint shouldEqual "remote-url2" - - val remoteExec3 = stitchRvsExec.children(2).asInstanceOf[PromQlRemoteExec] - val queryParams3 = remoteExec3.queryContext.origQueryParams.asInstanceOf[PromQlQueryParams] - queryParams3.startSecs shouldEqual expectedStartMs2 / 1000 - queryParams3.endSecs shouldEqual endSeconds - queryParams3.stepSecs shouldEqual step - remoteExec3.queryContext.plannerParams.processFailure shouldEqual true - remoteExec3.queryContext.plannerParams.processMultiPartition shouldEqual false - remoteExec3.queryEndpoint shouldEqual "remote-url3" + // Even a three partition span works with stitch filling in the missing gaps + val expectedRawPlan = s"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |-E~PromQlRemoteExec(PromQlQueryParams(test{job = 
"app"},1000,100,3999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url1, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |-T~PeriodicSamplesMapper(start=3999000, step=100000, end=4299000, window=None, functionId=None, rawSource=false, offsetMs=None) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[300s],3999,1,3999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url1, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[299s],4299,1,4299,None,false), 
PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[-2701s],4299,1,4299,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url3, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |-E~PromQlRemoteExec(PromQlQueryParams(test{job = "app"},4300,100,6999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |-T~PeriodicSamplesMapper(start=6999000, step=100000, 
end=7299000, window=None, functionId=None, rawSource=false, offsetMs=None) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[-2699s],3999,1,3999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url1, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[300s],6999,1,6999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[299s],7299,1,7299,None,false), 
PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url3, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |-E~PromQlRemoteExec(PromQlQueryParams(test{job = "app"},7300,100,10000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url3, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds))""".stripMargin + validatePlan(execPlan, expectedRawPlan) } @@ -928,88 +887,77 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams = PlannerParams(processMultiPartition = true))) - val stitchRvsExec = execPlan.asInstanceOf[StitchRvsExec] - stitchRvsExec.children.size shouldEqual (2) - stitchRvsExec.children(0).isInstanceOf[PromQlRemoteExec] shouldEqual (true) - stitchRvsExec.children(1).isInstanceOf[PromQlRemoteExec] shouldEqual (true) - - - // Instant/Raw queries will have same start and end point in all partitions as we want to fetch raw data - val remoteExec1 = 
stitchRvsExec.children(0).asInstanceOf[PromQlRemoteExec] - val queryParams1 = remoteExec1.queryContext.origQueryParams.asInstanceOf[PromQlQueryParams] - queryParams1.startSecs shouldEqual startSeconds - queryParams1.endSecs shouldEqual endSeconds - queryParams1.stepSecs shouldEqual step - remoteExec1.queryContext.plannerParams.processFailure shouldEqual true - remoteExec1.queryContext.plannerParams.processMultiPartition shouldEqual false - remoteExec1.queryEndpoint shouldEqual "remote-url1" - - val remoteExec2 = stitchRvsExec.children(1).asInstanceOf[PromQlRemoteExec] - val queryParams2 = remoteExec1.queryContext.origQueryParams.asInstanceOf[PromQlQueryParams] - queryParams2.startSecs shouldEqual startSeconds - queryParams2.endSecs shouldEqual endSeconds - queryParams2.stepSecs shouldEqual step - remoteExec2.queryContext.plannerParams.processFailure shouldEqual true - remoteExec2.queryContext.plannerParams.processMultiPartition shouldEqual false - remoteExec2.queryEndpoint shouldEqual "remote-url2" + val expectedPlanString = s"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |-E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[49s],949,1,949,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url1, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 
milliseconds)) + |-E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[50s],1000,1,1000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds))""".stripMargin + validatePlan(execPlan, expectedPlanString) } - it ("should generate second Exec with start and end time equal to query end time when query duration is less" + - "than or equal to lookback ") { - - val startSeconds = 1594309980L - val endSeconds = 1594310280L - val localPartitionStartMs: Long = 1594309980001L - val step = 15L - - val partitionLocationProvider = new PartitionLocationProvider { - override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = { - if (routingKey.equals(Map("job" -> "app"))) List( - PartitionAssignment("remote1", "remote-url", TimeRange(startSeconds * 1000 - lookbackMs, - localPartitionStartMs - 1)), PartitionAssignment("remote2", "remote-url", - TimeRange(localPartitionStartMs, endSeconds * 1000))) - else Nil - } - - override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = List( - PartitionAssignment("remote1", "remote-url", TimeRange(startSeconds * 1000 - lookbackMs, - localPartitionStartMs - 1)), PartitionAssignment("remote2", "remote-url", - TimeRange(localPartitionStartMs, endSeconds * 1000))) - - } - val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig) - val lp = 
Parser.queryRangeToLogicalPlan("test{job = \"app\"}", TimeStepParams(startSeconds, step, endSeconds)) - - val promQlQueryParams = PromQlQueryParams("test{job = \"app\"}", startSeconds, step, endSeconds) - - val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams = - PlannerParams(processMultiPartition = true))) - val stitchRvsExec = execPlan.asInstanceOf[StitchRvsExec] - stitchRvsExec.children.size shouldEqual (2) - stitchRvsExec.children(0).isInstanceOf[PromQlRemoteExec] shouldEqual (true) - stitchRvsExec.children(1).isInstanceOf[PromQlRemoteExec] shouldEqual (true) - - - val remoteExec = stitchRvsExec.children(0).asInstanceOf[PromQlRemoteExec] - val queryParams = remoteExec.queryContext.origQueryParams.asInstanceOf[PromQlQueryParams] - queryParams.startSecs shouldEqual startSeconds - queryParams.endSecs shouldEqual (localPartitionStartMs - 1) / 1000 - queryParams.stepSecs shouldEqual step - remoteExec.queryContext.plannerParams.processFailure shouldEqual true - remoteExec.queryContext.plannerParams.processMultiPartition shouldEqual false - remoteExec.queryEndpoint shouldEqual "remote-url" - - val remoteExec2 = stitchRvsExec.children(1).asInstanceOf[PromQlRemoteExec] - val queryParams2 = remoteExec2.queryContext.origQueryParams.asInstanceOf[PromQlQueryParams] - queryParams2.startSecs shouldEqual endSeconds - queryParams2.endSecs shouldEqual endSeconds - queryParams2.stepSecs shouldEqual step - remoteExec2.queryContext.plannerParams.processFailure shouldEqual true - remoteExec2.queryContext.plannerParams.processMultiPartition shouldEqual false - remoteExec2.queryEndpoint shouldEqual "remote-url" - - } + // TODO: The range query where the end time of a query is less than the lookback (5 mins default) of the partition + // start time of the next partition, the data is not stitched. 
Calling this out for a future fix and it is not + // a major concern for now, following is the pictorial representation of the scenario + // + // Partition movement-> | | <- lookback from partition assignment ends + // |------------------------------------------|-----------|----------------------------.....| + // ^ ^ + // Query start Query end + +// it ("should generate second Exec with start and end time equal to query end time when query duration is less" + +// "than or equal to lookback ") { +// +// val startSeconds = 1594309980L +// val endSeconds = 1594310280L +// val localPartitionStartMs: Long = 1594309980001L +// val step = 15L +// +// val partitionLocationProvider = new PartitionLocationProvider { +// override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = { +// if (routingKey.equals(Map("job" -> "app"))) List( +// PartitionAssignment("remote1", "remote-url", TimeRange(startSeconds * 1000 - lookbackMs, +// localPartitionStartMs - 1)), PartitionAssignment("remote2", "remote-url", +// TimeRange(localPartitionStartMs, endSeconds * 1000))) +// else Nil +// } +// +// override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = List( +// PartitionAssignment("remote1", "remote-url", TimeRange(startSeconds * 1000 - lookbackMs, +// localPartitionStartMs - 1)), PartitionAssignment("remote2", "remote-url", +// TimeRange(localPartitionStartMs, endSeconds * 1000))) +// +// } +// val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig) +// val lp = Parser.queryRangeToLogicalPlan("test{job = \"app\"}", TimeStepParams(startSeconds, step, endSeconds)) +// +// val promQlQueryParams = PromQlQueryParams("test{job = \"app\"}", startSeconds, step, endSeconds) +// +// val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams = +// 
PlannerParams(processMultiPartition = true))) +// val stitchRvsExec = execPlan.asInstanceOf[StitchRvsExec] +// stitchRvsExec.children.size shouldEqual (2) +// stitchRvsExec.children(0).isInstanceOf[PromQlRemoteExec] shouldEqual (true) +// stitchRvsExec.children(1).isInstanceOf[PromQlRemoteExec] shouldEqual (true) +// +// +// val remoteExec = stitchRvsExec.children(0).asInstanceOf[PromQlRemoteExec] +// val queryParams = remoteExec.queryContext.origQueryParams.asInstanceOf[PromQlQueryParams] +// queryParams.startSecs shouldEqual startSeconds +// queryParams.endSecs shouldEqual (localPartitionStartMs - 1) / 1000 +// queryParams.stepSecs shouldEqual step +// remoteExec.queryContext.plannerParams.processFailure shouldEqual true +// remoteExec.queryContext.plannerParams.processMultiPartition shouldEqual false +// remoteExec.queryEndpoint shouldEqual "remote-url" +// +// val remoteExec2 = stitchRvsExec.children(1).asInstanceOf[PromQlRemoteExec] +// val queryParams2 = remoteExec2.queryContext.origQueryParams.asInstanceOf[PromQlQueryParams] +// queryParams2.startSecs shouldEqual endSeconds +// queryParams2.endSecs shouldEqual endSeconds +// queryParams2.stepSecs shouldEqual step +// remoteExec2.queryContext.plannerParams.processFailure shouldEqual true +// remoteExec2.queryContext.plannerParams.processMultiPartition shouldEqual false +// remoteExec2.queryEndpoint shouldEqual "remote-url" +// +// } it ("should generate Exec plan for Metadata Label values query") { def partitions(timeRange: TimeRange): List[PartitionAssignment] = @@ -1715,23 +1663,23 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida val expectedPlanWithRemoteExport = """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) 
|-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{job = "app"}[10m])),1000,100,6999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) - |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7000,100,7599)) + |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(6999,100,7599)) |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) - |----T~PeriodicSamplesMapper(start=7000000, step=100000, end=7599000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |----T~PeriodicSamplesMapper(start=6999000, step=100000, end=7599000, window=Some(600000), functionId=Some(Rate), rawSource=false, offsetMs=None) |-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) - |------E~LocalPartitionDistConcatExec() on 
ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1811204169],raw) - |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(6999000,7599000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1811204169],raw) - |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(6999000,7599000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1811204169],raw) + |------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#862629852],raw) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(7000000,7599000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#862629852],raw) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(7000000,7599000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#862629852],raw) |------E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[600s],6999,1,6999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 
seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7600,100,10000)) - |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1811204169],raw) + |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#862629852],raw) |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) |----T~PeriodicSamplesMapper(start=7600000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(7000000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1811204169],raw) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(7000000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#862629852],raw) |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) |----T~PeriodicSamplesMapper(start=7600000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(7000000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on 
ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1811204169],raw)""".stripMargin + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(7000000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#862629852],raw)""".stripMargin val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig.copy(supportRemoteRawExport = true)) @@ -1746,42 +1694,42 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida val expectedPlanWithRemoteExec1 = """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{job = "app"}[10m])) + sum(rate(bar{job = "app"}[5m])),1000,100,6999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) - |-E~BinaryJoinExec(binaryOp=ADD, on=List(), ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, 
filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) - |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7000,100,7599)) + |-E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(6999,100,7599)) |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) - |-----T~PeriodicSamplesMapper(start=7000000, step=100000, end=7599000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |-----T~PeriodicSamplesMapper(start=6999000, step=100000, end=7599000, window=Some(600000), functionId=Some(Rate), rawSource=false, offsetMs=None) |------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) - |-------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#255666709],raw) - |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(6999000,7599000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) 
on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#255666709],raw) - |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(6999000,7599000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#255666709],raw) + |-------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#304042718],raw) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(7000000,7599000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#304042718],raw) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(7000000,7599000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#304042718],raw) |-------E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[600s],6999,1,6999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) - |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7000,100,7599)) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(6999,100,7599)) 
|---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) - |-----T~PeriodicSamplesMapper(start=7000000, step=100000, end=7599000, window=Some(300000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |-----T~PeriodicSamplesMapper(start=6999000, step=100000, end=7599000, window=Some(300000), functionId=Some(Rate), rawSource=false, offsetMs=None) |------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) - |-------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#255666709],raw) - |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(6999000,7599000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#255666709],raw) - |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(6999000,7599000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#255666709],raw) + |-------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#304042718],raw) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, 
chunkMethod=TimeRangeChunkScan(7000000,7599000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#304042718],raw) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(7000000,7599000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#304042718],raw) |-------E~PromQlRemoteExec(PromQlQueryParams(bar{job="app"}[300s],6999,1,6999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),true,1800000 milliseconds)) - |-E~BinaryJoinExec(binaryOp=ADD, on=List(), ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#255666709],raw) + |-E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#304042718],raw) |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7600,100,10000)) - |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#255666709],raw) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#304042718],raw) |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) 
|-----T~PeriodicSamplesMapper(start=7600000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(7000000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#255666709],raw) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(7000000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#304042718],raw) |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) |-----T~PeriodicSamplesMapper(start=7600000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(7000000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#255666709],raw) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(7000000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#304042718],raw) |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7600,100,10000)) - |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#255666709],raw) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#304042718],raw) 
|----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) |-----T~PeriodicSamplesMapper(start=7600000, step=100000, end=10000000, window=Some(300000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#255666709],raw) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#304042718],raw) |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) |-----T~PeriodicSamplesMapper(start=7600000, step=100000, end=10000000, window=Some(300000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#255666709],raw)""".stripMargin + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#304042718],raw)""".stripMargin val query2 = "sum(rate(test{job = \"app\"}[10m])) + sum(rate(bar{job = \"app\"}[5m]))" val lp2 = Parser.queryRangeToLogicalPlan(query2, TimeStepParams(2000, stepSecs, 10000)) @@ -1789,16 +1737,8 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida val 
promQlQueryParams2 = PromQlQueryParams(query2, 1000, 100, 10000) val execPlan2 = engine.materialize(lp2, QueryContext(origQueryParams = promQlQueryParams2, plannerParams = PlannerParams(processMultiPartition = true))) - print(execPlan2.printTree()) - // validatePlan(execPlan1, expectedPlanWithRemoteExport) - - val query3 = "sum(rate(test{job = \"app\"}[10m])) + sum(rate(bar{job = \"app\"}[5m]))" - val lp3 = Parser.queryRangeToLogicalPlan(query3, TimeStepParams(2000, stepSecs, 10000)) - val promQlQueryParams3 = PromQlQueryParams(query3, 1000, 100, 10000) - val execPlan3 = engine.materialize(lp3, QueryContext(origQueryParams = promQlQueryParams3, plannerParams = - PlannerParams(processMultiPartition = true))) - validatePlan(execPlan3, expectedPlanWithRemoteExec1) + validatePlan(execPlan2, expectedPlanWithRemoteExec1) val query4 = "sum(rate(test{job = \"app\"}[10m])) + sum(rate(bar{job = \"app\"}[5m] offset 5m))"