From fac0efb4a123a5a2e6e215a5f340ab5de729aac0 Mon Sep 17 00:00:00 2001 From: Alex Theimer Date: Tue, 29 Aug 2023 01:33:29 -0700 Subject: [PATCH] add tests --- .../queryplanner/PlannerHierarchySpec.scala | 76 ++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala index f00bdfc0cf..60630cf578 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala @@ -19,6 +19,7 @@ import filodb.query.{BadQueryException, IntervalSelector, LabelCardinality, Plan import filodb.query.exec._ // scalastyle:off line.size.limit +// scalastyle:off number.of.methods class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationSpec { private implicit val system: ActorSystem = ActorSystem() private val node = TestProbe().ref @@ -3530,8 +3531,81 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS for ((query, expected) <- queryExpectedPairs) { val lp = Parser.queryRangeToLogicalPlan(query, timeParams) val exec = planner.materialize(lp, qContext) - println(exec.printTree()) validatePlan(exec, expected) } } + + it("should create more than one plan per partition when keys have different prefixes") { + val dataset = MetricsTestData.timeseriesDatasetMultipleShardKeys + val shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]] = filters => Seq( + // Selectors will always match four keys. + Seq(ColumnFilter("_ws_", Filter.Equals("foo_1")), ColumnFilter("_ns_", Filter.Equals("ns1"))), + Seq(ColumnFilter("_ws_", Filter.Equals("foo_1")), ColumnFilter("_ns_", Filter.Equals("ns2"))), + Seq(ColumnFilter("_ws_", Filter.Equals("bar_2")), ColumnFilter("_ns_", Filter.Equals("ns3"))), + Seq(ColumnFilter("_ws_", Filter.Equals("baz_2")), ColumnFilter("_ns_", Filter.Equals("ns4"))), + ) + val partitionLocationProvider = new PartitionLocationProvider { + override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = { + // using the last char of the _ws_ label as the partition name + List(PartitionAssignment(routingKey("_ws_").last.toString, "dummy-endpoint", timeRange)) + } + + override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = + getPartitions(nonMetricShardKeyFilters.map(filter => (filter.column, filter.filter.valuesStrings.head.toString)).toMap, timeRange) + } + + val mppPlanner = new MultiPartitionPlanner(partitionLocationProvider, singlePartitionPlanner, "local", dataset, queryConfig) + val planner = new ShardKeyRegexPlanner(dataset, mppPlanner, shardKeyMatcher, partitionLocationProvider, queryConfig) + + val timeParams = TimeStepParams(startSeconds, step, endSeconds) + val qContext = QueryContext(origQueryParams = queryParams, + plannerParams = PlannerParams(processMultiPartition = true)) + + val query ="""foo{_ws_="dummy", _ns_=~"dummy.*", bar="hello"}""" + val expected = """E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536))) + |-E~PromQlRemoteExec(PromQlQueryParams(foo{bar="hello",_ws_="bar_2",_ns_="ns3"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true), queryEndpoint=dummy-endpoint, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536))) + |-E~PromQlRemoteExec(PromQlQueryParams(foo{bar="hello",_ws_="baz_2",_ns_="ns4"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true), queryEndpoint=dummy-endpoint, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536))) + |-E~PromQlRemoteExec(PromQlQueryParams(foo{bar="hello",_ws_="foo_1",_ns_=~"ns1|ns2"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true), queryEndpoint=dummy-endpoint, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))""".stripMargin + val lp = Parser.queryRangeToLogicalPlan(query, timeParams) + val exec = planner.materialize(lp, qContext) + validatePlan(exec, expected) + } + + it("should materialize a plan per shard-key when reduceShardKeyRegexFanout=false") { + val dataset = MetricsTestData.timeseriesDatasetMultipleShardKeys + val shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]] = filters => Seq( + // Selectors will always match four keys. + Seq(ColumnFilter("_ws_", Filter.Equals("foo_1")), ColumnFilter("_ns_", Filter.Equals("ns1"))), + Seq(ColumnFilter("_ws_", Filter.Equals("foo_1")), ColumnFilter("_ns_", Filter.Equals("ns2"))), + Seq(ColumnFilter("_ws_", Filter.Equals("bar_2")), ColumnFilter("_ns_", Filter.Equals("ns3"))), + Seq(ColumnFilter("_ws_", Filter.Equals("baz_2")), ColumnFilter("_ns_", Filter.Equals("ns4"))), + ) + val partitionLocationProvider = new PartitionLocationProvider { + override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = { + // using the last char of the _ws_ label as the partition name + List(PartitionAssignment(routingKey("_ws_").last.toString, "dummy-endpoint", timeRange)) + } + + override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = + getPartitions(nonMetricShardKeyFilters.map(filter => (filter.column, filter.filter.valuesStrings.head.toString)).toMap, timeRange) + } + + val mppPlanner = new MultiPartitionPlanner(partitionLocationProvider, singlePartitionPlanner, "local", dataset, queryConfig) + val planner = new ShardKeyRegexPlanner(dataset, mppPlanner, shardKeyMatcher, partitionLocationProvider, queryConfig) + + val timeParams = TimeStepParams(startSeconds, step, endSeconds) + val qContext = QueryContext(origQueryParams = queryParams, + plannerParams = PlannerParams(processMultiPartition = true, reduceShardKeyRegexFanout = false)) + + val query = """foo{_ws_="dummy", _ns_=~"dummy.*", bar="hello"}""" + val expected = + """E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536))) + |-E~PromQlRemoteExec(PromQlQueryParams(foo{bar="hello",_ws_="foo_1",_ns_="ns1"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,false), queryEndpoint=dummy-endpoint, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536))) + |-E~PromQlRemoteExec(PromQlQueryParams(foo{bar="hello",_ws_="foo_1",_ns_="ns2"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,false), queryEndpoint=dummy-endpoint, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536))) + |-E~PromQlRemoteExec(PromQlQueryParams(foo{bar="hello",_ws_="bar_2",_ns_="ns3"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,false), queryEndpoint=dummy-endpoint, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536))) + |-E~PromQlRemoteExec(PromQlQueryParams(foo{bar="hello",_ws_="baz_2",_ns_="ns4"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,false), queryEndpoint=dummy-endpoint, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))""".stripMargin + val lp = Parser.queryRangeToLogicalPlan(query, timeParams) + val exec = planner.materialize(lp, qContext) + validatePlan(exec, expected) + } }