diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala index 366b1a672c..6c56a6a0fe 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala @@ -161,7 +161,11 @@ class ShardKeyRegexPlanner(val dataset: Dataset, // That is a problem for e.g. scalars or absent(). if (hasSameShardKeyFilters && partitions.size == 1) { val plans = generateExec(logicalPlan, shardKeys, qContext) - return PlanResult(plans) + // If !=1 plans were produced, additional high-level coordination may be needed + // (e.g. joins / aggregations). In that case, proceed through the logic below. + if (plans.size == 1) { + return PlanResult(plans) + } } logicalPlan match { case lp: ApplyMiscellaneousFunction => materializeApplyMiscellaneousFunction(qContext, lp) diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala index 6aef9e3312..46137abe1a 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala @@ -339,6 +339,106 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS validatePlan(execPlan, expected) } + it ("should aggregate at root when batch size is small enough to produce multiple plans from one partition") { + val lp = Parser.queryRangeToLogicalPlan( + """sum(foo{_ws_="my-ws",_ns_=~"localNs.*"})""", + TimeStepParams(startSeconds, step, endSeconds), Antlr) + val execPlan = rootPlanner.materialize(lp, QueryContext( + origQueryParams = queryParams, + plannerParams = PlannerParams(maxShardKeyRegexFanoutBatchSize = 1))) // set the batch size == 1 + val expected = """T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(100,1,1000)) + |-E~MultiPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0))) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000))) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-903737658],raw) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs1))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-903737658],raw) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs1))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-903737658],raw) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-903737658],downsample) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs1))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-903737658],downsample) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs1))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-903737658],downsample) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000))) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-903737658],raw) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-903737658],raw) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-903737658],raw) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-903737658],downsample) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-903737658],downsample) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-903737658],downsample)""".stripMargin + validatePlan(execPlan, expected) + } + + it("should join at root when batch size is small enough to produce multiple plans from one partition") { + val lp = Parser.queryRangeToLogicalPlan( + """foo{_ws_="my-ws",_ns_=~"localNs.*"} + foo{_ws_="my-ws",_ns_=~"localNs.*"}""", + TimeStepParams(startSeconds, step, endSeconds), Antlr) + val execPlan = rootPlanner.materialize(lp, QueryContext( + origQueryParams = queryParams, + plannerParams = PlannerParams(maxShardKeyRegexFanoutBatchSize = 1))) // set the batch size == 1 + val expected = + """E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0))) + |-E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0))) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000))) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],raw) + |----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs1))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],raw) + |----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs1))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],raw) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],downsample) + |----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs1))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],downsample) + |----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs1))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],downsample) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000))) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],raw) + |----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],raw) + |----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],raw) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],downsample) + |----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],downsample) + |----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],downsample) + |-E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0))) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000))) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],raw) + |----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs1))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],raw) + |----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs1))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],raw) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],downsample) + |----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs1))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],downsample) + |----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs1))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],downsample) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000))) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],raw) + |----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],raw) + |----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],raw) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],downsample) + |----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],downsample) + |----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-929007190],downsample)""".stripMargin + validatePlan(execPlan, expected) + } + it("should generate plan for one namespace query across raw/downsample") { val lp = Parser.queryRangeToLogicalPlan( """sum(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" })""",