Skip to content

Commit

Permalink
fix unary expressions. (#1697)
Browse files Browse the repository at this point in the history
* fix unary expressions.

1. support queries with multiple unary sign such as -+--foo
2. fix rhs scalar unary expression. Basically, the right plan for this case should be
ScalarBinaryOperationExec instead of BinaryJoinExec.
---------

Co-authored-by: Yu Zhang <[email protected]>
  • Loading branch information
yu-shipit and Yu Zhang authored Dec 20, 2023
1 parent d419d6b commit 22e83d0
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,158 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS

private val queryParams = PromQlQueryParams("notUsedQuery", 100, 1, 1000)

it("Plan with unary expression should be equals to its binary counterpart.") {
val lp = Parser.queryRangeToLogicalPlan(
"""-foo{_ws_ = "demo", _ns_ = "localNs"} > -1""",
TimeStepParams(startSeconds, step, endSeconds), Antlr)
val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))

val lp2 = Parser.queryRangeToLogicalPlan(
"""(0 - foo{_ws_ = "demo", _ns_ = "localNs"}) > (0 - 1)""",
TimeStepParams(startSeconds, step, endSeconds), Antlr)
val execPlan2 = rootPlanner.materialize(lp2, QueryContext(origQueryParams = queryParams))

validatePlan(execPlan, execPlan2.printTree())
val expected =
"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192)))
|-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],raw)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Left(1.0)) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],raw)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Left(1.0)) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],raw)
|-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],downsample)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Left(1.0)) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],downsample)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Left(1.0)) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],downsample)""".stripMargin
validatePlan(execPlan, expected)
}

it("Plan with unary expression should be equals to its binary counterpart2.") {
val unaryExpressions = List("""-foo{_ws_ = "demo", _ns_ = "localNs"}""", """+foo{_ws_ = "demo", _ns_ = "localNs"}""",
"""-(foo{_ws_ = "demo", _ns_ = "localNs"} - bar{_ws_ = "demo", _ns_ = "localNs"})""",
"""+(foo{_ws_ = "demo", _ns_ = "localNs"} - bar{_ws_ = "demo", _ns_ = "localNs"})""")
val binaryExpressions = List("""(0 -foo{_ws_ = "demo", _ns_ = "localNs"})""", """(0 + foo{_ws_ = "demo", _ns_ = "localNs"})""",
"""(0 -(foo{_ws_ = "demo", _ns_ = "localNs"} - bar{_ws_ = "demo", _ns_ = "localNs"}))""",
"""(0 + (foo{_ws_ = "demo", _ns_ = "localNs"} - bar{_ws_ = "demo", _ns_ = "localNs"}))""")
unaryExpressions.zip(binaryExpressions).foreach(
pair => {
val lp1 = Parser.queryRangeToLogicalPlan(pair._1, TimeStepParams(startSeconds, step, endSeconds), Antlr)
val execPlan1 = rootPlanner.materialize(lp1, QueryContext(origQueryParams = queryParams))
val lp2 = Parser.queryRangeToLogicalPlan(pair._2, TimeStepParams(startSeconds, step, endSeconds), Antlr)
val execPlan2 = rootPlanner.materialize(lp2, QueryContext(origQueryParams = queryParams))
validatePlan(execPlan1, execPlan2.printTree())
}
)
}

it("Should be able to handle multiple unary signs.") {
val lp = Parser.queryRangeToLogicalPlan(
"""-+---+-foo{_ws_ = "demo", _ns_ = "localNs"} > ---+--+--1""",
TimeStepParams(startSeconds, step, endSeconds), Antlr)
val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
val expected =
"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192)))
|-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],raw)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Left(1.0)))))))))) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|-----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-----T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|-------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|--------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|--------T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|---------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|---------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----------T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],raw)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Left(1.0)))))))))) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|-----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-----T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|-------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|--------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|--------T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|---------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|---------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----------T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],raw)
|-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],downsample)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Left(1.0)))))))))) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|-----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-----T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|-------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|--------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|--------T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|---------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|---------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----------T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],downsample)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Left(1.0)))))))))) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|-----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-----T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|-------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|--------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|--------T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|---------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|---------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----------T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],downsample)""".stripMargin
validatePlan(execPlan, expected)
}

it("should generate plan for one namespace query across raw/downsample") {
val lp = Parser.queryRangeToLogicalPlan(
Expand Down
Loading

0 comments on commit 22e83d0

Please sign in to comment.