Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alextheimer committed Aug 29, 2023
1 parent 141889e commit fac0efb
Showing 1 changed file with 75 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import filodb.query.{BadQueryException, IntervalSelector, LabelCardinality, Plan
import filodb.query.exec._

// scalastyle:off line.size.limit
// scalastyle:off number.of.methods
class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationSpec {
private implicit val system: ActorSystem = ActorSystem()
private val node = TestProbe().ref
Expand Down Expand Up @@ -3530,8 +3531,81 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
for ((query, expected) <- queryExpectedPairs) {
val lp = Parser.queryRangeToLogicalPlan(query, timeParams)
val exec = planner.materialize(lp, qContext)
println(exec.printTree())
validatePlan(exec, expected)
}
}

it("should create more than one plan per partition when keys have different prefixes") {
val dataset = MetricsTestData.timeseriesDatasetMultipleShardKeys
val shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]] = filters => Seq(
// Selectors will always match four keys.
Seq(ColumnFilter("_ws_", Filter.Equals("foo_1")), ColumnFilter("_ns_", Filter.Equals("ns1"))),
Seq(ColumnFilter("_ws_", Filter.Equals("foo_1")), ColumnFilter("_ns_", Filter.Equals("ns2"))),
Seq(ColumnFilter("_ws_", Filter.Equals("bar_2")), ColumnFilter("_ns_", Filter.Equals("ns3"))),
Seq(ColumnFilter("_ws_", Filter.Equals("baz_2")), ColumnFilter("_ns_", Filter.Equals("ns4"))),
)
val partitionLocationProvider = new PartitionLocationProvider {
override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = {
// using the last char of the _ws_ label as the partition name
List(PartitionAssignment(routingKey("_ws_").last.toString, "dummy-endpoint", timeRange))
}

override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] =
getPartitions(nonMetricShardKeyFilters.map(filter => (filter.column, filter.filter.valuesStrings.head.toString)).toMap, timeRange)
}

val mppPlanner = new MultiPartitionPlanner(partitionLocationProvider, singlePartitionPlanner, "local", dataset, queryConfig)
val planner = new ShardKeyRegexPlanner(dataset, mppPlanner, shardKeyMatcher, partitionLocationProvider, queryConfig)

val timeParams = TimeStepParams(startSeconds, step, endSeconds)
val qContext = QueryContext(origQueryParams = queryParams,
plannerParams = PlannerParams(processMultiPartition = true))

val query ="""foo{_ws_="dummy", _ns_=~"dummy.*", bar="hello"}"""
val expected = """E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|-E~PromQlRemoteExec(PromQlQueryParams(foo{bar="hello",_ws_="bar_2",_ns_="ns3"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true), queryEndpoint=dummy-endpoint, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|-E~PromQlRemoteExec(PromQlQueryParams(foo{bar="hello",_ws_="baz_2",_ns_="ns4"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true), queryEndpoint=dummy-endpoint, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|-E~PromQlRemoteExec(PromQlQueryParams(foo{bar="hello",_ws_="foo_1",_ns_=~"ns1|ns2"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true), queryEndpoint=dummy-endpoint, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))""".stripMargin
val lp = Parser.queryRangeToLogicalPlan(query, timeParams)
val exec = planner.materialize(lp, qContext)
validatePlan(exec, expected)
}

it("should materialize a plan per shard-key when reduceShardKeyRegexFanout=false") {
val dataset = MetricsTestData.timeseriesDatasetMultipleShardKeys
val shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]] = filters => Seq(
// Selectors will always match four keys.
Seq(ColumnFilter("_ws_", Filter.Equals("foo_1")), ColumnFilter("_ns_", Filter.Equals("ns1"))),
Seq(ColumnFilter("_ws_", Filter.Equals("foo_1")), ColumnFilter("_ns_", Filter.Equals("ns2"))),
Seq(ColumnFilter("_ws_", Filter.Equals("bar_2")), ColumnFilter("_ns_", Filter.Equals("ns3"))),
Seq(ColumnFilter("_ws_", Filter.Equals("baz_2")), ColumnFilter("_ns_", Filter.Equals("ns4"))),
)
val partitionLocationProvider = new PartitionLocationProvider {
override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = {
// using the last char of the _ws_ label as the partition name
List(PartitionAssignment(routingKey("_ws_").last.toString, "dummy-endpoint", timeRange))
}

override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] =
getPartitions(nonMetricShardKeyFilters.map(filter => (filter.column, filter.filter.valuesStrings.head.toString)).toMap, timeRange)
}

val mppPlanner = new MultiPartitionPlanner(partitionLocationProvider, singlePartitionPlanner, "local", dataset, queryConfig)
val planner = new ShardKeyRegexPlanner(dataset, mppPlanner, shardKeyMatcher, partitionLocationProvider, queryConfig)

val timeParams = TimeStepParams(startSeconds, step, endSeconds)
val qContext = QueryContext(origQueryParams = queryParams,
plannerParams = PlannerParams(processMultiPartition = true, reduceShardKeyRegexFanout = false))

val query = """foo{_ws_="dummy", _ns_=~"dummy.*", bar="hello"}"""
val expected =
"""E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|-E~PromQlRemoteExec(PromQlQueryParams(foo{bar="hello",_ws_="foo_1",_ns_="ns1"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,false), queryEndpoint=dummy-endpoint, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|-E~PromQlRemoteExec(PromQlQueryParams(foo{bar="hello",_ws_="foo_1",_ns_="ns2"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,false), queryEndpoint=dummy-endpoint, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|-E~PromQlRemoteExec(PromQlQueryParams(foo{bar="hello",_ws_="bar_2",_ns_="ns3"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,false), queryEndpoint=dummy-endpoint, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|-E~PromQlRemoteExec(PromQlQueryParams(foo{bar="hello",_ws_="baz_2",_ns_="ns4"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,false), queryEndpoint=dummy-endpoint, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))""".stripMargin
val lp = Parser.queryRangeToLogicalPlan(query, timeParams)
val exec = planner.materialize(lp, qContext)
validatePlan(exec, expected)
}
}

0 comments on commit fac0efb

Please sign in to comment.