Skip to content

Commit

Permalink
misc(query): increment counter when query plan updated with next leve…
Browse files Browse the repository at this point in the history
…l aggregated metric (#1863)

* misc(query): increment counter when query plan updated with next level aggregated metric

* Adding unit test to test if metric is being incremented as expected
  • Loading branch information
sandeep6189 authored Oct 8, 2024
1 parent 22d0b08 commit 6d0e997
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 7 deletions.
3 changes: 2 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ object Dependencies {
val circeParser = "io.circe" %% "circe-parser" % "0.9.3"

lazy val commonDeps = Seq(
"io.kamon" %% "kamon-bundle" % kamonBundleVersion,
"io.kamon" %% "kamon-bundle" % kamonBundleVersion,
"io.kamon" %% "kamon-testkit" % kamonBundleVersion % Test,
logbackDep % Test,
scalaTest % Test,
"com.softwaremill.quicklens" %% "quicklens" % "1.4.12" % Test,
Expand Down
4 changes: 3 additions & 1 deletion query/src/main/scala/filodb/query/LogicalPlan.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package filodb.query

import filodb.core.GlobalConfig
import filodb.core.query.{ColumnFilter, RangeParams, RvRange}
import filodb.core.query.Filter.Equals
import filodb.query.util.{AggRule, HierarchicalQueryExperience}
Expand Down Expand Up @@ -225,7 +226,8 @@ case class SeriesKeysByFilters(filters: Seq[ColumnFilter],

object TsCardinalities {
val LABEL_WORKSPACE = "_ws_"
val SHARD_KEY_LABELS = Seq(LABEL_WORKSPACE, "_ns_", "__name__")
val LABEL_NAMESPACE = "_ns_"
val SHARD_KEY_LABELS = Seq(LABEL_WORKSPACE, LABEL_NAMESPACE, GlobalConfig.PromMetricLabel)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package filodb.query.util

import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon
import scala.jdk.CollectionConverters.asScalaBufferConverter

import filodb.core.GlobalConfig
import filodb.core.query.ColumnFilter
import filodb.core.query.Filter.Equals
import filodb.query.{AggregateClause, AggregationOperator, LogicalPlan}
import filodb.query.{AggregateClause, AggregationOperator, LogicalPlan, TsCardinalities}

/**
* Aggregation rule definition. Contains the following information:
Expand Down Expand Up @@ -59,6 +60,8 @@ case class ExcludeAggRule(metricRegex: String, metricSuffix: String, tags: Set[S

object HierarchicalQueryExperience extends StrictLogging {

val hierarchicalQueryOptimizedCounter = Kamon.counter("hierarchical-query-plans-optimized")

// Get the shard key columns from the dataset options along with all the metric labels used
lazy val shardKeyColumnsOption: Option[Set[String]] = GlobalConfig.datasetOptions match {
case Some(datasetOptions) =>
Expand Down Expand Up @@ -152,7 +155,7 @@ object HierarchicalQueryExperience extends StrictLogging {
/** Returns the next level aggregated metric name. Example
* metricRegex = :::
* metricSuffix = agg_2
* Exiting metric name - metric1:::agg
* Existing metric name - metric1:::agg
* After update - metric1:::agg -> metric1:::agg_2
* @param metricColumnFilter - String - Metric ColumnFilter tag/label
* @param params - HierarchicalQueryExperience - Contains
Expand Down Expand Up @@ -231,16 +234,44 @@ object HierarchicalQueryExperience extends StrictLogging {
val updatedMetricName = getNextLevelAggregatedMetricName(metricColumnFilter, params, filters)
updatedMetricName match {
case Some(metricName) =>
val updatedFilters = upsertFilters(filters, Seq(ColumnFilter(metricColumnFilter, Equals(metricName))))
logger.info(s"[HierarchicalQueryExperience] Query optimized with filters: ${updatedFilters.toString()}")
updatedFilters
// Checking if the metric actually ends with the next level aggregation metricSuffix.
// If so, update the filters and emit metric
// else, return the filters as is
metricName.endsWith(params.metricSuffix) match {
case true =>
val updatedFilters = upsertFilters(filters, Seq(ColumnFilter(metricColumnFilter, Equals(metricName))))
logger.info(s"[HierarchicalQueryExperience] Query optimized with filters: ${updatedFilters.toString()}")
incrementHierarcicalQueryOptimizedCounter(updatedFilters)
updatedFilters
case false => filters
}
case None => filters
}
} else {
filters
}
}

/**
* Track the queries optimized by workspace and namespace
* @param filters
*/
private def incrementHierarcicalQueryOptimizedCounter(filters: Seq[ColumnFilter]): Unit = {
// track query optimized per workspace and namespace in the counter
val metric_ws = LogicalPlan.getColumnValues(filters, TsCardinalities.LABEL_WORKSPACE) match {
case Seq() => ""
case ws => ws.head
}
val metric_ns = LogicalPlan.getColumnValues(filters, TsCardinalities.LABEL_NAMESPACE) match {
case Seq() => ""
case ns => ns.head
}
hierarchicalQueryOptimizedCounter
.withTag("metric_ws", metric_ws)
.withTag("metric_ns", metric_ns)
.increment()
}

/**
* Helper function to check the following:
* Check 1: Check if the aggregation operator is enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package filodb.query.util

import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import kamon.Kamon
import kamon.testkit.InstrumentInspection.Syntax.counterInstrumentInspection

import filodb.core.query.ColumnFilter
import filodb.core.query.Filter.Equals

class HierarchicalQueryExperienceSpec extends AnyFunSpec with Matchers {

it("getMetricColumnFilterTag should return expected column") {
Expand Down Expand Up @@ -90,4 +93,40 @@ class HierarchicalQueryExperienceSpec extends AnyFunSpec with Matchers {
HierarchicalQueryExperience.isHigherLevelAggregationApplicable(
ExcludeAggRule(":::", "agg_2", Set("tag3", "tag4")), Seq("tag1", "tag2", "_ws_", "_ns_", "_metric_")) shouldEqual true
}

it("checkAggregateQueryEligibleForHigherLevelAggregatedMetric should increment counter if metric updated") {
val excludeParams = ExcludeAggRule(":::", "agg_2", Set("notAggTag1", "notAggTag2"))
Kamon.init()
var counter = Kamon.counter("hierarchical-query-plans-optimized")

// CASE 1: Should update if metric have the aggregated metric identifier
counter.withTag("metric_ws", "testws").withTag("metric_ns", "testns").value shouldEqual 0
var updatedFilters = HierarchicalQueryExperience.upsertMetricColumnFilterIfHigherLevelAggregationApplicable(
excludeParams, Seq(
ColumnFilter("__name__", Equals("metric1:::agg")),
ColumnFilter("_ws_", Equals("testws")),
ColumnFilter("_ns_", Equals("testns")),
ColumnFilter("aggTag", Equals("value"))))
updatedFilters.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
.shouldEqual("metric1:::agg_2")
counter.withTag("metric_ws", "testws").withTag("metric_ns", "testns").value shouldEqual 1


// CASE 2: Should not update if metric doesn't have the aggregated metric identifier
// reset the counter
counter = Kamon.counter("hierarchical-query-plans-optimized")
counter.withTag("metric_ws", "testws").withTag("metric_ns", "testns").value shouldEqual 0
updatedFilters = HierarchicalQueryExperience.upsertMetricColumnFilterIfHigherLevelAggregationApplicable(
excludeParams, Seq(
ColumnFilter("__name__", Equals("metric1:::agg")),
ColumnFilter("_ws_", Equals("testws")),
ColumnFilter("_ns_", Equals("testns")),
ColumnFilter("notAggTag1", Equals("value")))) // using exclude tag, so should not optimize
updatedFilters.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
.shouldEqual("metric1:::agg")
// count should not increment
counter.withTag("metric_ws", "testws").withTag("metric_ns", "testns").value shouldEqual 0

Kamon.stop()
}
}

0 comments on commit 6d0e997

Please sign in to comment.