From a02b58476e1306d2cc184315acbc556f1fe02760 Mon Sep 17 00:00:00 2001
From: Yu Zhang
Date: Tue, 17 Oct 2023 16:52:53 -0700
Subject: [PATCH] add test to PlannerHierarchySpec.

---
 .../queryplanner/PlannerHierarchySpec.scala   | 40 +++++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala
index 388cf79a12..6df1a3ac39 100644
--- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala
@@ -1837,6 +1837,46 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
     validatePlan(execPlan, expectedPlan)
   }
 
+  it("Modifier should have PeriodicSamplesMapper wrapped by RepeatTransformer") {
+    val lp = Parser.queryRangeToLogicalPlan(
+      """rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m]) AND
+        | topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] @end()))""".stripMargin,
+      TimeStepParams(startSeconds, step, endSeconds), Antlr)
+    val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
+    val expectedPlan =
+      """E~StitchRvsExec() on InProcessPlanDispatcher
+        |-E~SetOperatorExec(binaryOp=LAND, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw)
+        |--T~PeriodicSamplesMapper(start=1634172830000, step=300000, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
+        |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172770000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw)
+        |--T~PeriodicSamplesMapper(start=1634172830000, step=300000, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
+        |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172770000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw)
+        |--T~AggregatePresenter(aggrOp=TopK, aggrParams=List(1.0), rangeParams=RangeParams(1634172830,300,1634777330))
+        |---E~LocalPartitionReduceAggregateExec(aggrOp=TopK, aggrParams=List(1.0)) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw)
+        |----T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List())
+        |-----T~RepeatTransformer(startMs=1634172830000, stepMs=300000, endMs=1634777330000, funcParams=List())
+        |------T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
+        |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172770000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw)
+        |----T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List())
+        |-----T~RepeatTransformer(startMs=1634172830000, stepMs=300000, endMs=1634777330000, funcParams=List())
+        |------T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
+        |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172770000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw)
+        |-E~SetOperatorExec(binaryOp=LAND, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample)
+        |--T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172530000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
+        |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913270000,1634172530000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample)
+        |--T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172530000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
+        |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913270000,1634172530000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample)
+        |--T~AggregatePresenter(aggrOp=TopK, aggrParams=List(1.0), rangeParams=RangeParams(1633913330,300,1634172530))
+        |---E~LocalPartitionReduceAggregateExec(aggrOp=TopK, aggrParams=List(1.0)) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample)
+        |----T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List())
+        |-----T~RepeatTransformer(startMs=1633913330000, stepMs=300000, endMs=1634172530000, funcParams=List())
+        |------T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
+        |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913270000,1634172530000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample)
+        |----T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List())
+        |-----T~RepeatTransformer(startMs=1633913330000, stepMs=300000, endMs=1634172530000, funcParams=List())
+        |------T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
+        |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913270000,1634172530000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample)""".stripMargin
+    validatePlan(execPlan, expectedPlan)
+  }
 
   it("Vector plan on a binary join in RR and raw happen in memory") {
     val lp = Parser.queryRangeToLogicalPlan(