Commit e9098f8: Adding unit tests

sandeep6189 committed Sep 3, 2024 (1 parent: dd76dcf)
Showing 2 changed files with 78 additions and 15 deletions.
@@ -302,26 +302,75 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers {
val nextLevelAggregationTags = Set("job", "application")
// CASE 1 - BinaryJoin (lhs = Aggregate, rhs = Aggregate) - Both lhs and rhs should be updated
val binaryJoinAggregationBothOptimization = "sum(metric1:::agg{job=\"app\"}) + sum(metric2:::agg{job=\"app\"})"
- var lp1 = Parser.queryRangeToLogicalPlan(binaryJoinAggregationBothOptimization, t)
- var lp1Updated = lp1.useHigherLevelAggregatedMetric(true, nextLevelAggregatedMetricSuffix, nextLevelAggregationTags)
- lp1Updated.isInstanceOf[BinaryJoin] shouldEqual true
- lp1Updated.asInstanceOf[BinaryJoin].lhs.isInstanceOf[Aggregate] shouldEqual true
- lp1Updated.asInstanceOf[BinaryJoin].rhs.isInstanceOf[Aggregate] shouldEqual true
- var filterGroups = getColumnFilterGroup(lp1Updated)
+ var lp = Parser.queryRangeToLogicalPlan(binaryJoinAggregationBothOptimization, t)
+ var lpUpdated = lp.useHigherLevelAggregatedMetric(true, nextLevelAggregatedMetricSuffix, nextLevelAggregationTags)
+ lpUpdated.isInstanceOf[BinaryJoin] shouldEqual true
+ lpUpdated.asInstanceOf[BinaryJoin].lhs.isInstanceOf[Aggregate] shouldEqual true
+ lpUpdated.asInstanceOf[BinaryJoin].rhs.isInstanceOf[Aggregate] shouldEqual true
+ var filterGroups = getColumnFilterGroup(lpUpdated)
filterGroups.map(
filterSet => filterSet.filter( x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
.endsWith(nextLevelAggregatedMetricSuffix).shouldEqual(true)
)
// CASE 2 - BinaryJoin (lhs = Aggregate, rhs = Aggregate) - rhs should be updated
val binaryJoinAggregationRHSOptimization = "sum(metric1:::agg{instance=\"abc\"}) + sum(metric2:::agg{job=\"app\"})"
- lp1 = Parser.queryRangeToLogicalPlan(binaryJoinAggregationRHSOptimization, t)
- lp1Updated = lp1.useHigherLevelAggregatedMetric(true, nextLevelAggregatedMetricSuffix, nextLevelAggregationTags)
- filterGroups = getColumnFilterGroup(lp1Updated.asInstanceOf[BinaryJoin].rhs)
+ lp = Parser.queryRangeToLogicalPlan(binaryJoinAggregationRHSOptimization, t)
+ lpUpdated = lp.useHigherLevelAggregatedMetric(true, nextLevelAggregatedMetricSuffix, nextLevelAggregationTags)
+ filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].rhs)
filterGroups.map(
filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
- .endsWith(nextLevelAggregatedMetricSuffix).shouldEqual(true)
+ .shouldEqual("metric2:::agg_2")
)
+ filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs)
+ filterGroups.map(
+ filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
+ .shouldEqual("metric1:::agg")
+ )
+ // CASE 3 - BinaryJoin (lhs = Aggregate, rhs = Aggregate) - lhs should be updated and rhs should not since it is
+ // not an aggregated metric, even if both the metrics qualify for aggregation
+ val binaryJoinAggregationLHSOptimization = "sum(metric1:::agg{job=\"abc\"}) + sum(metric2{job=\"app\"})"
+ lp = Parser.queryRangeToLogicalPlan(binaryJoinAggregationLHSOptimization, t)
+ lpUpdated = lp.useHigherLevelAggregatedMetric(true, nextLevelAggregatedMetricSuffix, nextLevelAggregationTags)
+ filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].rhs)
+ filterGroups.map(
+ filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
+ .shouldEqual("metric2")
+ )
+ filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs)
+ filterGroups.map(
+ filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
+ .shouldEqual("metric1:::agg_2")
+ )
+ }
+
+
+ it("LogicalPlan update for hierarchical aggregation queries with by clause") {
+ // CASE 1 - should update since the by clause labels are part of include tags
+ val t = TimeStepParams(700, 1000, 10000)
+ val nextLevelAggregatedMetricSuffix = "agg_2"
+ val nextLevelAggregationTags = Set("job", "application", "instance", "version")
+ var query = "sum(bottomk(2, my_counter:::agg{job=\"spark\", application=\"filodb\"}) by (instance, version)) by (version)"
+ var lp = Parser.queryRangeToLogicalPlan(query, t)
+ var lpUpdated = lp.useHigherLevelAggregatedMetric(true, nextLevelAggregatedMetricSuffix, nextLevelAggregationTags)
+ var filterGroups = getColumnFilterGroup(lpUpdated)
+ filterGroups.map(
+ filterSet => filterSet.filter( x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
+ .shouldEqual("my_counter:::agg_2")
+ )
- filterGroups = getColumnFilterGroup(lp1Updated.asInstanceOf[BinaryJoin].lhs)
+ // CASE 2 - should NOT update since the by clause labels are not part of include tags
+ query = "sum(bottomk(2, my_counter:::agg{job=\"spark\", application=\"filodb\"}) by (instance, version, id)) by (version)"
+ lp = Parser.queryRangeToLogicalPlan(query, t)
+ lpUpdated = lp.useHigherLevelAggregatedMetric(true, nextLevelAggregatedMetricSuffix, nextLevelAggregationTags)
+ filterGroups = getColumnFilterGroup(lpUpdated)
+ filterGroups.map(
+ filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
+ .shouldEqual("my_counter:::agg")
+ )
+
+ query = "sum(my_gauge{job=\"spark\", application=\"filodb\"}) by (job, application) and on(job, application, mode) sum(my_counter{job=\"spark\", application=\"filodb\"}) by (job, application)"
+ lp = Parser.queryRangeToLogicalPlan(query, t)
+ lpUpdated = lp.useHigherLevelAggregatedMetric(true, nextLevelAggregatedMetricSuffix, nextLevelAggregationTags)
+ filterGroups = getColumnFilterGroup(lpUpdated)
filterGroups.map(
filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
.endsWith(nextLevelAggregatedMetricSuffix).shouldEqual(false)
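
For orientation, a minimal standalone Scala sketch of the rewrite rule these test cases exercise (this is not part of the commit; the names and exact semantics below are assumptions inferred from the assertions above): an ":::agg" metric is promoted to the next aggregation level only when every label the query filters or groups on is covered by the higher-level aggregation's include tags, and promotion simply swaps the aggregation suffix of the metric name.

object HierarchicalPromotionSketch {
  // Assumed include semantics: all labels used by the query must be published at the next level.
  def canPromote(isInclude: Boolean, queryLabels: Set[String], aggregationTags: Set[String]): Boolean =
    if (isInclude) queryLabels.subsetOf(aggregationTags)
    else queryLabels.intersect(aggregationTags).isEmpty

  // "metric1:::agg" -> "metric1:::agg_2"; names without ":::" are left untouched,
  // which is why the plain "metric2" in CASE 3 above is never rewritten.
  def promoteMetricName(name: String, nextLevelSuffix: String): String =
    name.split(":::") match {
      case Array(base, _) => s"$base:::$nextLevelSuffix"
      case _              => name
    }
}

For example, promoteMetricName("my_counter:::agg", "agg_2") yields "my_counter:::agg_2", matching the CASE 1 assertion of the by-clause test above.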
query/src/main/scala/filodb/query/LogicalPlan.scala: 22 changes (18 additions & 4 deletions)
@@ -582,10 +582,23 @@ case class BinaryJoin(lhs: PeriodicSeriesPlan,

override def useHigherLevelAggregatedMetricIfApplicable(isInclude: Boolean, metricSuffix: String,
tags: Set[String]): PeriodicSeriesPlan = {
- this.copy(
- lhs = lhs.useHigherLevelAggregatedMetricIfApplicable(isInclude, metricSuffix, tags),
- rhs = rhs.useHigherLevelAggregatedMetricIfApplicable(isInclude, metricSuffix, tags)
- )
+ on match {
+ case None => this.copy(
+ lhs = lhs.useHigherLevelAggregatedMetricIfApplicable(isInclude, metricSuffix, tags),
+ rhs = rhs.useHigherLevelAggregatedMetricIfApplicable(isInclude, metricSuffix, tags)
+ )
+ case Some(onLabels) => {
+ if (LogicalPlan.isHigherLevelAggregationApplicable(isInclude, onLabels, tags)) {
+ this.copy(
+ lhs = lhs.useHigherLevelAggregatedMetricIfApplicable(isInclude, metricSuffix, tags),
+ rhs = rhs.useHigherLevelAggregatedMetricIfApplicable(isInclude, metricSuffix, tags)
+ )
+ } else {
+ this
+ }
+ }
+ }
+ // TODO: add check for ignoring, include. Also, what is include and how are queries using it?
}
}
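
As a quick illustration (not part of the diff), this is how the new guard is expected to evaluate the "and on(job, application, mode)" query from the spec, assuming include semantics where every `on` label must appear in the higher-level aggregation tags:

object OnClauseGateSketch extends App {
  val onLabels        = Set("job", "application", "mode")
  val aggregationTags = Set("job", "application", "instance", "version")
  // "mode" is not published at the next aggregation level, so the check fails ...
  val applicable = onLabels.subsetOf(aggregationTags)
  // ... and the BinaryJoin is returned unchanged: neither side is rewritten to ":::agg_2",
  // which is what the endsWith(...) shouldEqual false assertion in the spec verifies.
  println(s"promotion applicable: $applicable") // prints: promotion applicable: false
}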

@@ -1107,6 +1120,7 @@ object LogicalPlan extends StrictLogging {
logger.info("Dataset options config not found. Skipping optimization !")
false
case Some(datasetOptions) =>
+ // TODO: Clarify if special treatment to be given to ws,ns column based on rules ?
val updatedShardKeyColumns = datasetOptions.shardKeyColumns :+ getMetricColumnFilterTag(
filterTags,
datasetOptions.metricColumn)
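On the updatedShardKeyColumns line in the last hunk: a rough sketch of the intent, as far as it can be inferred from context (the helper below is hypothetical, not FiloDB's getMetricColumnFilterTag), is that the metric name column is appended to the configured shard-key columns so the optimization treats the metric filter as one more mandatory shard-key filter:

object ShardKeySketch {
  // Hypothetical stand-in mirroring what the surrounding code appears to do: prefer "__name__" when
  // the query filtered on it, otherwise fall back to the dataset's configured metric column, and
  // append it to the shard-key columns before validating the query's column filters.
  def effectiveShardKeyColumns(shardKeyColumns: Seq[String],
                               filterTags: Set[String],
                               metricColumn: String): Seq[String] = {
    val metricTag = if (filterTags.contains("__name__")) "__name__" else metricColumn
    shardKeyColumns :+ metricTag
  }
}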

0 comments on commit e9098f8