Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alextheimer committed Feb 2, 2024
1 parent 70f4fec commit 2b24b76
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1635,23 +1635,23 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
|---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
|----T~PeriodicSamplesMapper(start=7000000, step=100000, end=7899000, window=Some(600000), functionId=Some(Rate), rawSource=false, offsetMs=None)
|-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000)))
|------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#880854731],raw)
|-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(5500000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#880854731],raw)
|-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(5500000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#880854731],raw)
|------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1331125564],raw)
|-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(5500000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1331125564],raw)
|-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(5500000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1331125564],raw)
|------E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[1500s],7899,1,7899,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000)))
|-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(2000,100,10000))
|--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#880854731],raw)
|-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7900,100,10000))
|--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1331125564],raw)
|---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
|----T~PeriodicSamplesMapper(start=2000000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(1400000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#880854731],raw)
|----T~PeriodicSamplesMapper(start=7900000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1331125564],raw)
|---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
|----T~PeriodicSamplesMapper(start=2000000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(1400000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#880854731],raw)""".stripMargin
|----T~PeriodicSamplesMapper(start=7900000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1331125564],raw)""".stripMargin

val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local",
dataset, queryConfig.copy(routingConfig = queryConfig.routingConfig.copy(supportRemoteRawExport = true)))
val query1 = "sum(rate(test{job = \"app\"}[10m]))"
val lp1 = Parser.queryRangeToLogicalPlan(query1, TimeStepParams(2000, stepSecs, 10000))
val lp1 = Parser.queryRangeToLogicalPlan(query1, TimeStepParams(1000, stepSecs, 10000))

val promQlQueryParams = PromQlQueryParams(query1, 1000, 100, 10000)
val execPlan1 = engine.materialize(lp1, QueryContext(origQueryParams = promQlQueryParams, plannerParams =
Expand All @@ -1667,36 +1667,36 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
|----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
|-----T~PeriodicSamplesMapper(start=7000000, step=100000, end=7899000, window=Some(600000), functionId=Some(Rate), rawSource=false, offsetMs=None)
|------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000)))
|-------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#478255912],raw)
|--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(5500000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#478255912],raw)
|--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(5500000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#478255912],raw)
|-------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#-2073708547],raw)
|--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(5500000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#-2073708547],raw)
|--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(5500000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#-2073708547],raw)
|-------E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[1500s],7899,1,7899,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000)))
|--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7000,100,7899))
|---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000)))
|----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
|-----T~PeriodicSamplesMapper(start=7000000, step=100000, end=7899000, window=Some(300000), functionId=Some(Rate), rawSource=false, offsetMs=None)
|------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000)))
|-------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#478255912],raw)
|--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(5800000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#478255912],raw)
|--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(5800000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#478255912],raw)
|-------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#-2073708547],raw)
|--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(5800000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#-2073708547],raw)
|--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(5800000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#-2073708547],raw)
|-------E~PromQlRemoteExec(PromQlQueryParams(bar{job="app"}[1200s],7899,1,7899,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000)))
|-E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#478255912],raw)
|--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(2000,100,10000))
|---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#478255912],raw)
|-E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#-2073708547],raw)
|--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7900,100,10000))
|---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#-2073708547],raw)
|----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
|-----T~PeriodicSamplesMapper(start=2000000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(1400000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#478255912],raw)
|-----T~PeriodicSamplesMapper(start=7900000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#-2073708547],raw)
|----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
|-----T~PeriodicSamplesMapper(start=2000000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(1400000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#478255912],raw)
|--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(2000,100,10000))
|---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#478255912],raw)
|-----T~PeriodicSamplesMapper(start=7900000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#-2073708547],raw)
|--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7900,100,10000))
|---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#-2073708547],raw)
|----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
|-----T~PeriodicSamplesMapper(start=2000000, step=100000, end=10000000, window=Some(300000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(1700000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#478255912],raw)
|-----T~PeriodicSamplesMapper(start=7900000, step=100000, end=10000000, window=Some(300000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(7600000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#-2073708547],raw)
|----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
|-----T~PeriodicSamplesMapper(start=2000000, step=100000, end=10000000, window=Some(300000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(1700000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#478255912],raw)""".stripMargin
|-----T~PeriodicSamplesMapper(start=7900000, step=100000, end=10000000, window=Some(300000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(7600000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#-2073708547],raw)""".stripMargin

val query2 = "sum(rate(test{job = \"app\"}[10m])) + sum(rate(bar{job = \"app\"}[5m]))"
val lp2 = Parser.queryRangeToLogicalPlan(query2, TimeStepParams(2000, stepSecs, 10000))
Expand Down
Loading

0 comments on commit 2b24b76

Please sign in to comment.