From 5659d3e6eeadc2fcd1b2e53c349fa06164a85497 Mon Sep 17 00:00:00 2001 From: Brian-Yu Date: Tue, 10 Dec 2024 11:17:14 -0800 Subject: [PATCH 1/7] delete validation for "split" queries containing binary joins with offsets --- .../queryplanner/MultiPartitionPlanner.scala | 25 ------------------- 1 file changed, 25 deletions(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala index 94a56d8269..94fbdb4ec6 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala @@ -567,30 +567,6 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv }} } - /** - * Throws a BadQueryException if any of the following conditions hold: - * (1) the plan spans more than one non-metric shard key prefix. - * (2) the plan contains at least one BinaryJoin, and any of its BinaryJoins contain an offset. - * @param splitLeafPlan must contain leaf plans that individually span multiple partitions. - */ - private def validateSplitLeafPlan(splitLeafPlan: LogicalPlan): Unit = { - val baseErrorMessage = "This query contains selectors that individually read data from multiple partitions. " + - "This is likely because a selector's data was migrated between partitions. " - if (hasBinaryJoin(splitLeafPlan) && getOffsetMillis(splitLeafPlan).exists(_ > 0)) { - throw new BadQueryException( baseErrorMessage + - "These \"split\" queries cannot contain binary joins with offsets." 
- ) - } - lazy val hasMoreThanOneNonMetricShardKey = - LogicalPlanUtils.resolvePipeConcatenatedShardKeyFilters(splitLeafPlan, dataset.options.nonMetricShardColumns) - .filter(_.nonEmpty).distinct.size > 1 - if (hasMoreThanOneNonMetricShardKey) { - throw new BadQueryException( baseErrorMessage + - "These \"split\" queries are not supported if they contain multiple non-metric shard keys." - ) - } - } - /** * Materializes a LogicalPlan with leaves that individually span multiple partitions. * All "split-leaf" plans will fail to materialize (throw a BadQueryException) if they @@ -603,7 +579,6 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv qContext: QueryContext): PlanResult = { // TODO: Reassess this validate, we should also support binary joins in split leaf as long as they are within // the limits of max range of data exported - validateSplitLeafPlan(logicalPlan) val qParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams] // get a mapping of assignments to time-ranges to query val lookbackMs = getLookBackMillis(logicalPlan).max From b5aa44bf4dac18397f8d724a94a5ee7c8a259ba3 Mon Sep 17 00:00:00 2001 From: Brian-Yu Date: Tue, 10 Dec 2024 15:09:04 -0800 Subject: [PATCH 2/7] delete validation for "split" queries containing binary joins with offsets --- .../filodb.coordinator/queryplanner/MultiPartitionPlanner.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala index 94fbdb4ec6..e22c7966a7 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala @@ -579,6 +579,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv qContext: QueryContext): PlanResult = { // TODO: Reassess this validate, we 
should also support binary joins in split leaf as long as they are within // the limits of max range of data exported + val qParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams] // get a mapping of assignments to time-ranges to query val lookbackMs = getLookBackMillis(logicalPlan).max From b83e9477b6a8b35dd745717ded1051078ef2ed66 Mon Sep 17 00:00:00 2001 From: Brian-Yu Date: Tue, 10 Dec 2024 15:29:48 -0800 Subject: [PATCH 3/7] delete validation for "split" queries containing binary joins with offsets --- .../filodb.coordinator/queryplanner/MultiPartitionPlanner.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala index e22c7966a7..94fbdb4ec6 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala @@ -579,7 +579,6 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv qContext: QueryContext): PlanResult = { // TODO: Reassess this validate, we should also support binary joins in split leaf as long as they are within // the limits of max range of data exported - val qParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams] // get a mapping of assignments to time-ranges to query val lookbackMs = getLookBackMillis(logicalPlan).max From b6b9633012ac738a148bfdb1f8e772f9d72beab7 Mon Sep 17 00:00:00 2001 From: Brian-Yu Date: Tue, 10 Dec 2024 17:52:18 -0800 Subject: [PATCH 4/7] delete corresponding tests --- .../MultiPartitionPlannerSpec.scala | 14 +---- .../queryplanner/PlannerHierarchySpec.scala | 63 +------------------ 2 files changed, 2 insertions(+), 75 deletions(-) diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala 
b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala index 57897657c0..f40712935b 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala @@ -15,7 +15,7 @@ import filodb.prometheus.ast.TimeStepParams import filodb.prometheus.parse.Parser import filodb.query.BinaryOperator.{ADD, LAND} import filodb.query.InstantFunctionId.Ln -import filodb.query.{BadQueryException, LabelCardinality, LogicalPlan, PlanValidationSpec, SeriesKeysByFilters, TsCardinalities} +import filodb.query.{LabelCardinality, LogicalPlan, PlanValidationSpec, SeriesKeysByFilters, TsCardinalities} import filodb.query.exec._ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValidationSpec{ @@ -1712,18 +1712,6 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida validatePlan(execPlan2, expectedPlanWithRemoteExec1) - val query4 = "sum(rate(test{job = \"app\"}[10m])) + sum(rate(bar{job = \"app\"}[5m] offset 5m))" - val lp4 = Parser.queryRangeToLogicalPlan(query4, TimeStepParams(2000, stepSecs, 10000)) - - val promQlQueryParams4 = PromQlQueryParams(query4, 1000, 100, 10000) - intercept[BadQueryException] { - // Expecting to see Exception when we use BinaryJoin with offsets, technically this too should not be a big deal - // as we need to identify the right window, however this was not supported even before the change and it is ok to - // leave it unaddressed in the first phase as its just Binary joins with offsets - engine.materialize(lp4, QueryContext(origQueryParams = promQlQueryParams4, plannerParams = - PlannerParams(processMultiPartition = true))) - } - // Planner with period of uncertainty should still generate steps that are aligned with start and step, // that is should be snapped correctly diff --git 
a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala index 13c2089511..538415a0dd 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala @@ -15,7 +15,7 @@ import filodb.core.query.Filter.{Equals, EqualsRegex} import filodb.prometheus.ast.{TimeStepParams, WindowConstants} import filodb.prometheus.parse.Parser import filodb.prometheus.parse.Parser.Antlr -import filodb.query.{BadQueryException, IntervalSelector, LabelCardinality, PlanValidationSpec, RawSeries} +import filodb.query.{IntervalSelector, LabelCardinality, PlanValidationSpec, RawSeries} import filodb.query.exec._ // scalastyle:off line.size.limit @@ -2766,67 +2766,6 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS } } - it ("should fail to materialize unsupported split-partition queries with binary joins") { - val startSec = 123 - val stepSec = 456 - val endSec = 789 - val queries = Seq( - // aggregate - """sum(foo{job="app"} + bar{job="app2"})""", - """sum(foo{job="app"} + (bar{job="app"} + baz{job="app2"}))""", - """sum(foo{job="app"} offset 1h + bar{job="app"})""", - """sum(foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h))""", - // instant - """sgn(foo{job="app"} + bar{job="app2"})""", - """exp(foo{job="app"} + (bar{job="app"} + baz{job="app2"}))""", - """ln(foo{job="app"} offset 1h + bar{job="app"})""", - """log2(foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h))""", - // binary join - """foo{job="app"} + bar{job="app2"}""", - """foo{job="app"} + (bar{job="app"} + baz{job="app2"})""", - """foo{job="app"} offset 1h + bar{job="app"}""", - """foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h)""", - // scalar vector join - """123 + (foo{job="app"} + bar{job="app2"})""", - """123 + 
(foo{job="app"} + (bar{job="app"} + baz{job="app2"}))""", - """123 + (foo{job="app"} offset 1h + bar{job="app"})""", - """123 + (foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h))""", - // scalar - """scalar(foo{job="app"} + bar{job="app2"})""", - """scalar(foo{job="app"} + (bar{job="app"} + baz{job="app2"}))""", - """scalar(foo{job="app"} offset 1h + bar{job="app"})""", - """scalar(foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h))""", - // absent - """absent(foo{job="app"} + bar{job="app2"})""", - """absent(foo{job="app"} + (bar{job="app"} + baz{job="app2"}))""", - """absent(foo{job="app"} offset 1h + bar{job="app"})""", - """absent(foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h))""", - ) - val partitionLocationProvider = new PartitionLocationProvider { - override def getPartitions(routingKey: Map[String, String], - timeRange: TimeRange): List[PartitionAssignment] = { - val midTime = (timeRange.startMs + timeRange.endMs) / 2 - List(PartitionAssignment("remote0", "remote0-url", TimeRange(timeRange.startMs, midTime)), - PartitionAssignment("remote1", "remote1-url", TimeRange(midTime, timeRange.endMs))) - } - - override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], - timeRange: TimeRange): List[PartitionAssignment] = - throw new RuntimeException("should not use") - } - val engine = new MultiPartitionPlanner( - partitionLocationProvider, singlePartitionPlanner, "local", - MetricsTestData.timeseriesDataset, queryConfig - ) - for (query <- queries) { - val lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(startSec, stepSec, endSec)) - val promQlQueryParams = PromQlQueryParams(query, startSec, stepSec, endSec) - assertThrows[BadQueryException] { - engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, - plannerParams = PlannerParams(processMultiPartition = true))) - } - } - } it("should handle multi partition topk correctly") { // Cases to handle From 
cf5cf8c995c4fffd1c6788642805d5ff743235db Mon Sep 17 00:00:00 2001 From: Brian-Yu Date: Wed, 11 Dec 2024 11:52:19 -0800 Subject: [PATCH 5/7] add test for split query with offset binary joins --- .../queryplanner/PlannerHierarchySpec.scala | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala index 538415a0dd..ffbcb202db 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala @@ -2766,6 +2766,42 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS } } + it ("should materialize split-partition queries with offset binary joins correctly"){ + val startSec = 0 + val stepSec = 3 + val endSec = 9999 + val splitSec = 5000 + val query = """sum(test{_ws_ = "demo", _ns_ = "localNs"} offset 5m) + sum(test{_ws_ = "demo", _ns_ = "localNs"})""" + + val partitionLocationProvider = new PartitionLocationProvider { + override def getPartitions(routingKey: Map[String, String], + timeRange: TimeRange): List[PartitionAssignment] = { + val splitMs = 1000 * splitSec + List(PartitionAssignment("remote0", "remote0-url", TimeRange(timeRange.startMs, splitMs)), + PartitionAssignment("remote1", "remote1-url", TimeRange(splitMs + 1, timeRange.endMs), Some("grpc-remote1-url"))) + } + + override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], + timeRange: TimeRange): List[PartitionAssignment] = + throw new RuntimeException("should not use") + } + val engine = new MultiPartitionPlanner( + partitionLocationProvider, singlePartitionPlanner, "local", + MetricsTestData.timeseriesDataset, queryConfig + ) + val lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(startSec, stepSec, endSec)) + val promQlQueryParams = 
PromQlQueryParams(query, startSec, stepSec, endSec) + val execPlan = engine.materialize(lp, + QueryContext(origQueryParams = promQlQueryParams, + plannerParams = PlannerParams(processMultiPartition = true)) + ) + val expectedPlan = + """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(test{_ws_ = "demo", _ns_ = "localNs"} offset 5m) + sum(test{_ws_ = "demo", _ns_ = "localNs"}),0,3,5300,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) + |-E~PromQLGrpcRemoteExec(PromQlQueryParams(sum(test{_ws_ = "demo", _ns_ = "localNs"} offset 5m) + sum(test{_ws_ = "demo", _ns_ = "localNs"}),5601,3,9999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpc-remote1-url.execStreaming, 
requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048)))""".stripMargin + + validatePlan(execPlan, expectedPlan) + } it("should handle multi partition topk correctly") { // Cases to handle From 875e6393a7425d4da3e1e43af8882ef073798ee0 Mon Sep 17 00:00:00 2001 From: Brian-Yu Date: Wed, 11 Dec 2024 14:17:49 -0800 Subject: [PATCH 6/7] increase maxRemoteRawExportTimeRange --- core/src/main/scala/filodb.core/query/QueryConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/filodb.core/query/QueryConfig.scala b/core/src/main/scala/filodb.core/query/QueryConfig.scala index 6b2363ca4c..958d67a15e 100644 --- a/core/src/main/scala/filodb.core/query/QueryConfig.scala +++ b/core/src/main/scala/filodb.core/query/QueryConfig.scala @@ -83,7 +83,7 @@ object QueryConfig { } case class RoutingConfig( supportRemoteRawExport: Boolean = false, - maxRemoteRawExportTimeRange: FiniteDuration = 3 days, + maxRemoteRawExportTimeRange: FiniteDuration = 31 days, enableApproximatelyEqualCheckInStitch: Boolean = true, periodOfUncertaintyMs: Long = (5 minutes).toMillis ) From 01147471b8a7765f6c334775bf161728dab4bdab Mon Sep 17 00:00:00 2001 From: Brian-Yu Date: Thu, 12 Dec 2024 15:38:29 -0800 Subject: [PATCH 7/7] updated tests --- .../queryplanner/MultiPartitionPlanner.scala | 2 - .../queryplanner/PlannerHierarchySpec.scala | 79 +++++++++++++++---- .../scala/filodb.core/query/QueryConfig.scala | 2 +- 3 files changed, 64 insertions(+), 19 deletions(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala index 94fbdb4ec6..e57b112663 
100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala @@ -577,8 +577,6 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv //scalastyle:off method.length private def materializeSplitLeafPlan(logicalPlan: LogicalPlan, qContext: QueryContext): PlanResult = { - // TODO: Reassess this validate, we should also support binary joins in split leaf as long as they are within - // the limits of max range of data exported val qParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams] // get a mapping of assignments to time-ranges to query val lookbackMs = getLookBackMillis(logicalPlan).max diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala index ffbcb202db..f3ddc5a6e1 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala @@ -18,6 +18,8 @@ import filodb.prometheus.parse.Parser.Antlr import filodb.query.{IntervalSelector, LabelCardinality, PlanValidationSpec, RawSeries} import filodb.query.exec._ +import java.util.concurrent.TimeUnit + // scalastyle:off line.size.limit // scalastyle:off number.of.methods class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationSpec { @@ -2766,19 +2768,64 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS } } - it ("should materialize split-partition queries with offset binary joins correctly"){ + + it ("should materialize split-partition queries with lookback and offset binary joins correctly"){ val startSec = 0 val stepSec = 3 val endSec = 9999 val splitSec = 5000 - val query = """sum(test{_ws_ = "demo", _ns_ = "localNs"} offset 5m) + 
sum(test{_ws_ = "demo", _ns_ = "localNs"})""" + + case class Test(query: String, expected: String = "") + val tests = Seq( + Test(query = """sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m]))""", + expected = """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m])),0,3,5000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(5001,3,6803)) + |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |----T~PeriodicSamplesMapper(start=5001000, step=3000, end=6803000, 
window=Some(1800000), functionId=Some(Rate), rawSource=false, offsetMs=None) + |-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],6803,1,6803,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],6803,1,6803,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 
days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m])),6804,3,9999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))""".stripMargin ), + Test(query = """sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[10m] offset 30m))""", + expected = """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[10m] offset 30m)),0,3,6800,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 
65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(6801,3,7403)) + |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |----T~PeriodicSamplesMapper(start=6801000, step=3000, end=7403000, window=Some(600000), functionId=Some(Rate), rawSource=false, offsetMs=Some(1800000)) + |-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3003s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + 
|------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3003s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[10m] offset 30m)),7404,3,9999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))""".stripMargin ), + Test(query = """sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m] offset 20m)) + sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m]))""", + expected = """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 
seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m] offset 20m)) + sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m])),0,3,6200,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(6201,3,8003)) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 
days,true,3000),CachingConfig(true,2048))) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=6201000, step=3000, end=8003000, window=Some(1800000), functionId=Some(Rate), rawSource=false, offsetMs=Some(1200000)) + |------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[4803s],8003,1,8003,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[4803s],8003,1,8003,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 
seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(6201,3,8003)) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=6201000, step=3000, end=8003000, window=Some(1800000), functionId=Some(Rate), rawSource=false, offsetMs=None) + |------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],8003,1,8003,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 
seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],8003,1,8003,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m] offset 20m)) + sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m])),8004,3,9999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))""".stripMargin ) + ) val 
partitionLocationProvider = new PartitionLocationProvider { override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = { val splitMs = 1000 * splitSec List(PartitionAssignment("remote0", "remote0-url", TimeRange(timeRange.startMs, splitMs)), - PartitionAssignment("remote1", "remote1-url", TimeRange(splitMs + 1, timeRange.endMs), Some("grpc-remote1-url"))) + PartitionAssignment("remote1", "remote1-url", TimeRange(splitMs + 1, timeRange.endMs))) } override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], @@ -2787,20 +2834,20 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS } val engine = new MultiPartitionPlanner( partitionLocationProvider, singlePartitionPlanner, "local", - MetricsTestData.timeseriesDataset, queryConfig - ) - val lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(startSec, stepSec, endSec)) - val promQlQueryParams = PromQlQueryParams(query, startSec, stepSec, endSec) - val execPlan = engine.materialize(lp, - QueryContext(origQueryParams = promQlQueryParams, - plannerParams = PlannerParams(processMultiPartition = true)) - ) - val expectedPlan = - """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) - |-E~PromQlRemoteExec(PromQlQueryParams(sum(test{_ws_ = "demo", _ns_ = "localNs"} offset 5m) + sum(test{_ws_ = "demo", _ns_ = "localNs"}),0,3,5300,None,false), 
PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) - |-E~PromQLGrpcRemoteExec(PromQlQueryParams(sum(test{_ws_ = "demo", _ns_ = "localNs"} offset 5m) + sum(test{_ws_ = "demo", _ns_ = "localNs"}),5601,3,9999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpc-remote1-url.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048)))""".stripMargin + MetricsTestData.timeseriesDataset, queryConfig.copy(routingConfig = queryConfig.routingConfig.copy( + supportRemoteRawExport = true, + maxRemoteRawExportTimeRange = Duration(3, TimeUnit.DAYS), + periodOfUncertaintyMs = 3000))) - validatePlan(execPlan, expectedPlan) + for (test <- tests) { + val lp = Parser.queryRangeToLogicalPlan(test.query, TimeStepParams(startSec, stepSec, endSec)) + 
val promQlQueryParams = PromQlQueryParams(test.query, startSec, stepSec, endSec) + val execPlan = engine.materialize(lp, + QueryContext(origQueryParams = promQlQueryParams, + plannerParams = PlannerParams(processMultiPartition = true)) + ) + validatePlan(execPlan, test.expected) + } } it("should handle multi partition topk correctly") { diff --git a/core/src/main/scala/filodb.core/query/QueryConfig.scala b/core/src/main/scala/filodb.core/query/QueryConfig.scala index 958d67a15e..6b2363ca4c 100644 --- a/core/src/main/scala/filodb.core/query/QueryConfig.scala +++ b/core/src/main/scala/filodb.core/query/QueryConfig.scala @@ -83,7 +83,7 @@ object QueryConfig { } case class RoutingConfig( supportRemoteRawExport: Boolean = false, - maxRemoteRawExportTimeRange: FiniteDuration = 31 days, + maxRemoteRawExportTimeRange: FiniteDuration = 3 days, enableApproximatelyEqualCheckInStitch: Boolean = true, periodOfUncertaintyMs: Long = (5 minutes).toMillis )