From 078c66f7af190bc10fa9c741fbe438a50aaf5a14 Mon Sep 17 00:00:00 2001 From: Sandeep Agarwalla Date: Tue, 10 Sep 2024 17:50:53 -0700 Subject: [PATCH] Adding by clause unit tests --- .../queryplanner/LogicalPlanParserSpec.scala | 61 ++++++++++++++++--- 1 file changed, 51 insertions(+), 10 deletions(-) diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala index c28e5d2332..e255c6ad1b 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala @@ -345,23 +345,34 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { ) } - it("LogicalPlan update for hierarchical aggregation queries with by clause") { - // CASE 1 - should update since the by clause labels are part of include tags + // common parameters val t = TimeStepParams(700, 1000, 10000) val nextLevelAggregatedMetricSuffix = "agg_2" val nextLevelAggregationTags = Set("job", "application", "instance", "version") - var query = "sum(bottomk(2, my_counter:::agg{job=\"spark\", application=\"filodb\"}) by (instance, version)) by (version)" - var lp = Parser.queryRangeToLogicalPlan(query, t) val params = HierarchicalQueryExperience(true, ":::", nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) + + // CASE 1 - Aggregate with by clause - should update the metric name as `by` clause labels are part of include tags + var query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) by (version, instance)" + var lp = Parser.queryRangeToLogicalPlan(query, t) var lpUpdated = lp.useHigherLevelAggregatedMetric(params) var filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.map( - filterSet => filterSet.filter( x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] .shouldEqual("my_counter:::agg_2") ) - // CASE 2 - should NOT update since the by clause labels are not part of include tags - query = "sum(bottomk(2, my_counter:::agg{job=\"spark\", application=\"filodb\"}) by (instance, version, id)) by (version)" + + // CASE 2 - should NOT update since bottomk aggregation operator is not allowed as of now + query = "sum(bottomk(2, my_counter:::agg{job=\"spark\", application=\"filodb\"}) by (instance, version)) by (version)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.map( + filterSet => filterSet.filter( x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_counter:::agg") + ) + // CASE 3 - should NOT update since the by clause labels are not part of include tags + query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) by (version, instance, id)" lp = Parser.queryRangeToLogicalPlan(query, t) lpUpdated = lp.useHigherLevelAggregatedMetric(params) filterGroups = getColumnFilterGroup(lpUpdated) @@ -369,14 +380,44 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] .shouldEqual("my_counter:::agg") ) - - query = "sum(my_gauge{job=\"spark\", application=\"filodb\"}) by (job, application) and on(job, application, mode) sum(my_counter{job=\"spark\", application=\"filodb\"}) by (job, application)" + // CASE 4 - should update since the by clause labels are part of include tags - binary join case + query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) by (job, application) and on(job, application) sum(my_counter:::agg{job=\"spark\", application=\"filodb\"}) by (job, application)" lp = Parser.queryRangeToLogicalPlan(query, t) lpUpdated = lp.useHigherLevelAggregatedMetric(params) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.map( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] - .endsWith(nextLevelAggregatedMetricSuffix).shouldEqual(false) + .endsWith(nextLevelAggregatedMetricSuffix).shouldEqual(true) + ) + + // CASE 5 - lhs should not be updated since it is not a aggregated metric - binary join case + query = "sum(my_gauge{job=\"spark\", application=\"filodb\"}) by (job, application) and on(job, application) sum(my_counter:::agg{job=\"spark\", application=\"filodb\"}) by (job, application)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_gauge") + ) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].rhs) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_counter:::agg_2") + ) + + // CASE 6 - rhs should not be updated since it has column filters which is not present in include tags + query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) by (job, application) and on(job, application) sum(my_counter:::agg{job=\"spark\", application=\"filodb\", id=\"1\"}) by (job, application)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_gauge:::agg_2") + ) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].rhs) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_counter:::agg") ) } }