Skip to content

Commit

Permalink
add test to PlannerHierarchySpec.
Browse files Browse the repository at this point in the history
  • Loading branch information
Yu Zhang committed Oct 17, 2023
1 parent 78c18a9 commit a02b584
Showing 1 changed file with 40 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1837,6 +1837,46 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
validatePlan(execPlan, expectedPlan)
}

it("Modifier should have PeriodicSamplesMapper wrapped by RepeatTransformer") {
val lp = Parser.queryRangeToLogicalPlan(
"""rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m]) AND
| topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] @end()))""".stripMargin,
TimeStepParams(startSeconds, step, endSeconds), Antlr)
val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
val expectedPlan =
"""E~StitchRvsExec() on InProcessPlanDispatcher
|-E~SetOperatorExec(binaryOp=LAND, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw)
|--T~PeriodicSamplesMapper(start=1634172830000, step=300000, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172770000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw)
|--T~PeriodicSamplesMapper(start=1634172830000, step=300000, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172770000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw)
|--T~AggregatePresenter(aggrOp=TopK, aggrParams=List(1.0), rangeParams=RangeParams(1634172830,300,1634777330))
|---E~LocalPartitionReduceAggregateExec(aggrOp=TopK, aggrParams=List(1.0)) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw)
|----T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List())
|-----T~RepeatTransformer(startMs=1634172830000, stepMs=300000, endMs=1634777330000, funcParams=List())
|------T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172770000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw)
|----T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List())
|-----T~RepeatTransformer(startMs=1634172830000, stepMs=300000, endMs=1634777330000, funcParams=List())
|------T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172770000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw)
|-E~SetOperatorExec(binaryOp=LAND, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample)
|--T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172530000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913270000,1634172530000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample)
|--T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172530000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913270000,1634172530000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample)
|--T~AggregatePresenter(aggrOp=TopK, aggrParams=List(1.0), rangeParams=RangeParams(1633913330,300,1634172530))
|---E~LocalPartitionReduceAggregateExec(aggrOp=TopK, aggrParams=List(1.0)) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample)
|----T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List())
|-----T~RepeatTransformer(startMs=1633913330000, stepMs=300000, endMs=1634172530000, funcParams=List())
|------T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913270000,1634172530000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample)
|----T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List())
|-----T~RepeatTransformer(startMs=1633913330000, stepMs=300000, endMs=1634172530000, funcParams=List())
|------T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913270000,1634172530000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample)""".stripMargin
validatePlan(execPlan, expectedPlan)
}

it("Vector plan on a binary join in RR and raw happen in memory") {
val lp = Parser.queryRangeToLogicalPlan(
Expand Down

0 comments on commit a02b584

Please sign in to comment.