From 6d0e99707213223ed1eb6a58b97cf07b20914bb9 Mon Sep 17 00:00:00 2001 From: sandeep6189 Date: Tue, 8 Oct 2024 09:27:57 -0700 Subject: [PATCH] misc(query): increment counter when query plan updated with next level aggregated metric (#1863) * misc(query): increment counter when query plan updated with next level aggregated metric * Adding unit test to test if metric is being incremented as expected --- project/Dependencies.scala | 3 +- .../main/scala/filodb/query/LogicalPlan.scala | 4 +- .../util/HierarchicalQueryExperience.scala | 41 ++++++++++++++++--- .../HierarchicalQueryExperienceSpec.scala | 39 ++++++++++++++++++ 4 files changed, 80 insertions(+), 7 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1a5cb24598..a1ee58e89f 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -40,7 +40,8 @@ object Dependencies { val circeParser = "io.circe" %% "circe-parser" % "0.9.3" lazy val commonDeps = Seq( - "io.kamon" %% "kamon-bundle" % kamonBundleVersion, + "io.kamon" %% "kamon-bundle" % kamonBundleVersion, + "io.kamon" %% "kamon-testkit" % kamonBundleVersion % Test, logbackDep % Test, scalaTest % Test, "com.softwaremill.quicklens" %% "quicklens" % "1.4.12" % Test, diff --git a/query/src/main/scala/filodb/query/LogicalPlan.scala b/query/src/main/scala/filodb/query/LogicalPlan.scala index c6e88727c1..738692d024 100644 --- a/query/src/main/scala/filodb/query/LogicalPlan.scala +++ b/query/src/main/scala/filodb/query/LogicalPlan.scala @@ -1,5 +1,6 @@ package filodb.query +import filodb.core.GlobalConfig import filodb.core.query.{ColumnFilter, RangeParams, RvRange} import filodb.core.query.Filter.Equals import filodb.query.util.{AggRule, HierarchicalQueryExperience} @@ -225,7 +226,8 @@ case class SeriesKeysByFilters(filters: Seq[ColumnFilter], object TsCardinalities { val LABEL_WORKSPACE = "_ws_" - val SHARD_KEY_LABELS = Seq(LABEL_WORKSPACE, "_ns_", "__name__") + val LABEL_NAMESPACE = "_ns_" + val SHARD_KEY_LABELS = Seq(LABEL_WORKSPACE, LABEL_NAMESPACE, GlobalConfig.PromMetricLabel) } /** diff --git a/query/src/main/scala/filodb/query/util/HierarchicalQueryExperience.scala b/query/src/main/scala/filodb/query/util/HierarchicalQueryExperience.scala index 975aded9bf..c892560c78 100644 --- a/query/src/main/scala/filodb/query/util/HierarchicalQueryExperience.scala +++ b/query/src/main/scala/filodb/query/util/HierarchicalQueryExperience.scala @@ -1,12 +1,13 @@ package filodb.query.util import com.typesafe.scalalogging.StrictLogging +import kamon.Kamon import scala.jdk.CollectionConverters.asScalaBufferConverter import filodb.core.GlobalConfig import filodb.core.query.ColumnFilter import filodb.core.query.Filter.Equals -import filodb.query.{AggregateClause, AggregationOperator, LogicalPlan} +import filodb.query.{AggregateClause, AggregationOperator, LogicalPlan, TsCardinalities} /** * Aggregation rule definition. Contains the following information: @@ -59,6 +60,8 @@ case class ExcludeAggRule(metricRegex: String, metricSuffix: String, tags: Set[S object HierarchicalQueryExperience extends StrictLogging { + val hierarchicalQueryOptimizedCounter = Kamon.counter("hierarchical-query-plans-optimized") + // Get the shard key columns from the dataset options along with all the metric labels used lazy val shardKeyColumnsOption: Option[Set[String]] = GlobalConfig.datasetOptions match { case Some(datasetOptions) => @@ -152,7 +155,7 @@ object HierarchicalQueryExperience extends StrictLogging { /** Returns the next level aggregated metric name. Example * metricRegex = ::: * metricSuffix = agg_2 - * Exiting metric name - metric1:::agg + * Existing metric name - metric1:::agg * After update - metric1:::agg -> metric1:::agg_2 * @param metricColumnFilter - String - Metric ColumnFilter tag/label * @param params - HierarchicalQueryExperience - Contains @@ -231,9 +234,17 @@ object HierarchicalQueryExperience extends StrictLogging { val updatedMetricName = getNextLevelAggregatedMetricName(metricColumnFilter, params, filters) updatedMetricName match { case Some(metricName) => - val updatedFilters = upsertFilters(filters, Seq(ColumnFilter(metricColumnFilter, Equals(metricName)))) - logger.info(s"[HierarchicalQueryExperience] Query optimized with filters: ${updatedFilters.toString()}") - updatedFilters + // Checking if the metric actually ends with the next level aggregation metricSuffix. + // If so, update the filters and emit metric + // else, return the filters as is + metricName.endsWith(params.metricSuffix) match { + case true => + val updatedFilters = upsertFilters(filters, Seq(ColumnFilter(metricColumnFilter, Equals(metricName)))) + logger.info(s"[HierarchicalQueryExperience] Query optimized with filters: ${updatedFilters.toString()}") + incrementHierarcicalQueryOptimizedCounter(updatedFilters) + updatedFilters + case false => filters + } case None => filters } } else { @@ -241,6 +252,26 @@ object HierarchicalQueryExperience extends StrictLogging { } } + /** + * Track the queries optimized by workspace and namespace + * @param filters + */ + private def incrementHierarcicalQueryOptimizedCounter(filters: Seq[ColumnFilter]): Unit = { + // track query optimized per workspace and namespace in the counter + val metric_ws = LogicalPlan.getColumnValues(filters, TsCardinalities.LABEL_WORKSPACE) match { + case Seq() => "" + case ws => ws.head + } + val metric_ns = LogicalPlan.getColumnValues(filters, TsCardinalities.LABEL_NAMESPACE) match { + case Seq() => "" + case ns => ns.head + } + hierarchicalQueryOptimizedCounter + .withTag("metric_ws", metric_ws) + .withTag("metric_ns", metric_ns) + .increment() + } + /** * Helper function to check the following: * Check 1: Check if the aggregation operator is enabled diff --git a/query/src/test/scala/filodb/query/util/HierarchicalQueryExperienceSpec.scala b/query/src/test/scala/filodb/query/util/HierarchicalQueryExperienceSpec.scala index 18cde7c053..e8c6b2d100 100644 --- a/query/src/test/scala/filodb/query/util/HierarchicalQueryExperienceSpec.scala +++ b/query/src/test/scala/filodb/query/util/HierarchicalQueryExperienceSpec.scala @@ -2,9 +2,12 @@ package filodb.query.util import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers +import kamon.Kamon +import kamon.testkit.InstrumentInspection.Syntax.counterInstrumentInspection import filodb.core.query.ColumnFilter import filodb.core.query.Filter.Equals + class HierarchicalQueryExperienceSpec extends AnyFunSpec with Matchers { it("getMetricColumnFilterTag should return expected column") { @@ -90,4 +93,40 @@ class HierarchicalQueryExperienceSpec extends AnyFunSpec with Matchers { HierarchicalQueryExperience.isHigherLevelAggregationApplicable( ExcludeAggRule(":::", "agg_2", Set("tag3", "tag4")), Seq("tag1", "tag2", "_ws_", "_ns_", "_metric_")) shouldEqual true } + + it("checkAggregateQueryEligibleForHigherLevelAggregatedMetric should increment counter if metric updated") { + val excludeParams = ExcludeAggRule(":::", "agg_2", Set("notAggTag1", "notAggTag2")) + Kamon.init() + var counter = Kamon.counter("hierarchical-query-plans-optimized") + + // CASE 1: Should update if metric have the aggregated metric identifier + counter.withTag("metric_ws", "testws").withTag("metric_ns", "testns").value shouldEqual 0 + var updatedFilters = HierarchicalQueryExperience.upsertMetricColumnFilterIfHigherLevelAggregationApplicable( + excludeParams, Seq( + ColumnFilter("__name__", Equals("metric1:::agg")), + ColumnFilter("_ws_", Equals("testws")), + ColumnFilter("_ns_", Equals("testns")), + ColumnFilter("aggTag", Equals("value")))) + updatedFilters.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("metric1:::agg_2") + counter.withTag("metric_ws", "testws").withTag("metric_ns", "testns").value shouldEqual 1 + + + // CASE 2: Should not update if metric doesn't have the aggregated metric identifier + // reset the counter + counter = Kamon.counter("hierarchical-query-plans-optimized") + counter.withTag("metric_ws", "testws").withTag("metric_ns", "testns").value shouldEqual 0 + updatedFilters = HierarchicalQueryExperience.upsertMetricColumnFilterIfHigherLevelAggregationApplicable( + excludeParams, Seq( + ColumnFilter("__name__", Equals("metric1:::agg")), + ColumnFilter("_ws_", Equals("testws")), + ColumnFilter("_ns_", Equals("testns")), + ColumnFilter("notAggTag1", Equals("value")))) // using exclude tag, so should not optimize + updatedFilters.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String] + .shouldEqual("metric1:::agg") + // count should not increment + counter.withTag("metric_ws", "testws").withTag("metric_ns", "testns").value shouldEqual 0 + + Kamon.stop() + } }