diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala index e255c6ad1b..86e0e1ded5 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala @@ -345,13 +345,12 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { ) } - it("LogicalPlan update for hierarchical aggregation queries with by clause") { + it("LogicalPlan update for hierarchical aggregation queries with by clause and include tags") { // common parameters val t = TimeStepParams(700, 1000, 10000) val nextLevelAggregatedMetricSuffix = "agg_2" val nextLevelAggregationTags = Set("job", "application", "instance", "version") val params = HierarchicalQueryExperience(true, ":::", nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) - // CASE 1 - Aggregate with by clause - should update the metric name as `by` clause labels are part of include tags var query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) by (version, instance)" var lp = Parser.queryRangeToLogicalPlan(query, t) @@ -361,7 +360,6 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] .shouldEqual("my_counter:::agg_2") ) - // CASE 2 - should NOT update since bottomk aggregation operator is not allowed as of now query = "sum(bottomk(2, my_counter:::agg{job=\"spark\", application=\"filodb\"}) by (instance, version)) by (version)" lp = Parser.queryRangeToLogicalPlan(query, t) @@ -389,8 +387,7 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] .endsWith(nextLevelAggregatedMetricSuffix).shouldEqual(true) ) - 
- // CASE 5 - lhs should not be updated since it is not a aggregated metric - binary join case + // CASE 5 - lhs should not be updated since it does not match regex pattern - binary join case query = "sum(my_gauge{job=\"spark\", application=\"filodb\"}) by (job, application) and on(job, application) sum(my_counter:::agg{job=\"spark\", application=\"filodb\"}) by (job, application)" lp = Parser.queryRangeToLogicalPlan(query, t) lpUpdated = lp.useHigherLevelAggregatedMetric(params) @@ -404,7 +401,6 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] .shouldEqual("my_counter:::agg_2") ) - // CASE 6 - rhs should not be updated since it has column filters which is not present in include tags query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) by (job, application) and on(job, application) sum(my_counter:::agg{job=\"spark\", application=\"filodb\", id=\"1\"}) by (job, application)" lp = Parser.queryRangeToLogicalPlan(query, t) @@ -420,4 +416,236 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { .shouldEqual("my_counter:::agg") ) } + + it("LogicalPlan update for hierarchical aggregation queries with by clause and exclude tags") { + // common parameters + val t = TimeStepParams(700, 1000, 10000) + val nextLevelAggregatedMetricSuffix = "agg_2" + val nextLevelAggregationExcludeTags = Set("instance", "version") + val params = HierarchicalQueryExperience(false, ":::", nextLevelAggregatedMetricSuffix, + nextLevelAggregationExcludeTags) + // CASE 1 - should update the metric name as `by` clause labels are not part of exclude tags + var query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) by (host)" + var lp = Parser.queryRangeToLogicalPlan(query, t) + var lpUpdated = lp.useHigherLevelAggregatedMetric(params) + var filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.map( + filterSet => 
filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_counter:::agg_2") + ) + // CASE 2a - should NOT update the metric name as column filters are part of exclude tags + query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\", version=\"2.0\"}[5m]))" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_counter:::agg") + ) + // CASE 2b - should NOT update since bottomk aggregation operator is not allowed as of now + query = "sum(bottomk(2, my_counter:::agg{job=\"spark\", application=\"filodb\"})) by (host)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_counter:::agg") + ) + // CASE 3 - should NOT update since the by clause labels intersect with exclude tags + query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) by (version, instance, id)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_counter:::agg") + ) + // CASE 4 - should update since the by clause labels are not part of exclude tags - binary join case + query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) by (id, host) + sum(your_gauge:::agg{job=\"spark\", application=\"filodb\"}) by (id, host)" + lp = Parser.queryRangeToLogicalPlan(query, t) 
+ lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .endsWith(nextLevelAggregatedMetricSuffix).shouldEqual(true) + ) + // CASE 5 - lhs should not be updated since it does not match regex pattern - binary join case + query = "sum(my_gauge{job=\"spark\", application=\"filodb\"}) by (id, host) - sum(your_gauge:::agg{job=\"spark\", application=\"filodb\"}) by (id, host)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_gauge") + ) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].rhs) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("your_gauge:::agg_2") + ) + // CASE 6 - rhs should not be updated since it has column filters which are part of exclude tags + query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) by (id, host) / sum(your_gauge:::agg{job=\"spark\", application=\"filodb\", version=\"1\"}) by (id, host)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_gauge:::agg_2") + ) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].rhs) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + 
.shouldEqual("your_gauge:::agg") + ) + } + + it ("LogicalPlan update for hierarchical aggregation queries with without clause and exclude tags") { + // common parameters + val t = TimeStepParams(700, 1000, 10000) + val nextLevelAggregatedMetricSuffix = "agg_2" + val nextLevelAggregationExcludeTags = Set("instance", "version") + val params = HierarchicalQueryExperience(false, ":::", nextLevelAggregatedMetricSuffix, + nextLevelAggregationExcludeTags) + // CASE 1 - should update since the exclude tags are subset of the without clause labels + var query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) without (version, instance)" + var lp = Parser.queryRangeToLogicalPlan(query, t) + var lpUpdated = lp.useHigherLevelAggregatedMetric(params) + var filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_counter:::agg_2") + ) + // CASE 2 - should NOT update since bottomk aggregation operator is not allowed as of now + query = "sum(bottomk(2, my_counter:::agg{job=\"spark\", application=\"filodb\"}) without (instance, version))" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.map( + filterSet => filterSet.filter( x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_counter:::agg") + ) + // CASE 3 - should NOT update since the column filter label is part of exclude tags + query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\", version=\"2\"}[5m]))" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + 
.shouldEqual("my_counter:::agg") + ) + // CASE 4 - should update since the exclude tags are subset of the without clause labels + query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) without (version, instance, id)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_counter:::agg_2") + ) + // CASE 5 - should not update since the exclude tags are not subset of the without clause labels + query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) without (version)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_counter:::agg") + ) + // CASE 6 - should update since the exclude tags are subset of without clause labels - binary join case + query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) without (version, instance) and ignoring(version, instance) sum(my_counter:::agg{job=\"spark\", application=\"filodb\"}) without (version, instance)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .endsWith(nextLevelAggregatedMetricSuffix).shouldEqual(true) + ) + // CASE 7 - lhs should not be updated since it does not match regex pattern - binary join case + query = "sum(my_gauge{job=\"spark\", application=\"filodb\"}) without (version, instance) and ignoring(version, instance) 
sum(my_counter:::agg{job=\"spark\", application=\"filodb\"}) without (version, instance)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_gauge") + ) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].rhs) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_counter:::agg_2") + ) + // CASE 8 - rhs should not be updated since it has column filters which is part of exclude tags + query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) without (version, instance) and ignoring(version, instance) sum(my_counter:::agg{job=\"spark\", application=\"filodb\", version=\"1\"}) without (version, instance)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_gauge:::agg_2") + ) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].rhs) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_counter:::agg") + ) + } + + it ("LogicalPlan update for hierarchical aggregation queries with without clause and include tags") { + // common parameters + val t = TimeStepParams(700, 1000, 10000) + val nextLevelAggregatedMetricSuffix = "agg_2" + val nextLevelAggregationTags = Set("job", "application", "instance", "version") + val params = HierarchicalQueryExperience(true, ":::", 
nextLevelAggregatedMetricSuffix, nextLevelAggregationTags) + // All the cases should not be updated since without clause with include tags is not supported as of now + var query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) without (version, instance)" + var lp = Parser.queryRangeToLogicalPlan(query, t) + var lpUpdated = lp.useHigherLevelAggregatedMetric(params) + var filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_counter:::agg") + ) + query = "sum(bottomk(2, my_counter:::agg{job=\"spark\", application=\"filodb\"}) without (instance, version))" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_counter:::agg") + ) + query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) without (version, instance) + sum(your_gauge:::agg{job=\"spark\", application=\"filodb\"}) without (version, instance)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .endsWith(":::agg").shouldEqual(true) + ) + query = "sum(my_gauge{job=\"spark\", application=\"filodb\"}) without (version, instance) - sum(your_gauge:::agg{job=\"spark\", application=\"filodb\"}) without (version, instance)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) + filterGroups.map( + filterSet => filterSet.filter(x => 
x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_gauge") + ) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].rhs) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("your_gauge:::agg") + ) + query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) without (version, instance) / sum(your_gauge:::agg{job=\"spark\", application=\"filodb\", version=\"1\"}) without (version, instance)" + lp = Parser.queryRangeToLogicalPlan(query, t) + lpUpdated = lp.useHigherLevelAggregatedMetric(params) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("my_gauge:::agg") + ) + filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].rhs) + filterGroups.map( + filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("your_gauge:::agg") + ) + } } diff --git a/query/src/main/scala/filodb/query/LogicalPlan.scala b/query/src/main/scala/filodb/query/LogicalPlan.scala index d13495d3e4..a6edebf4fe 100644 --- a/query/src/main/scala/filodb/query/LogicalPlan.scala +++ b/query/src/main/scala/filodb/query/LogicalPlan.scala @@ -1,7 +1,5 @@ package filodb.query -import com.typesafe.scalalogging.StrictLogging - import filodb.core.query.{ColumnFilter, RangeParams, RvRange} import filodb.core.query.Filter.Equals import filodb.query.util.HierarchicalQueryExperience @@ -38,6 +36,11 @@ sealed trait LogicalPlan { } } + /** + * Optimize the logical plan by using the higher level aggregated metric if applicable + * @param params HierarchicalQueryExperience object - contains details of the higher level aggregation rule and metric + * @return Updated LogicalPlan if Applicable. 
Else return the same LogicalPlan + */ def useHigherLevelAggregatedMetric(params: HierarchicalQueryExperience): LogicalPlan = { // For now, only PeriodicSeriesPlan and RawSeriesLikePlan are optimized for higher level aggregation this match { @@ -60,6 +63,9 @@ sealed trait RawSeriesLikePlan extends LogicalPlan { def useHigherLevelAggregatedMetricIfApplicable(params: HierarchicalQueryExperience): RawSeriesLikePlan + /** + * @return Raw series column filters + */ def rawSeriesFilters(): Seq[ColumnFilter] } @@ -88,11 +94,6 @@ sealed trait PeriodicSeriesPlan extends LogicalPlan { def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan - /** - * @param isInclude - * @param tags - * @return - */ def useHigherLevelAggregatedMetricIfApplicable(params: HierarchicalQueryExperience, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan } @@ -147,6 +148,11 @@ case class RawSeries(rangeSelector: RangeSelector, this.copy(filters = updatedFilters) } + /** + * Updates the metric ColumnFilter if the higher level aggregation rule is applicable + * @param params HierarchicalQueryExperience object - contains details of the higher level aggregation rule and metric + * @return Updated RawSeriesLikePlan if Applicable. 
Else return the same RawSeriesLikePlan + */ override def useHigherLevelAggregatedMetricIfApplicable(params: HierarchicalQueryExperience): RawSeriesLikePlan = { val updatedFilters = HierarchicalQueryExperience.upsertMetricColumnFilterIfHigherLevelAggregationApplicable( params, filters) @@ -303,10 +309,12 @@ case class PeriodicSeries(rawSeries: RawSeriesLikePlan, override def useHigherLevelAggregatedMetricIfApplicable(params: HierarchicalQueryExperience, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { + // Check 1: Check if this plan is allowed for raw series update HierarchicalQueryExperience.isPeriodicSeriesPlanAllowedForRawSeriesUpdateForHigherLevelAggregatedMetric( this.getClass.getSimpleName) && + // Check 2: Check if the parent logical plans are allowed for hierarchical aggregation update HierarchicalQueryExperience.isParentPeriodicSeriesPlanAllowedForRawSeriesUpdateForHigherLevelAggregatedMetric( - parentLogicalPlans) match { + parentLogicalPlans) match { case true => this.copy(rawSeries = rawSeries.useHigherLevelAggregatedMetricIfApplicable(params)) case false => this @@ -377,6 +385,7 @@ case class SubqueryWithWindowing( override def useHigherLevelAggregatedMetricIfApplicable(params: HierarchicalQueryExperience, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { + // recurse to the leaf level this.copy(innerPeriodicSeries = innerPeriodicSeries.useHigherLevelAggregatedMetricIfApplicable( params, parentLogicalPlans :+ this.getClass.getSimpleName)) } @@ -421,6 +430,7 @@ case class TopLevelSubquery( override def useHigherLevelAggregatedMetricIfApplicable(params: HierarchicalQueryExperience, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { + // recurse to the leaf level this.copy(innerPeriodicSeries = innerPeriodicSeries.useHigherLevelAggregatedMetricIfApplicable( params, parentLogicalPlans :+ this.getClass.getSimpleName)) } @@ -520,10 +530,16 @@ case class Aggregate(operator: AggregationOperator, override def 
replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors = vectors.replacePeriodicSeriesFilters(filters)) - // Check if the higher level aggregation metric is applicable for this Aggregate plan + /** + * Helper function to check the following: + * Check 1: Check if the aggregation operator is enabled + * Check 2: Check if the `by` and `without` clause labels satisfy the include/exclude tag constraints + * + * @param isInclude true if includeTags are specified, false if excludeTags are specified + * @param tags higher aggregation rule tags/labels as defined in aggregation rule + * @return true if the current aggregate query can be optimized, false otherwise + */ def checkAggregateQueryEligibleForHigherLevelAggregatedMetric(isInclude: Boolean, tags: Set[String]): Boolean = { - // Check 1: Check if the aggregation operator is enabled - // Check 2: Check if the `by` and `without` clause labels satisfy the include/exclude tag constraints HierarchicalQueryExperience.isAggregationOperatorAllowed(operator.entryName) match { case true => clauseOpt match { @@ -635,6 +651,7 @@ case class ScalarVectorBinaryOperation(operator: BinaryOperator, override def useHigherLevelAggregatedMetricIfApplicable(params: HierarchicalQueryExperience, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { + // No special handling for ScalarVectorBinaryOperation. Just pass the call to vector and scalar plan recursively val parentLogicalPlansUpdated = parentLogicalPlans :+ this.getClass.getSimpleName this.copy( vector = vector.useHigherLevelAggregatedMetricIfApplicable( @@ -663,6 +680,7 @@ case class ApplyInstantFunction(vectors: PeriodicSeriesPlan, override def useHigherLevelAggregatedMetricIfApplicable(params: HierarchicalQueryExperience, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { + // No special handling for ApplyInstantFunction. 
Just pass the call to vectors and functionArgs recursively val parentLogicalPlansUpdated = parentLogicalPlans :+ this.getClass.getSimpleName this.copy( vectors = vectors.useHigherLevelAggregatedMetricIfApplicable( @@ -686,7 +704,6 @@ case class ApplyInstantFunctionRaw(vectors: RawSeries, functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(newFilters).asInstanceOf[FunctionArgsPlan])) override def useHigherLevelAggregatedMetricIfApplicable(params: HierarchicalQueryExperience): RawSeriesLikePlan = { - // TODO: use raw series this.copy( vectors = vectors.useHigherLevelAggregatedMetricIfApplicable( params).asInstanceOf[RawSeries]) @@ -850,7 +867,6 @@ case class ScalarBinaryOperation(operator: BinaryOperator, this.copy(lhs = updatedLhs, rhs = updatedRhs) } - override def useHigherLevelAggregatedMetricIfApplicable(params: HierarchicalQueryExperience, parentLogicalPlans: Seq[String]): PeriodicSeriesPlan = { val parentLogicalPlansUpdated = parentLogicalPlans :+ this.getClass.getSimpleName @@ -907,7 +923,7 @@ case class ApplyLimitFunction(vectors: PeriodicSeriesPlan, } } -object LogicalPlan extends StrictLogging { +object LogicalPlan { /** * Get leaf Logical Plans */ @@ -945,7 +961,6 @@ object LogicalPlan extends StrictLogging { } } - def getColumnValues(logicalPlan: LogicalPlan, labelName: String): Set[String] = { getColumnValues(getColumnFilterGroup(logicalPlan), labelName) } diff --git a/query/src/main/scala/filodb/query/util/HierarchicalQueryExperience.scala b/query/src/main/scala/filodb/query/util/HierarchicalQueryExperience.scala index a205e32096..40fe31ed73 100644 --- a/query/src/main/scala/filodb/query/util/HierarchicalQueryExperience.scala +++ b/query/src/main/scala/filodb/query/util/HierarchicalQueryExperience.scala @@ -35,6 +35,20 @@ object HierarchicalQueryExperience extends StrictLogging { case None => None } + // Get the allowed periodic series plans which have access to RawSeries from the hierarchical config + lazy val 
allowedPeriodicSeriesPlansWithRawSeries: Option[Set[String]] = GlobalConfig.hierarchicalConfig match { + case Some(hierarchicalConfig) => + Some(hierarchicalConfig.getStringList("allowed-periodic-series-plans-with-raw-series").asScala.toSet) + case None => None + } + + // Get the allowed parent logical plans for optimization from the hierarchical config + lazy val allowedLogicalPlansForOptimization: Option[Set[String]] = GlobalConfig.hierarchicalConfig match { + case Some(hierarchicalConfig) => + Some(hierarchicalConfig.getStringList("allowed-parent-logical-plans").asScala.toSet) + case None => None + } + /** * Helper function to get the ColumnFilter tag/label for the metric. This is needed to correctly update the filter. * @param filterTags - Seq[String] - List of ColumnFilter tags/labels @@ -175,13 +189,9 @@ object HierarchicalQueryExperience extends StrictLogging { * @return - Boolean */ def isParentPeriodicSeriesPlanAllowedForRawSeriesUpdateForHigherLevelAggregatedMetric( - parentLogicalPlans: Seq[String]): Boolean = { - GlobalConfig.hierarchicalConfig match { - case Some(hierarchicalConfig) => - parentLogicalPlans.exists(hierarchicalConfig - .getStringList("allowed-parent-logical-plans").contains) - case None => false - } + parentLogicalPlans: Seq[String]): Boolean = allowedLogicalPlansForOptimization match { + case Some(allowedLogicalPlans) => parentLogicalPlans.exists(allowedLogicalPlans.contains) + case None => false } /** @@ -190,14 +200,9 @@ object HierarchicalQueryExperience extends StrictLogging { * @return - Boolean */ def isPeriodicSeriesPlanAllowedForRawSeriesUpdateForHigherLevelAggregatedMetric( - logicalPlanName: String): Boolean = { - GlobalConfig.hierarchicalConfig match { - case Some(hierarchicalConfig) => - hierarchicalConfig - .getStringList("allowed-periodic-series-plans-with-raw-series") - .contains(logicalPlanName) + logicalPlanName: String): Boolean = allowedPeriodicSeriesPlansWithRawSeries match { + case Some(allowedPeriodSeriesPlans) => 
allowedPeriodSeriesPlans.contains(logicalPlanName) case None => false - } } /**