diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala index 91cac158f6..8477f5072e 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala @@ -5,7 +5,7 @@ import org.scalatest.matchers.should.Matchers import filodb.prometheus.ast.TimeStepParams import filodb.prometheus.parse.Parser import filodb.query.LogicalPlan.getColumnFilterGroup -import filodb.query.util.{ExcludeAggRule, IncludeAggRule} +import filodb.query.util.{ExcludeAggRule, HierarchicalQueryExperienceParams, IncludeAggRule} import filodb.query.{Aggregate, BinaryJoin, IntervalSelector, RawSeries, SeriesKeysByFilters} class LogicalPlanParserSpec extends AnyFunSpec with Matchers { @@ -304,8 +304,9 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 1 - BinaryJoin (lhs = Aggregate, rhs = Aggregate) - Both lhs and rhs should be updated val binaryJoinAggregationBothOptimization = "sum(metric1:::agg{aggTag=\"app\"}) + sum(metric2:::agg{aggTag=\"app\"})" var lp = Parser.queryRangeToLogicalPlan(binaryJoinAggregationBothOptimization, t) - val params = IncludeAggRule(":::", nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) - var lpUpdated = lp.useHigherLevelAggregatedMetric(params) + val includeAggRule = IncludeAggRule(nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) + val includeParams = HierarchicalQueryExperienceParams(":::", Map("agg" -> includeAggRule)) + var lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) lpUpdated.isInstanceOf[BinaryJoin] shouldEqual true lpUpdated.asInstanceOf[BinaryJoin].lhs.isInstanceOf[Aggregate] shouldEqual true lpUpdated.asInstanceOf[BinaryJoin].rhs.isInstanceOf[Aggregate] shouldEqual true @@ -317,7 +318,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 2 - BinaryJoin (lhs = Aggregate, rhs = Aggregate) - rhs should be updated val binaryJoinAggregationRHSOptimization = "sum(metric1:::agg{nonAggTag=\"abc\"}) + sum(metric2:::agg{aggTag=\"app\"})" lp = Parser.queryRangeToLogicalPlan(binaryJoinAggregationRHSOptimization, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].rhs) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -332,7 +333,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // not an aggregated metric, even if both the metrics qualify for aggregation val binaryJoinAggregationLHSOptimization = "sum(metric1:::agg{aggTag=\"abc\"}) + sum(metric2{aggTag=\"app\"})" lp = Parser.queryRangeToLogicalPlan(binaryJoinAggregationLHSOptimization, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].rhs) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -350,11 +351,12 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { val t = TimeStepParams(700, 1000, 10000) val nextLevelAggregatedMetricSuffix = "agg_2" val nextLevelAggregationTags = Set("aggTag", "aggTag2", "aggTag3", "aggTag4") - val params = IncludeAggRule(":::", nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) + val includeAggRule = IncludeAggRule(nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) + val includeParams = HierarchicalQueryExperienceParams(":::", Map("agg" -> includeAggRule)) // CASE 1 - Aggregate with by clause - should update the metric name as `by` clause labels are part of include tags var query = "sum(rate(my_counter:::agg{aggTag=\"spark\", aggTag2=\"app\"}[5m])) by (aggTag4, aggTag3)" var lp = Parser.queryRangeToLogicalPlan(query, t) - var lpUpdated = lp.useHigherLevelAggregatedMetric(params) + var lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) var filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -363,7 +365,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 2 - should NOT update since bottomk aggregation operator is not allowed as of now query = "sum(bottomk(2, my_counter:::agg{aggTag=\"spark\", aggTag2=\"filodb\"}) by (aggTag3, aggTag4)) by (aggTag4)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter( x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -372,7 +374,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 3 - should NOT update since the by clause labels are not part of include tags query = "sum(rate(my_counter:::agg{aggTag=\"spark\", aggTag2=\"app\"}[5m])) by (aggTag4, aggTag3, nonAggTag)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -381,7 +383,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 4 - should update since the by clause labels are part of include tags - binary join case query = "sum(my_gauge:::agg{aggTag=\"spark\", aggTag2=\"filodb\"}) by (aggTag, aggTag2) and on(aggTag, aggTag2) sum(my_counter:::agg{aggTag=\"spark\", aggTag2=\"filodb\"}) by (aggTag, aggTag2)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -390,7 +392,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 5 - lhs should not be updated since it does not match regex pattern - binary join case query = "sum(my_gauge{aggTag=\"spark\", aggTag2=\"filodb\"}) by (aggTag, aggTag2) and on(aggTag, aggTag2) sum(my_counter:::agg{aggTag=\"spark\", aggTag2=\"filodb\"}) by (aggTag, aggTag2)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -404,7 +406,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 6 - rhs should not be updated since it has column filters which is not present in include tags query = "sum(my_gauge:::agg{aggTag=\"spark\", aggTag2=\"filodb\"}) by (aggTag, aggTag2) and on(aggTag, aggTag2) sum(my_counter:::agg{aggTag=\"spark\", aggTag2=\"filodb\", nonAggTag=\"1\"}) by (aggTag, aggTag2)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -422,11 +424,12 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { val t = TimeStepParams(700, 1000, 10000) val nextLevelAggregatedMetricSuffix = "agg_2" val nextLevelAggregationExcludeTags = Set("excludeAggTag", "excludeAggTag2") - val params = ExcludeAggRule(":::", nextLevelAggregatedMetricSuffix, nextLevelAggregationExcludeTags) + val excludeAggRule = ExcludeAggRule(nextLevelAggregatedMetricSuffix, nextLevelAggregationExcludeTags) + val excludeParams = HierarchicalQueryExperienceParams(":::", Map("agg" -> excludeAggRule)) // CASE 1 - should update the metric name as `by` clause labels are not part of exclude tags var query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) by (host)" var lp = Parser.queryRangeToLogicalPlan(query, t) - var lpUpdated = lp.useHigherLevelAggregatedMetric(params) + var lpUpdated = lp.useHigherLevelAggregatedMetric(excludeParams) var filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -435,7 +438,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 2 - should NOT update the metric name as column filters are not part of exclude tags query = "sum(rate(my_counter:::agg{nonAggTag=\"spark\", application=\"app\", excludeAggTag2=\"2.0\"}[5m]))" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(excludeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -444,7 +447,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 2 - should NOT update since bottomk aggregation operator is not allowed as of now query = "sum(bottomk(2, my_counter:::agg{job=\"spark\", application=\"filodb\"})) by (host)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(excludeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -453,7 +456,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 3 - should NOT update since the by clause labels intersect with exclude tags query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) by (excludeAggTag2, excludeAggTag, id)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(excludeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -462,7 +465,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 4 - should update since the by clause labels are not part of exclude tags - binary join case query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) by (id, host) + sum(your_gauge:::agg{job=\"spark\", application=\"filodb\"}) by (id, host)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(excludeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -471,7 +474,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 5 - lhs should not be updated since it does not match regex pattern - binary join case query = "sum(my_gauge{job=\"spark\", application=\"filodb\"}) by (id, host) - sum(your_gauge:::agg{job=\"spark\", application=\"filodb\"}) by (id, host)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(excludeParams) filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -485,7 +488,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 6 - rhs should not be updated since it has column filters which are part of exclude tags query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) by (id, host) / sum(your_gauge:::agg{job=\"spark\", application=\"filodb\", excludeAggTag2=\"1\"}) by (id, host)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(excludeParams) filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -503,11 +506,12 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { val t = TimeStepParams(700, 1000, 10000) val nextLevelAggregatedMetricSuffix = "agg_2" val nextLevelAggregationExcludeTags = Set("excludeAggTag", "excludeAggTag2") - val params = ExcludeAggRule(":::", nextLevelAggregatedMetricSuffix, nextLevelAggregationExcludeTags) + val excludeAggRule = ExcludeAggRule(nextLevelAggregatedMetricSuffix, nextLevelAggregationExcludeTags) + val excludeParams = HierarchicalQueryExperienceParams(":::", Map("agg" -> excludeAggRule)) // CASE 1 - should update since the exclude tags are subset of the without clause labels var query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) without (excludeAggTag2, excludeAggTag)" var lp = Parser.queryRangeToLogicalPlan(query, t) - var lpUpdated = lp.useHigherLevelAggregatedMetric(params) + var lpUpdated = lp.useHigherLevelAggregatedMetric(excludeParams) var filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -516,7 +520,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 2 - should NOT update since bottomk aggregation operator is not allowed as of now query = "sum(bottomk(2, my_counter:::agg{job=\"spark\", application=\"filodb\"}) without (excludeAggTag, excludeAggTag2))" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(excludeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter( x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -525,7 +529,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 3 - should NOT update since the column filter label is part of exclude tags query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\", excludeAggTag2=\"2\"}[5m]))" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(excludeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -534,7 +538,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 4 - should update since the exclude tags are subset of the without clause labels query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) without (excludeAggTag2, excludeAggTag, id)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(excludeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -543,7 +547,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 5 - should not update since the exclude tags are not subset of the without clause labels query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) without (excludeAggTag2)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(excludeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -552,7 +556,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 6 - should update since the exclude tags are subset of without clause labels - binary join case query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) without (excludeAggTag2, excludeAggTag) and ignoring(excludeAggTag2, excludeAggTag) sum(my_counter:::agg{job=\"spark\", application=\"filodb\"}) without (excludeAggTag2, excludeAggTag)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(excludeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -561,7 +565,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 7 - lhs should not be updated since it does not match regex pattern - binary join case query = "sum(my_gauge{job=\"spark\", application=\"filodb\"}) without (excludeAggTag2, excludeAggTag) and ignoring(excludeAggTag2, excludeAggTag) sum(my_counter:::agg{job=\"spark\", application=\"filodb\"}) without (excludeAggTag2, excludeAggTag)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(excludeParams) filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -575,7 +579,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 8 - rhs should not be updated since it has column filters which is part of exclude tags query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) without (excludeAggTag2, excludeAggTag) and ignoring(excludeAggTag2, excludeAggTag) sum(my_counter:::agg{job=\"spark\", application=\"filodb\", excludeAggTag2=\"1\"}) without (excludeAggTag2, excludeAggTag)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(excludeParams) filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -593,11 +597,12 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { val t = TimeStepParams(700, 1000, 10000) val nextLevelAggregatedMetricSuffix = "agg_2" val nextLevelAggregationTags = Set("job", "application", "instance", "version") - val params = IncludeAggRule(":::", nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) + val includeAggRule = IncludeAggRule(nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) + val includeParams = HierarchicalQueryExperienceParams(":::", Map("agg" -> includeAggRule)) // All the cases should not be updated since without clause with include tags is not supported as of now var query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) without (version, instance)" var lp = Parser.queryRangeToLogicalPlan(query, t) - var lpUpdated = lp.useHigherLevelAggregatedMetric(params) + var lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) var filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -605,7 +610,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { ) query = "sum(bottomk(2, my_counter:::agg{job=\"spark\", application=\"filodb\"}) without (instance, version))" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -613,7 +618,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { ) query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) without (version, instance) + sum(your_gauge:::agg{job=\"spark\", application=\"filodb\"}) without (version, instance)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -621,7 +626,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { ) query = "sum(my_gauge{job=\"spark\", application=\"filodb\"}) without (version, instance) - sum(your_gauge:::agg{job=\"spark\", application=\"filodb\"}) without (version, instance)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -634,7 +639,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { ) query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) without (version, instance) / sum(your_gauge:::agg{job=\"spark\", application=\"filodb\", version=\"1\"}) without (version, instance)" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -652,11 +657,12 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { val t = TimeStepParams(700, 1000, 10000) val nextLevelAggregatedMetricSuffix = "agg_2" val nextLevelAggregationTags = Set("job", "application", "instance", "version") - val params = IncludeAggRule(":::", nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) + val includeAggRule = IncludeAggRule(nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) + val includeParams = HierarchicalQueryExperienceParams(":::", Map("agg" -> includeAggRule)) // CASE 1: Raw queries lp should not be updated directly var query = "my_counter:::agg{job=\"spark\", application=\"app\"}[5m]" var lp = Parser.queryToLogicalPlan(query, t.start, t.step) - var lpUpdated = lp.useHigherLevelAggregatedMetric(params) + var lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) var filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -665,7 +671,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 2: Simple range query without aggregates lp should not be updated directly query = "rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter( x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -678,12 +684,13 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { val t = TimeStepParams(700, 1000, 10000) val nextLevelAggregatedMetricSuffix = "agg_2" val nextLevelAggregationTags = Set("job", "application", "instance", "version") - val params = IncludeAggRule(":::", nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) + val includeAggRule = IncludeAggRule(nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) + val includeParams = HierarchicalQueryExperienceParams(":::", Map("agg" -> includeAggRule)) // CASE 1: count aggregate should not be allowed var query = "count(my_gauge:::agg{job=\"spark\", application=\"app\"})" var lp = Parser.queryToLogicalPlan(query, t.start, t.step) lp.isInstanceOf[Aggregate] shouldEqual true - var lpUpdated = lp.useHigherLevelAggregatedMetric(params) + var lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) var filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -693,7 +700,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m]))" lp = Parser.queryRangeToLogicalPlan(query, t) lp.isInstanceOf[Aggregate] shouldEqual true - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -703,7 +710,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { query = "avg(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m]))" lp = Parser.queryRangeToLogicalPlan(query, t) lp.isInstanceOf[Aggregate] shouldEqual true - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -713,7 +720,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { query = "min(my_gauge:::agg{job=\"spark\", application=\"app\"})" lp = Parser.queryRangeToLogicalPlan(query, t) lp.isInstanceOf[Aggregate] shouldEqual true - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -723,7 +730,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { query = "max(my_gauge:::agg{job=\"spark\", application=\"app\"})" lp = Parser.queryRangeToLogicalPlan(query, t) lp.isInstanceOf[Aggregate] shouldEqual true - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -736,11 +743,12 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { val t = TimeStepParams(700, 1000, 10000) val nextLevelAggregatedMetricSuffix = "agg_2" var nextLevelAggregationTags = Set("aggTag1", "aggTag2", "aggTag3") - val params = IncludeAggRule(":::", nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) + val includeAggRule = IncludeAggRule(nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) + val includeParams = HierarchicalQueryExperienceParams(":::", Map("agg" -> includeAggRule)) // CASE 1: should update the metric name as `by` clause labels are part of include tags var query = "sum(sum(my_counter:::agg{aggTag1=\"spark\", aggTag2=\"app\"}) by (aggTag1, aggTag2, aggTag3))" var lp = Parser.queryToLogicalPlan(query, t.start, t.step) - var lpUpdated = lp.useHigherLevelAggregatedMetric(params) + var lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) var filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -749,7 +757,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 2: should not update since count aggregate operator is not allowed query = "sum by (aggTag1, aggTag2) (count by (aggTag1, aggTag2) (my_gauge:::agg{aggTag1=\"a\",aggTag2=\"b\"}))" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -758,7 +766,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { // CASE 3: should update since min aggregate operator is allowed query = "sum by (aggTag1, aggTag2) (min by (aggTag1, aggTag2) (my_gauge:::agg{aggTag1=\"a\",aggTag2=\"b\"}))" lp = Parser.queryRangeToLogicalPlan(query, t) - lpUpdated = lp.useHigherLevelAggregatedMetric(params) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) filterGroups = getColumnFilterGroup(lpUpdated) filterGroups.foreach( filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] @@ -766,7 +774,8 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { ) // using excludeTags nextLevelAggregationTags = Set("excludeAggTag1", "excludeAggTag2") - val excludeParams = ExcludeAggRule(":::", nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) + val excludeAggRule = ExcludeAggRule(nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) + val excludeParams = HierarchicalQueryExperienceParams(":::", Map("agg" -> excludeAggRule)) // CASE 4: should update since excludeTags are not used query = "sum by (aggTag1, aggTag2) (sum by (aggTag1, aggTag2) (my_gauge:::agg{aggTag1=\"a\", aggTag2=\"b\"}))" lp = Parser.queryRangeToLogicalPlan(query, t) @@ -786,4 +795,169 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { .shouldEqual("my_gauge:::agg") ) } + + it("LogicalPlan update for BinaryJoin with multiple agg rules and suffixes") { + // common parameters + val t = TimeStepParams(700, 1000, 10000) + val includeAggRule = IncludeAggRule("suffix1_2", Set("includeTag1", "includeTag2", "includeTag3")) + val excludeAggRule = ExcludeAggRule("suffix2_2", Set("excludeTag1", "excludeTag2")) + // Query with multiple agg rules and suffixes + val includeParams = HierarchicalQueryExperienceParams(":::", + Map("suffix1" -> includeAggRule, "suffix2" -> excludeAggRule)) + // CASE 1 - should update - simple binary join with two different aggregated metrics and suffixes, both of which are satisfying the next level aggregation metric constraints + var query = "sum(my_gauge:::suffix1{includeTag1=\"spark\", includeTag2=\"filodb\"}) + sum(your_gauge:::suffix2{notExcludeTag1=\"spark\", notExcludeTag2=\"filodb\"})" + var lp = Parser.queryRangeToLogicalPlan(query, t) + var lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) + var filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.foreach( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .endsWith("_2").shouldEqual(true) + ) + // CASE 2 - should NOT update as rhs is using an exclude tag + query = "sum(my_gauge:::suffix1{includeTag1=\"spark\", includeTag2=\"filodb\"}) + sum(your_gauge:::suffix2{excludeTag1=\"spark\", notExcludeTag2=\"filodb\"})" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) + filterGroups = getColumnFilterGroup(lpUpdated) + var updatedMetricNamesSet = filterGroups.flatten.filter(x => x.column == "__name__") + .map(_.filter.valuesStrings.head.asInstanceOf[String]).toSet + updatedMetricNamesSet.contains("my_gauge:::suffix1_2").shouldEqual(true) + updatedMetricNamesSet.contains("your_gauge:::suffix2").shouldEqual(true) // not updated + // CASE 3 - should NOT update as lhs is not using an include tag + query = "sum(my_gauge:::suffix1{notIncludeTag1=\"spark\", includeTag2=\"filodb\"}) + sum(your_gauge:::suffix2{notExcludeTag1=\"spark\", notExcludeTag2=\"filodb\"})" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) + filterGroups = getColumnFilterGroup(lpUpdated) + updatedMetricNamesSet = filterGroups.flatten.filter(x => x.column == "__name__") + .map(_.filter.valuesStrings.head.asInstanceOf[String]).toSet + updatedMetricNamesSet.contains("my_gauge:::suffix1").shouldEqual(true) // not updated + updatedMetricNamesSet.contains("your_gauge:::suffix2_2").shouldEqual(true) + // CASE 4 - should NOT update as both lhs and rhs are not using appropriate tags for next level aggregation metric + query = "sum(my_gauge:::suffix1{includeTag1=\"spark\", notIncludeTag2=\"filodb\"}) + sum(your_gauge:::suffix2{notExcludeTag1=\"spark\", excludeTag2=\"filodb\"})" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) + filterGroups = getColumnFilterGroup(lpUpdated) + updatedMetricNamesSet = filterGroups.flatten.filter(x => x.column == "__name__") + .map(_.filter.valuesStrings.head.asInstanceOf[String]).toSet + updatedMetricNamesSet.contains("my_gauge:::suffix1").shouldEqual(true) // not updated + updatedMetricNamesSet.contains("your_gauge:::suffix2").shouldEqual(true) // not updated + } + + it("LogicalPlan update for BinaryJoin with multiple agg rules and suffixes with by clauses") { + // common parameters + val t = TimeStepParams(700, 1000, 10000) + val includeAggRule = IncludeAggRule("suffix1_2", Set("includeTag1", "includeTag2", "includeTag3")) + val excludeAggRule = ExcludeAggRule("suffix2_2", Set("excludeTag1", "excludeTag2")) + // Query with multiple agg rules and suffixes + val includeParams = HierarchicalQueryExperienceParams(":::", + Map("suffix1" -> includeAggRule, "suffix2" -> excludeAggRule)) + // CASE 1 - should update - both lhs and rhs are satisfying the next level aggregation metric constraints + var query = "sum(my_gauge:::suffix1{includeTag1=\"spark\", includeTag2=\"filodb\"}) by (includeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1=\"spark\", notExcludeTag2=\"filodb\"}) by (notExcludeTag1)" + var lp = Parser.queryRangeToLogicalPlan(query, t) + var lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) + var filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.foreach( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .endsWith("_2").shouldEqual(true) + ) + // CASE 2 - should NOT update as rhs is using an exclude tag + query = "sum(my_gauge:::suffix1{includeTag1=\"spark\", includeTag2=\"filodb\"}) by (includeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1=\"spark\", notExcludeTag2=\"filodb\"}) by (excludeTag1)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) + filterGroups = getColumnFilterGroup(lpUpdated) + var updatedMetricNamesSet = filterGroups.flatten.filter(x => x.column == "__name__") + .map(_.filter.valuesStrings.head.asInstanceOf[String]).toSet + updatedMetricNamesSet.contains("my_gauge:::suffix1_2").shouldEqual(true) + updatedMetricNamesSet.contains("your_gauge:::suffix2").shouldEqual(true) // not updated + // CASE 3 - should NOT update as lhs is not using an include tag + query = "sum(my_gauge:::suffix1{includeTag1=\"spark\", includeTag2=\"filodb\"}) by (notIncludeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1=\"spark\", notExcludeTag2=\"filodb\"}) by (notExludeTag1)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) + filterGroups = getColumnFilterGroup(lpUpdated) + updatedMetricNamesSet = filterGroups.flatten.filter(x => x.column == "__name__") + .map(_.filter.valuesStrings.head.asInstanceOf[String]).toSet + updatedMetricNamesSet.contains("my_gauge:::suffix1").shouldEqual(true) // not updated + updatedMetricNamesSet.contains("your_gauge:::suffix2_2").shouldEqual(true) + // CASE 4 - should NOT update as both lhs and rhs are not using appropriate tags for next level aggregation metric + query = "sum(my_gauge:::suffix1{includeTag1=\"spark\", includeTag2=\"filodb\"}) by (notIncludeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1=\"spark\", notExcludeTag2=\"filodb\"}) by (excludeTag1)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) + filterGroups = getColumnFilterGroup(lpUpdated) + updatedMetricNamesSet = filterGroups.flatten.filter(x => x.column == "__name__") + .map(_.filter.valuesStrings.head.asInstanceOf[String]).toSet + updatedMetricNamesSet.contains("my_gauge:::suffix1").shouldEqual(true) // not updated + updatedMetricNamesSet.contains("your_gauge:::suffix2").shouldEqual(true) // not updated + } + + it("LogicalPlan update for BinaryJoin with multiple agg rules and suffixes with by and without clauses") { + // common parameters + val t = TimeStepParams(700, 1000, 10000) + val includeAggRule = IncludeAggRule("suffix1_2", Set("includeTag1", "includeTag2", "includeTag3")) + val excludeAggRule = ExcludeAggRule("suffix2_2", Set("excludeTag1", "excludeTag2")) + // Query with multiple agg rules and suffixes + val includeParams = HierarchicalQueryExperienceParams(":::", + Map("suffix1" -> includeAggRule, "suffix2" -> excludeAggRule)) + // CASE 1 - should update - both lhs and rhs are satisfying the next level aggregation metric constraints + var query = "sum(my_gauge:::suffix1{includeTag1=\"spark\", includeTag2=\"filodb\"}) by (includeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1=\"spark\", notExcludeTag2=\"filodb\"}) without (excludeTag1, excludeTag2)" + var lp = Parser.queryRangeToLogicalPlan(query, t) + var lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) + var filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.foreach( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .endsWith("_2").shouldEqual(true) + ) + // CASE 2 - should NOT update as excludeRule tags is not subset of rhs without clause labels + query = "sum(my_gauge:::suffix1{includeTag1=\"spark\", includeTag2=\"filodb\"}) by (includeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1=\"spark\", notExcludeTag2=\"filodb\"}) without (excludeTag1)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) + filterGroups = getColumnFilterGroup(lpUpdated) + var updatedMetricNamesSet = filterGroups.flatten.filter(x => x.column == "__name__") + .map(_.filter.valuesStrings.head.asInstanceOf[String]).toSet + updatedMetricNamesSet.contains("my_gauge:::suffix1_2").shouldEqual(true) + updatedMetricNamesSet.contains("your_gauge:::suffix2").shouldEqual(true) // not updated + } + + it("LogicalPlan should not update when next level aggregation metric suffix is not matching agg rules") { + // common parameters + val t = TimeStepParams(700, 1000, 10000) + val includeAggRule = IncludeAggRule("suffix1_2", Set("includeTag1", "includeTag2", "includeTag3")) + val excludeAggRule = ExcludeAggRule("suffix2_2", Set("excludeTag1", "excludeTag2")) + // Query with multiple agg rules and suffixes + val includeParams = HierarchicalQueryExperienceParams(":::", + Map("suffix1" -> includeAggRule, "suffix2" -> excludeAggRule)) + // CASE 1 - should not update - both lhs and rhs metric are not using suffix passed for lp update + var query = "sum(my_gauge:::no_rule{includeTag1=\"spark\", includeTag2=\"filodb\"}) by (includeTag3, includeTag1)" + var lp = Parser.queryRangeToLogicalPlan(query, t) + var lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) + var filterGroups = getColumnFilterGroup(lpUpdated) + var updatedMetricNamesSet = filterGroups.flatten.filter(x => x.column == "__name__") + .map(_.filter.valuesStrings.head.asInstanceOf[String]).toSet + updatedMetricNamesSet.contains("my_gauge:::no_rule").shouldEqual(true)// not updated + // CASE 2 - should NOT update rhs as it is not using the given suffix + query = "sum(my_gauge:::suffix1{includeTag1=\"spark\", includeTag2=\"filodb\"}) by (includeTag3, includeTag1) + sum(your_gauge:::no_rule2{notExcludeTag1=\"spark\", notExcludeTag2=\"filodb\"}) by (notExcludeTag1)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) + filterGroups = getColumnFilterGroup(lpUpdated) + updatedMetricNamesSet = filterGroups.flatten.filter(x => x.column == "__name__") + .map(_.filter.valuesStrings.head.asInstanceOf[String]).toSet + updatedMetricNamesSet.contains("my_gauge:::suffix1_2").shouldEqual(true) + updatedMetricNamesSet.contains("your_gauge:::no_rule2").shouldEqual(true) // not updated + // CASE 3 - should NOT update lhs as it is not using the given suffix + query = "sum(my_gauge:::no_rule{includeTag1=\"spark\", includeTag2=\"filodb\"}) by (includeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1=\"spark\", notExcludeTag2=\"filodb\"}) by (notExcludeTag1)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) + filterGroups = getColumnFilterGroup(lpUpdated) + updatedMetricNamesSet = filterGroups.flatten.filter(x => x.column == "__name__") + .map(_.filter.valuesStrings.head.asInstanceOf[String]).toSet + updatedMetricNamesSet.contains("my_gauge:::no_rule").shouldEqual(true)// not updated + updatedMetricNamesSet.contains("your_gauge:::suffix2_2").shouldEqual(true) + // CASE 3 - should NOT update lhs and rhs as it is not using the given suffix + query = "sum(my_gauge:::no_rule{includeTag1=\"spark\", includeTag2=\"filodb\"}) by (includeTag3, includeTag1) + sum(your_gauge:::no_rule2{notExcludeTag1=\"spark\", notExcludeTag2=\"filodb\"}) by (notExcludeTag1)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(includeParams) + filterGroups = getColumnFilterGroup(lpUpdated) + updatedMetricNamesSet = filterGroups.flatten.filter(x => x.column == "__name__") + .map(_.filter.valuesStrings.head.asInstanceOf[String]).toSet + updatedMetricNamesSet.contains("my_gauge:::no_rule").shouldEqual(true) // not updated + updatedMetricNamesSet.contains("your_gauge:::no_rule2").shouldEqual(true) // not updated + } } diff --git a/query/src/main/scala/filodb/query/LogicalPlan.scala b/query/src/main/scala/filodb/query/LogicalPlan.scala index 738692d024..2e6c561494 100644 --- a/query/src/main/scala/filodb/query/LogicalPlan.scala +++ b/query/src/main/scala/filodb/query/LogicalPlan.scala @@ -3,7 +3,7 @@ package filodb.query import filodb.core.GlobalConfig import filodb.core.query.{ColumnFilter, RangeParams, RvRange} import filodb.core.query.Filter.Equals -import filodb.query.util.{AggRule, HierarchicalQueryExperience} +import filodb.query.util.{HierarchicalQueryExperience, HierarchicalQueryExperienceParams} //scalastyle:off number.of.types //scalastyle:off file.size.limit @@ -42,7 +42,7 @@ sealed trait LogicalPlan { * @param params AggRule object - contains details of the higher level aggregation rule and metric * @return Updated LogicalPlan if Applicable. Else return the same LogicalPlan */ - def useHigherLevelAggregatedMetric(params: AggRule): LogicalPlan = { + def useHigherLevelAggregatedMetric(params: HierarchicalQueryExperienceParams): LogicalPlan = { // For now, only PeriodicSeriesPlan and RawSeriesLikePlan are optimized for higher level aggregation this match { // We start with no parent plans from the root @@ -74,7 +74,7 @@ sealed trait RawSeriesLikePlan extends LogicalPlan { * Seq(BinaryJoin, Aggregate, PeriodicSeriesWithWindowing) * @return Updated logical plan if optimized for higher level aggregation. Else return the same logical plan */ - def useAggregatedMetricIfApplicable(params: AggRule, + def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): RawSeriesLikePlan /** @@ -121,7 +121,7 @@ sealed trait PeriodicSeriesPlan extends LogicalPlan { * Seq(BinaryJoin, Aggregate) * @return Updated logical plan if optimized for higher level aggregation. Else return the same logical plan */ - def useAggregatedMetricIfApplicable(params: AggRule, + def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan } @@ -180,7 +180,7 @@ case class RawSeries(rangeSelector: RangeSelector, * @param params AggRule object - contains details of the higher level aggregation rule and metric * @return Updated RawSeriesLikePlan if Applicable. Else return the same RawSeriesLikePlan */ - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): RawSeriesLikePlan = { // Example leaf periodic series plans which has access to raw series - PeriodicSeries, PeriodicSeriesWithWindowing, // ApplyInstantFunctionRaw. This can be configured as required. @@ -321,7 +321,7 @@ case class RawChunkMeta(rangeSelector: RangeSelector, this.copy(filters = updatedFilters) } - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { // RawChunkMeta queries are not optimized for higher level aggregation this @@ -350,11 +350,10 @@ case class PeriodicSeries(rawSeries: RawSeriesLikePlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(rawSeries = rawSeries.replaceRawSeriesFilters(filters)) - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { // Check 1: Check if the parent logical plans are allowed for hierarchical aggregation update - HierarchicalQueryExperience.isParentPeriodicSeriesPlanAllowed( - parentLogicalPlans) match { + HierarchicalQueryExperience.isParentPeriodicSeriesPlanAllowed(parentLogicalPlans) match { case true => this.copy(rawSeries = rawSeries.useAggregatedMetricIfApplicable(params, parentLogicalPlans :+ this.getClass.getSimpleName)) @@ -424,7 +423,7 @@ case class SubqueryWithWindowing( this.copy(innerPeriodicSeries = updatedInnerPeriodicSeries, functionArgs = updatedFunctionArgs) } - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { // recurse to the leaf level this.copy(innerPeriodicSeries = innerPeriodicSeries.useAggregatedMetricIfApplicable( @@ -469,7 +468,7 @@ case class TopLevelSubquery( this.copy(innerPeriodicSeries = updatedInnerPeriodicSeries) } - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { // recurse to the leaf level this.copy(innerPeriodicSeries = innerPeriodicSeries.useAggregatedMetricIfApplicable( @@ -505,7 +504,7 @@ case class PeriodicSeriesWithWindowing(series: RawSeriesLikePlan, series = series.replaceRawSeriesFilters(filters), functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan])) - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { // Checks: // 1. Check if the range function is allowed @@ -569,15 +568,18 @@ case class Aggregate(operator: AggregationOperator, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors = vectors.replacePeriodicSeriesFilters(filters)) - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { - HierarchicalQueryExperience.checkAggregateQueryEligibleForHigherLevelAggregatedMetric( - params, operator, clauseOpt) match { - case true => - this.copy(vectors = vectors.useAggregatedMetricIfApplicable( - params, parentLogicalPlans :+ this.getClass.getSimpleName)) - case false => - this + // Modify the map to retain all the AggRules which satisfies the current Aggregate clause labels. + val updatedMap = params.aggRules.filter(x => HierarchicalQueryExperience + .checkAggregateQueryEligibleForHigherLevelAggregatedMetric(x._2, operator, clauseOpt)) + if (updatedMap.isEmpty) { + // none of the aggregation rules matched with the + this + } else { + val updatedParams = params.copy(aggRules = updatedMap) + this.copy(vectors = vectors.useAggregatedMetricIfApplicable( + updatedParams, parentLogicalPlans :+ this.getClass.getSimpleName)) } } } @@ -611,7 +613,7 @@ case class BinaryJoin(lhs: PeriodicSeriesPlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(lhs = lhs.replacePeriodicSeriesFilters(filters), rhs = rhs.replacePeriodicSeriesFilters(filters)) - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { // No special handling for BinaryJoin. Just pass the call to lhs and rhs recursively this.copy( @@ -639,7 +641,7 @@ case class ScalarVectorBinaryOperation(operator: BinaryOperator, this.copy(vector = vector.replacePeriodicSeriesFilters(filters), scalarArg = scalarArg.replacePeriodicSeriesFilters(filters).asInstanceOf[ScalarPlan]) - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { // No special handling for ScalarVectorBinaryOperation. Just pass the call to vector and and scalar plan recursively val parentLogicalPlansUpdated = parentLogicalPlans :+ this.getClass.getSimpleName @@ -668,7 +670,7 @@ case class ApplyInstantFunction(vectors: PeriodicSeriesPlan, vectors = vectors.replacePeriodicSeriesFilters(filters), functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan])) - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { // No special handling for ApplyInstantFunction. Just pass the call to vectors and functionArgs recursively val parentLogicalPlansUpdated = parentLogicalPlans :+ this.getClass.getSimpleName @@ -693,7 +695,7 @@ case class ApplyInstantFunctionRaw(vectors: RawSeries, vectors = vectors.replaceRawSeriesFilters(newFilters).asInstanceOf[RawSeries], functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(newFilters).asInstanceOf[FunctionArgsPlan])) - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): RawSeriesLikePlan = { val parentLogicalPlansUpdated = parentLogicalPlans :+ this.getClass.getSimpleName this.copy( @@ -721,7 +723,7 @@ case class ApplyMiscellaneousFunction(vectors: PeriodicSeriesPlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors = vectors.replacePeriodicSeriesFilters(filters)) - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { this.copy(vectors = vectors.useAggregatedMetricIfApplicable( params, parentLogicalPlans :+ this.getClass.getSimpleName)) @@ -740,7 +742,7 @@ case class ApplySortFunction(vectors: PeriodicSeriesPlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors = vectors.replacePeriodicSeriesFilters(filters)) - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { this.copy(vectors = vectors.useAggregatedMetricIfApplicable( params, parentLogicalPlans :+ this.getClass.getSimpleName)) @@ -776,7 +778,7 @@ final case class ScalarVaryingDoublePlan(vectors: PeriodicSeriesPlan, vectors = vectors.replacePeriodicSeriesFilters(filters), functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan])) - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { val parentLogicalPlansUpdated = parentLogicalPlans :+ this.getClass.getSimpleName this.copy( @@ -799,7 +801,7 @@ final case class ScalarTimeBasedPlan(function: ScalarFunctionId, rangeParams: Ra override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this // No Filter // No optimization for the scalar plan without raw or periodic series - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = this } @@ -817,7 +819,7 @@ final case class ScalarFixedDoublePlan(scalar: Double, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this // No optimization for the scalar plan without raw or periodic series - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = this } @@ -835,7 +837,7 @@ final case class VectorPlan(scalars: ScalarPlan) extends PeriodicSeriesPlan with override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(scalars = scalars.replacePeriodicSeriesFilters(filters).asInstanceOf[ScalarPlan]) - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { this.copy( scalars = scalars.useAggregatedMetricIfApplicable( @@ -862,7 +864,7 @@ case class ScalarBinaryOperation(operator: BinaryOperator, this.copy(lhs = updatedLhs, rhs = updatedRhs) } - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { val parentLogicalPlansUpdated = parentLogicalPlans :+ this.getClass.getSimpleName val updatedLhs = if (lhs.isRight) Right(lhs.right.get.useAggregatedMetricIfApplicable(params, @@ -889,7 +891,7 @@ case class ApplyAbsentFunction(vectors: PeriodicSeriesPlan, this.copy(columnFilters = LogicalPlan.overrideColumnFilters(columnFilters, filters), vectors = vectors.replacePeriodicSeriesFilters(filters)) - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { this.copy(vectors = vectors.useAggregatedMetricIfApplicable(params, parentLogicalPlans :+ this.getClass.getSimpleName)) @@ -911,7 +913,7 @@ case class ApplyLimitFunction(vectors: PeriodicSeriesPlan, this.copy(columnFilters = LogicalPlan.overrideColumnFilters(columnFilters, filters), vectors = vectors.replacePeriodicSeriesFilters(filters)) - override def useAggregatedMetricIfApplicable(params: AggRule, + override def useAggregatedMetricIfApplicable(params: HierarchicalQueryExperienceParams, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { this.copy(vectors = vectors.useAggregatedMetricIfApplicable( params, parentLogicalPlans :+ this.getClass.getSimpleName)) diff --git a/query/src/main/scala/filodb/query/util/HierarchicalQueryExperience.scala b/query/src/main/scala/filodb/query/util/HierarchicalQueryExperience.scala index c892560c78..5b089fa061 100644 --- a/query/src/main/scala/filodb/query/util/HierarchicalQueryExperience.scala +++ b/query/src/main/scala/filodb/query/util/HierarchicalQueryExperience.scala @@ -11,13 +11,19 @@ import filodb.query.{AggregateClause, AggregationOperator, LogicalPlan, TsCardin /** * Aggregation rule definition. Contains the following information: - * 1. aggregation metric regex to be matched - * 2. metric suffix for the given aggregation rule - * 3. include/exclude tags for the given aggregation rule + * 1. aggregation metricDelimiter to be matched + * 2. map of current aggregation metric suffix -> nextLevelAggregation's AggRule to be used + * For example: agg -> AggRule { metricSuffix = agg_2, tags = Set("tag1", "tag2") } */ -sealed trait AggRule { +case class HierarchicalQueryExperienceParams(metricDelimiter: String, + aggRules: Map[String, AggRule]) { } - val metricRegex: String +/** + * Aggregation rule definition. Contains the following information: + * 1. metric suffix for the given aggregation rule + * 2. include/exclude tags for the given aggregation rule + */ +sealed trait AggRule { val metricSuffix: String @@ -25,7 +31,11 @@ sealed trait AggRule { def isHigherLevelAggregationApplicable(shardKeyColumns: Set[String], filterTags: Seq[String]): Boolean } -case class IncludeAggRule(metricRegex: String, metricSuffix: String, tags: Set[String]) extends AggRule { +/** + * @param metricSuffix - String - Metric suffix for the given aggregation rule + * @param tags - Set[String] - Include tags as specified in the aggregation rule + */ +case class IncludeAggRule(metricSuffix: String, tags: Set[String]) extends AggRule { /** * Checks if the higher level aggregation is applicable with IncludeTags. @@ -33,7 +43,6 @@ case class IncludeAggRule(metricRegex: String, metricSuffix: String, tags: Set[S * @param shardKeyColumns - Seq[String] - List of shard key columns. These columns are not part of check. This * include tags which are compulsory for the query like _metric_, _ws_, _ns_. * @param filterTags - Seq[String] - List of filter tags/labels in the query or in the aggregation clause - * @param includeTags - Set[String] - Include tags as specified in the aggregation rule * @return - Boolean */ override def isHigherLevelAggregationApplicable(shardKeyColumns: Set[String], filterTags: Seq[String]): Boolean = { @@ -41,7 +50,11 @@ case class IncludeAggRule(metricRegex: String, metricSuffix: String, tags: Set[S } } -case class ExcludeAggRule(metricRegex: String, metricSuffix: String, tags: Set[String]) extends AggRule { +/** + * @param metricSuffix - String - Metric suffix for the given aggregation rule + * @param tags - Set[String] - Exclude tags as specified in the aggregation rule + */ +case class ExcludeAggRule(metricSuffix: String, tags: Set[String]) extends AggRule { /** * Checks if the higher level aggregation is applicable with ExcludeTags. Here we need to check if the column filter @@ -50,7 +63,6 @@ case class ExcludeAggRule(metricRegex: String, metricSuffix: String, tags: Set[S * @param shardKeyColumns - Seq[String] - List of shard key columns. These columns are not part of check. This * include tags which are compulsory for the query like _metric_, _ws_, _ns_. * @param filterTags - Seq[String] - List of filter tags/labels in the query or in the aggregation clause - * @param excludeTags - Set[String] - Exclude tags as specified in the aggregation rule * @return - Boolean */ override def isHigherLevelAggregationApplicable(shardKeyColumns: Set[String], filterTags: Seq[String]): Boolean = { @@ -60,38 +72,39 @@ case class ExcludeAggRule(metricRegex: String, metricSuffix: String, tags: Set[S object HierarchicalQueryExperience extends StrictLogging { - val hierarchicalQueryOptimizedCounter = Kamon.counter("hierarchical-query-plans-optimized") + private val hierarchicalQueryOptimizedCounter = Kamon.counter("hierarchical-query-plans-optimized") // Get the shard key columns from the dataset options along with all the metric labels used - lazy val shardKeyColumnsOption: Option[Set[String]] = GlobalConfig.datasetOptions match { + private lazy val shardKeyColumnsOption: Option[Set[String]] = GlobalConfig.datasetOptions match { case Some(datasetOptions) => Some((datasetOptions.shardKeyColumns ++ Seq( datasetOptions.metricColumn, GlobalConfig.PromMetricLabel)).toSet) case None => None } // Get the allowed aggregation operators from the hierarchical config - lazy val allowedAggregationOperators: Option[Set[String]] = GlobalConfig.hierarchicalConfig match { + private lazy val allowedAggregationOperators: Option[Set[String]] = GlobalConfig.hierarchicalConfig match { case Some(hierarchicalConfig) => Some(hierarchicalConfig.getStringList("allowed-aggregation-operators").asScala.toSet) case None => None } // Get the allowed range functions from the hierarchical config - lazy val allowedRangeFunctions: Option[Set[String]] = GlobalConfig.hierarchicalConfig match { + private lazy val allowedRangeFunctions: Option[Set[String]] = GlobalConfig.hierarchicalConfig match { case Some(hierarchicalConfig) => Some(hierarchicalConfig.getStringList("allowed-range-functions").asScala.toSet) case None => None } // Get the allowed periodic series plans which have access to RawSeries from the hierarchical config - lazy val allowedPeriodicSeriesPlansWithRawSeries: Option[Set[String]] = GlobalConfig.hierarchicalConfig match { + private lazy val allowedPeriodicSeriesPlansWithRawSeries: Option[Set[String]] = + GlobalConfig.hierarchicalConfig match { case Some(hierarchicalConfig) => Some(hierarchicalConfig.getStringList("allowed-periodic-series-plans-with-raw-series").asScala.toSet) case None => None } // Get the allowed parent logical plans for optimization from the hierarchical config - lazy val allowedLogicalPlansForOptimization: Option[Set[String]] = GlobalConfig.hierarchicalConfig match { + private lazy val allowedLogicalPlansForOptimization: Option[Set[String]] = GlobalConfig.hierarchicalConfig match { case Some(hierarchicalConfig) => Some(hierarchicalConfig.getStringList("allowed-parent-logical-plans").asScala.toSet) case None => None @@ -136,9 +149,8 @@ object HierarchicalQueryExperience extends StrictLogging { } /** Checks if the higher level aggregation is applicable for the given Include/Exclude tags. - * @param isInclude - Boolean + * @param params - AggRule - Include or Exclude AggRule * @param filterTags - Seq[String] - List of filter tags/labels in the query or in the aggregation clause - * @param tags - Set[String] - Include or Exclude tags as specified in the aggregation rule * @return - Boolean */ def isHigherLevelAggregationApplicable(params: AggRule, @@ -153,24 +165,31 @@ object HierarchicalQueryExperience extends StrictLogging { } /** Returns the next level aggregated metric name. Example - * metricRegex = ::: + * metricDelimiter = ::: * metricSuffix = agg_2 * Existing metric name - metric1:::agg * After update - metric1:::agg -> metric1:::agg_2 + * @param metricName - String - Metric ColumnFilter tag/label + * @param metricDelimiter - HierarchicalQueryExperience - Contains + * @param metricSuffix - Seq[ColumnFilter] - label filters of the query/lp + * @return - Option[String] - Next level aggregated metric name + */ + def getNextLevelAggregatedMetricName(metricName : String, metricDelimiter: String, metricSuffix: String): String = { + metricName.replaceFirst(metricDelimiter + ".*", metricDelimiter + metricSuffix) + } + + /** Gets the current metric name from the given metricColumnFilter and filters + * * @param metricColumnFilter - String - Metric ColumnFilter tag/label - * @param params - HierarchicalQueryExperience - Contains - * @param filters - Seq[ColumnFilter] - label filters of the query/lp + * @param filters - Seq[ColumnFilter] - label filters of the query/lp * @return - Option[String] - Next level aggregated metric name */ - def getNextLevelAggregatedMetricName(metricColumnFilter: String, params: AggRule, - filters: Seq[ColumnFilter]): Option[String] = { + def getMetricName(metricColumnFilter: String, filters: Seq[ColumnFilter]): Option[String] = { // Get the metric name from the filters val metricNameSeq = LogicalPlan.getColumnValues(filters, metricColumnFilter) metricNameSeq match { case Seq() => None - case _ => Some(metricNameSeq.head.replaceFirst( - params.metricRegex + ".*", - params.metricRegex + params.metricSuffix)) + case _ => Some(metricNameSeq.head) } } @@ -219,31 +238,28 @@ object HierarchicalQueryExperience extends StrictLogging { /** * Updates the metric column filter if higher level aggregation is applicable - * - * @param isInclude - Boolean - Tells if the given tags are IncludeTags or ExcludeTags - * @param metricSuffix - String - Metric Suffix of the next aggregation level + * @param params - HierarchicalQueryExperienceParams - Contains metricDelimiter and aggRules * @param filters - Seq[ColumnFilter] - label filters of the query/lp - * @param tags - Include or Exclude tags as specified in the aggregation rule * @return - Seq[ColumnFilter] - Updated filters */ - def upsertMetricColumnFilterIfHigherLevelAggregationApplicable(params: AggRule, + def upsertMetricColumnFilterIfHigherLevelAggregationApplicable(params: HierarchicalQueryExperienceParams, filters: Seq[ColumnFilter]): Seq[ColumnFilter] = { val filterTags = filters.map(x => x.column) - if (isHigherLevelAggregationApplicable(params, filterTags)) { - val metricColumnFilter = getMetricColumnFilterTag(filterTags, GlobalConfig.datasetOptions.get.metricColumn) - val updatedMetricName = getNextLevelAggregatedMetricName(metricColumnFilter, params, filters) - updatedMetricName match { - case Some(metricName) => - // Checking if the metric actually ends with the next level aggregation metricSuffix. - // If so, update the filters and emit metric - // else, return the filters as is - metricName.endsWith(params.metricSuffix) match { - case true => - val updatedFilters = upsertFilters(filters, Seq(ColumnFilter(metricColumnFilter, Equals(metricName)))) - logger.info(s"[HierarchicalQueryExperience] Query optimized with filters: ${updatedFilters.toString()}") - incrementHierarcicalQueryOptimizedCounter(updatedFilters) - updatedFilters - case false => filters + val metricColumnFilter = getMetricColumnFilterTag(filterTags, GlobalConfig.datasetOptions.get.metricColumn) + val currentMetricName = getMetricName(metricColumnFilter, filters) + if (currentMetricName.isDefined) { + params.aggRules.find( x => currentMetricName.get.endsWith(x._1)) match { + case Some(aggRule) => + if (isHigherLevelAggregationApplicable(aggRule._2, filterTags)) { + val updatedMetricName = getNextLevelAggregatedMetricName( + currentMetricName.get, params.metricDelimiter, aggRule._2.metricSuffix) + val updatedFilters = upsertFilters( + filters, Seq(ColumnFilter(metricColumnFilter, Equals(updatedMetricName)))) + logger.info(s"[HierarchicalQueryExperience] Query optimized with filters: ${updatedFilters.toString()}") + incrementHierarchicalQueryOptimizedCounter(updatedFilters) + updatedFilters + } else { + filters } case None => filters } @@ -254,9 +270,9 @@ object HierarchicalQueryExperience extends StrictLogging { /** * Track the queries optimized by workspace and namespace - * @param filters + * @param filters - Seq[ColumnFilter] - label filters of the query/lp */ - private def incrementHierarcicalQueryOptimizedCounter(filters: Seq[ColumnFilter]): Unit = { + private def incrementHierarchicalQueryOptimizedCounter(filters: Seq[ColumnFilter]): Unit = { // track query optimized per workspace and namespace in the counter val metric_ws = LogicalPlan.getColumnValues(filters, TsCardinalities.LABEL_WORKSPACE) match { case Seq() => "" @@ -277,8 +293,9 @@ object HierarchicalQueryExperience extends StrictLogging { * Check 1: Check if the aggregation operator is enabled * Check 2: Check if the `by` and `without` clause labels satisfy the include/exclude tag constraints * - * @param isInclude true if includeTags are specified, false if excludeTags are specified - * @param tags higher aggregation rule tags/labels as defined in aggregation rule + * @param params higher level aggregation rule + * @param operator Aggregation operator like sum, avg, min, max, count + * @param clauseOpt AggregateClause - by or without clause * @return true if the current aggregate query can be optimized, false otherwise */ def checkAggregateQueryEligibleForHigherLevelAggregatedMetric(params: AggRule, @@ -295,7 +312,7 @@ object HierarchicalQueryExperience extends StrictLogging { true } else { - // can't be optimized further as by labels not present in the higher level metric's include tags + // can't be optimized further as by labels not present in the higher level metric include tags false } case AggregateClause.ClauseType.Without => @@ -308,10 +325,10 @@ object HierarchicalQueryExperience extends StrictLogging { // the includeTags. This requires the knowledge of all the tags/labels which are being published // for a metric. This info is not available during planning and hence we can't optimize this scenario. params match { - case IncludeAggRule(_, _, _) => + case IncludeAggRule( _, _) => // can't optimize this scenario as we don't have the required info at the planning stage false - case ExcludeAggRule(_, _, excludeTags) => + case ExcludeAggRule(_, excludeTags) => if (excludeTags.subsetOf(clause.labels.toSet)) { true } else { false } } diff --git a/query/src/test/scala/filodb/query/util/HierarchicalQueryExperienceSpec.scala b/query/src/test/scala/filodb/query/util/HierarchicalQueryExperienceSpec.scala index e8c6b2d100..72cb4bd36b 100644 --- a/query/src/test/scala/filodb/query/util/HierarchicalQueryExperienceSpec.scala +++ b/query/src/test/scala/filodb/query/util/HierarchicalQueryExperienceSpec.scala @@ -19,26 +19,15 @@ class HierarchicalQueryExperienceSpec extends AnyFunSpec with Matchers { it("getNextLevelAggregatedMetricName should return expected metric name") { - val params = IncludeAggRule(":::", "agg_2", Set("job", "instance")) + val params = IncludeAggRule("agg_2", Set("job", "instance")) // Case 1: Should not update if metric doesn't have the aggregated metric identifier - HierarchicalQueryExperience.getNextLevelAggregatedMetricName("__name__", params, - Seq(ColumnFilter("__name__", Equals("metric1")), ColumnFilter("job", Equals("spark")))) shouldEqual Some("metric1") + HierarchicalQueryExperience.getNextLevelAggregatedMetricName( + "metric1", ":::", params.metricSuffix) shouldEqual "metric1" // Case 2: Should update if metric has the aggregated metric identifier - HierarchicalQueryExperience.getNextLevelAggregatedMetricName("__name__", params, - Seq(ColumnFilter("__name__", Equals("metric1:::agg")), ColumnFilter("job", Equals("spark")))) shouldEqual - Some("metric1:::agg_2") - - // Case 3: Should not update if metricColumnFilter and column filters don't match - HierarchicalQueryExperience.getNextLevelAggregatedMetricName("_metric_", params, - Seq(ColumnFilter("__name__", Equals("metric1:::agg")), ColumnFilter("job", Equals("spark")))) shouldEqual - None - - // Case 4: Similar to case 1 but with a different metric identifier - HierarchicalQueryExperience.getNextLevelAggregatedMetricName("_metric_", params, - Seq(ColumnFilter("_metric_", Equals("metric1:::agg")), ColumnFilter("job", Equals("spark")))) shouldEqual - Some("metric1:::agg_2") + HierarchicalQueryExperience.getNextLevelAggregatedMetricName( + "metric1:::agg", ":::", params.metricSuffix) shouldEqual "metric1:::agg_2" } it("isParentPeriodicSeriesPlanAllowedForRawSeriesUpdateForHigherLevelAggregatedMetric return expected values") { @@ -71,38 +60,39 @@ class HierarchicalQueryExperienceSpec extends AnyFunSpec with Matchers { it("should check if higher level aggregation is applicable with IncludeTags") { HierarchicalQueryExperience.isHigherLevelAggregationApplicable( - IncludeAggRule(":::", "agg_2", Set("tag1", "tag2")), Seq("tag1", "tag2", "_ws_", "_ns_", "_metric_")) shouldEqual true + IncludeAggRule("agg_2", Set("tag1", "tag2")), Seq("tag1", "tag2", "_ws_", "_ns_", "_metric_")) shouldEqual true HierarchicalQueryExperience.isHigherLevelAggregationApplicable( - IncludeAggRule(":::", "agg_2", Set("tag1", "tag2", "tag3")), Seq("tag1", "tag2", "_ws_", "_ns_", "__name__")) shouldEqual true + IncludeAggRule("agg_2", Set("tag1", "tag2", "tag3")), Seq("tag1", "tag2", "_ws_", "_ns_", "__name__")) shouldEqual true HierarchicalQueryExperience.isHigherLevelAggregationApplicable( - IncludeAggRule(":::", "agg_2", Set("tag1", "tag2", "tag3")), Seq("tag3", "tag4", "_ws_", "_ns_", "__name__")) shouldEqual false + IncludeAggRule("agg_2", Set("tag1", "tag2", "tag3")), Seq("tag3", "tag4", "_ws_", "_ns_", "__name__")) shouldEqual false } it("should check if higher level aggregation is applicable with ExcludeTags") { HierarchicalQueryExperience.isHigherLevelAggregationApplicable( - ExcludeAggRule(":::", "agg_2", Set("tag1", "tag2")),Seq("tag1", "tag2", "_ws_", "_ns_", "_metric_")) shouldEqual false + ExcludeAggRule("agg_2", Set("tag1", "tag2")),Seq("tag1", "tag2", "_ws_", "_ns_", "_metric_")) shouldEqual false HierarchicalQueryExperience.isHigherLevelAggregationApplicable( - ExcludeAggRule(":::", "agg_2", Set("tag1", "tag3")),Seq("tag1", "tag2", "_ws_", "_ns_", "_metric_")) shouldEqual false + ExcludeAggRule("agg_2", Set("tag1", "tag3")),Seq("tag1", "tag2", "_ws_", "_ns_", "_metric_")) shouldEqual false HierarchicalQueryExperience.isHigherLevelAggregationApplicable( - ExcludeAggRule(":::", "agg_2", Set("tag1", "tag2")),Seq("tag1", "tag2", "_ws_", "_ns_", "_metric_")) shouldEqual false + ExcludeAggRule("agg_2", Set("tag1", "tag2")),Seq("tag1", "tag2", "_ws_", "_ns_", "_metric_")) shouldEqual false HierarchicalQueryExperience.isHigherLevelAggregationApplicable( - ExcludeAggRule(":::", "agg_2", Set("tag3", "tag4")), Seq("tag1", "tag2", "_ws_", "_ns_", "_metric_")) shouldEqual true + ExcludeAggRule("agg_2", Set("tag3", "tag4")), Seq("tag1", "tag2", "_ws_", "_ns_", "_metric_")) shouldEqual true } it("checkAggregateQueryEligibleForHigherLevelAggregatedMetric should increment counter if metric updated") { - val excludeParams = ExcludeAggRule(":::", "agg_2", Set("notAggTag1", "notAggTag2")) + val excludeRule = ExcludeAggRule("agg_2", Set("notAggTag1", "notAggTag2")) + val params = HierarchicalQueryExperienceParams(":::", Map("agg" -> excludeRule)) Kamon.init() var counter = Kamon.counter("hierarchical-query-plans-optimized") // CASE 1: Should update if metric have the aggregated metric identifier counter.withTag("metric_ws", "testws").withTag("metric_ns", "testns").value shouldEqual 0 var updatedFilters = HierarchicalQueryExperience.upsertMetricColumnFilterIfHigherLevelAggregationApplicable( - excludeParams, Seq( + params, Seq( ColumnFilter("__name__", Equals("metric1:::agg")), ColumnFilter("_ws_", Equals("testws")), ColumnFilter("_ns_", Equals("testns")), @@ -117,7 +107,7 @@ class HierarchicalQueryExperienceSpec extends AnyFunSpec with Matchers { counter = Kamon.counter("hierarchical-query-plans-optimized") counter.withTag("metric_ws", "testws").withTag("metric_ns", "testns").value shouldEqual 0 updatedFilters = HierarchicalQueryExperience.upsertMetricColumnFilterIfHigherLevelAggregationApplicable( - excludeParams, Seq( + params, Seq( ColumnFilter("__name__", Equals("metric1:::agg")), ColumnFilter("_ws_", Equals("testws")), ColumnFilter("_ns_", Equals("testns")),