Adding by clause unit tests
sandeep6189 committed Sep 11, 2024
1 parent c267820 · commit 078c66f
Showing 1 changed file with 51 additions and 10 deletions.
@@ -345,38 +345,79 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers {
)
}


it("LogicalPlan update for hierarchical aggregation queries with by clause") {
// common parameters
val t = TimeStepParams(700, 1000, 10000)
val nextLevelAggregatedMetricSuffix = "agg_2"
val nextLevelAggregationTags = Set("job", "application", "instance", "version")
val params = HierarchicalQueryExperience(true, ":::", nextLevelAggregatedMetricSuffix, nextLevelAggregationTags)

// CASE 1 - Aggregate with by clause - should update the metric name since the `by` clause labels are part of the include tags
var query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) by (version, instance)"
var lp = Parser.queryRangeToLogicalPlan(query, t)
var lpUpdated = lp.useHigherLevelAggregatedMetric(params)
var filterGroups = getColumnFilterGroup(lpUpdated)
filterGroups.map(
filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
.shouldEqual("my_counter:::agg_2")
)

// CASE 2 - should NOT update since the bottomk aggregation operator is currently not allowed
query = "sum(bottomk(2, my_counter:::agg{job=\"spark\", application=\"filodb\"}) by (instance, version)) by (version)"
lp = Parser.queryRangeToLogicalPlan(query, t)
lpUpdated = lp.useHigherLevelAggregatedMetric(params)
filterGroups = getColumnFilterGroup(lpUpdated)
filterGroups.map(
filterSet => filterSet.filter( x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
.shouldEqual("my_counter:::agg")
)
// CASE 3 - should NOT update since the by clause labels are not part of include tags
query = "sum(rate(my_counter:::agg{job=\"spark\", application=\"app\"}[5m])) by (version, instance, id)"
lp = Parser.queryRangeToLogicalPlan(query, t)
lpUpdated = lp.useHigherLevelAggregatedMetric(params)
filterGroups = getColumnFilterGroup(lpUpdated)
filterGroups.map(
filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
.shouldEqual("my_counter:::agg")
)

// CASE 4 - should update since the by clause labels are part of include tags - binary join case
query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) by (job, application) and on(job, application) sum(my_counter:::agg{job=\"spark\", application=\"filodb\"}) by (job, application)"
lp = Parser.queryRangeToLogicalPlan(query, t)
lpUpdated = lp.useHigherLevelAggregatedMetric(params)
filterGroups = getColumnFilterGroup(lpUpdated)
filterGroups.map(
filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
.endsWith(nextLevelAggregatedMetricSuffix).shouldEqual(true)
)

// CASE 5 - lhs should not be updated since it is not an aggregated metric - binary join case
query = "sum(my_gauge{job=\"spark\", application=\"filodb\"}) by (job, application) and on(job, application) sum(my_counter:::agg{job=\"spark\", application=\"filodb\"}) by (job, application)"
lp = Parser.queryRangeToLogicalPlan(query, t)
lpUpdated = lp.useHigherLevelAggregatedMetric(params)
filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs)
filterGroups.map(
filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
.shouldEqual("my_gauge")
)
filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].rhs)
filterGroups.map(
filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
.shouldEqual("my_counter:::agg_2")
)

// CASE 6 - rhs should not be updated since it has a column filter which is not present in the include tags
query = "sum(my_gauge:::agg{job=\"spark\", application=\"filodb\"}) by (job, application) and on(job, application) sum(my_counter:::agg{job=\"spark\", application=\"filodb\", id=\"1\"}) by (job, application)"
lp = Parser.queryRangeToLogicalPlan(query, t)
lpUpdated = lp.useHigherLevelAggregatedMetric(params)
filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].lhs)
filterGroups.map(
filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
.shouldEqual("my_gauge:::agg_2")
)
filterGroups = getColumnFilterGroup(lpUpdated.asInstanceOf[BinaryJoin].rhs)
filterGroups.map(
filterSet => filterSet.filter(x => x.column == "__name__").head.filter.valuesStrings.head.asInstanceOf[String]
.shouldEqual("my_counter:::agg")
)
}
}
