Skip to content

Commit

Permalink
fix @modifier for multiple partition time series. (#1723)
Browse files Browse the repository at this point in the history
* fix @modifier for multiple partition time series.
---------

Co-authored-by: Yu Zhang <[email protected]>
  • Loading branch information
yu-shipit and Yu Zhang authored Feb 29, 2024
1 parent e2f68d4 commit 3450204
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ object QueryConstants {
val ClosingCurlyBraces = "}"
val Space = " "
val Offset = "offset"
val At = "@"
val OpeningRoundBracket = "("
val ClosingRoundBracket = ")"
val OpeningSquareBracket = "["
Expand Down Expand Up @@ -64,7 +65,8 @@ object LogicalPlanParser {

private def periodicSeriesToQuery(periodicSeries: PeriodicSeries): String = {
// Queries like sum(foo) should not have window even though stale lookback is present
s"${rawSeriesLikeToQuery(periodicSeries.rawSeries, false)}"
val at = periodicSeries.atMs.fold("")(atMs => s" $At${atMs / 1000}")
s"${rawSeriesLikeToQuery(periodicSeries.rawSeries, false)}$at"
}

private def aggregateToQuery(lp: Aggregate): String = {
Expand Down Expand Up @@ -129,12 +131,13 @@ object LogicalPlanParser {
val addWindow = if (lp.function.equals(Timestamp)) false else true
val rawSeries = rawSeriesLikeToQuery(lp.series, addWindow)
val prefix = lp.function.entryName + OpeningRoundBracket
if (lp.functionArgs.isEmpty) s"$prefix$rawSeries$ClosingRoundBracket"
val at = lp.atMs.fold("")(atMs => s" $At${atMs / 1000}")
if (lp.functionArgs.isEmpty) s"$prefix$rawSeries$at$ClosingRoundBracket"
else {
if (lp.function.equals(QuantileOverTime))
s"$prefix${functionArgsToQuery(lp.functionArgs.head)}$Comma$rawSeries$ClosingRoundBracket"
else
s"$prefix$rawSeries$Comma${lp.functionArgs.map(functionArgsToQuery(_)).mkString(Comma)}" +
s"$prefix$rawSeries$at$Comma${lp.functionArgs.map(functionArgsToQuery(_)).mkString(Comma)}" +
s"$ClosingRoundBracket"
}
}
Expand Down Expand Up @@ -183,7 +186,8 @@ object LogicalPlanParser {
val sqClause =
s"${OpeningSquareBracket}${sqww.subqueryWindowMs/1000}s:${sqww.subqueryStepMs/1000}s$ClosingSquareBracket"
val offset = sqww.offsetMs.fold("")(offsetMs => s" offset ${offsetMs/1000}s")
val suffix = s"$sqClause$offset$ClosingRoundBracket"
val at = sqww.atMs.fold("")(atMs => s" $At${atMs / 1000}")
val suffix = s"$sqClause$offset$at$ClosingRoundBracket"
if (sqww.functionArgs.isEmpty) s"$prefix$periodicSeriesQuery$suffix"
else {
s"$prefix${functionArgsToQuery(sqww.functionArgs.head)}$Comma$periodicSeriesQuery$suffix"
Expand All @@ -193,9 +197,10 @@ object LogicalPlanParser {
private def topLevelSubqueryToQuery(tlsq: TopLevelSubquery): String = {
val periodicSeriesQuery = convertToQuery(tlsq.innerPeriodicSeries)
val offset = tlsq.originalOffsetMs.fold("")(offsetMs => s" offset ${offsetMs/1000}s")
val at = tlsq.atMs.fold("")(atMs => s" $At${atMs / 1000}")
val sqClause =
s"${OpeningSquareBracket}${(tlsq.orginalLookbackMs)/1000}s:${tlsq.stepMs/1000}s$ClosingSquareBracket"
s"${periodicSeriesQuery}${sqClause}${offset}"
s"$periodicSeriesQuery$sqClause$offset$at"
}

def metadataMatchToQuery(lp: MetadataQueryPlan): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package filodb.coordinator.queryplanner

import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

import filodb.prometheus.ast.TimeStepParams
import filodb.prometheus.parse.Parser
import filodb.query.SeriesKeysByFilters
import filodb.query.{IntervalSelector, RawSeries, SeriesKeysByFilters}

class LogicalPlanParserSpec extends AnyFunSpec with Matchers {

Expand Down Expand Up @@ -80,6 +79,150 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers {
res shouldEqual("http_requests_total{job=\"app\"} offset 300s")
}

it("should generate query from LogicalPlan having @modifier") {
var query = "http_requests_total{job=\"app\"} offset 5m @start()"
var lp = Parser.queryToLogicalPlan(query, 1000, 1000)
var res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual "http_requests_total{job=\"app\"} offset 300s @1000"

query = "topk(1, http_requests_total{job=\"app\"}offset 5m @start())"
lp = Parser.queryToLogicalPlan(query, 1000, 1000)
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual "topk(1.0,http_requests_total{job=\"app\"} offset 300s @1000)"

query = "http_requests_total{job=\"app\"} and topk(1, http_requests_total{job=\"app\"}offset 5m @start())"
lp = Parser.queryToLogicalPlan(query, 1000, 1000)
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual
"(http_requests_total{job=\"app\"} and topk(1.0,http_requests_total{job=\"app\"} offset 300s @1000))"

query = "scalar(topk(1, http_requests_total{job=\"app\"}offset 5m @start()))"
lp = Parser.queryToLogicalPlan(query, 1000, 1000)
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual
"scalar(topk(1.0,http_requests_total{job=\"app\"} offset 300s @1000))"

query = "ln(topk(1, http_requests_total{job=\"app\"}offset 5m @start()))"
lp = Parser.queryToLogicalPlan(query, 1000, 1000)
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual
"ln(topk(1.0,http_requests_total{job=\"app\"} offset 300s @1000))"

query = "rate(http_requests_total{job=\"app\"}[5m] offset 5m @1000)"
lp = Parser.queryToLogicalPlan(query, 1000, 1000)
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual "rate(http_requests_total{job=\"app\"}[300s] offset 300s @1000)"

query = "sum(rate(http_requests_total{job=\"app\"}[5m] offset 5m @1000))"
lp = Parser.queryToLogicalPlan(query, 1000, 1000)
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual "sum(rate(http_requests_total{job=\"app\"}[300s] offset 300s @1000))"

query = "topk(2, sum(rate(http_requests_total{job=\"app\"}[5m] offset 5m @start())))"
lp = Parser.queryToLogicalPlan(query, 1000, 1000)
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual "topk(2.0,sum(rate(http_requests_total{job=\"app\"}[300s] offset 300s @1000)))"

query = "sum(rate(http_requests_total{job=\"app\"}[5m])) and " +
"topk(2, sum(rate(http_requests_total{job=\"app\"}[5m] offset 5m @start())))"
lp = Parser.queryToLogicalPlan(query, 1000, 1000)
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual "(sum(rate(http_requests_total{job=\"app\"}[300s]))" +
" and topk(2.0,sum(rate(http_requests_total{job=\"app\"}[300s] offset 300s @1000))))"

query = "sum(rate(http_requests_total{job=\"app\"}[5m: 1m] @100))"
lp = Parser.queryToLogicalPlan(query, 1000, 1000)
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual "sum(rate(http_requests_total{job=\"app\"}[300s:60s] @100))"
}

it("should generate range query from LogicalPlan having @modifier") {
var query = "http_requests_total{job=\"app\"} offset 5m @start()"
var lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(2000, 10, 5000))
var res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual "http_requests_total{job=\"app\"} offset 300s @2000"

query = "topk(1, http_requests_total{job=\"app\"}offset 5m @start())"
lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(2000, 10, 5000))
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual "topk(1.0,http_requests_total{job=\"app\"} offset 300s @2000)"

query = "http_requests_total{job=\"app\"} and topk(1, http_requests_total{job=\"app\"}offset 5m @start())"
lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(2000, 10, 5000))
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual
"(http_requests_total{job=\"app\"} and topk(1.0,http_requests_total{job=\"app\"} offset 300s @2000))"

query = "scalar(topk(1, http_requests_total{job=\"app\"}offset 5m @start()))"
lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(2000, 10, 5000))
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual
"scalar(topk(1.0,http_requests_total{job=\"app\"} offset 300s @2000))"

query = "ln(topk(1, http_requests_total{job=\"app\"}offset 5m @start()))"
lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(2000, 10, 5000))
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual
"ln(topk(1.0,http_requests_total{job=\"app\"} offset 300s @2000))"

query = "rate(http_requests_total{job=\"app\"}[5m] offset 5m @2000)"
lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(2000, 10, 5000))
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual "rate(http_requests_total{job=\"app\"}[300s] offset 300s @2000)"

query = "sum(rate(http_requests_total{job=\"app\"}[5m] offset 5m @2000))"
lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(2000, 10, 5000))
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual "sum(rate(http_requests_total{job=\"app\"}[300s] offset 300s @2000))"

query = "topk(2, sum(rate(http_requests_total{job=\"app\"}[5m] offset 5m @end())))"
lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(2000, 10, 5000))
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual "topk(2.0,sum(rate(http_requests_total{job=\"app\"}[300s] offset 300s @5000)))"

query = "sum(rate(http_requests_total{job=\"app\"}[5m])) and " +
"topk(2, sum(rate(http_requests_total{job=\"app\"}[5m] offset 5m @start())))"
lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(2000, 10, 5000))
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual "(sum(rate(http_requests_total{job=\"app\"}[300s]))" +
" and topk(2.0,sum(rate(http_requests_total{job=\"app\"}[300s] offset 300s @2000))))"

query = "sum(rate(http_requests_total{job=\"app\"}[5m: 1m] @100))"
lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(2000, 10, 5000))
res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual "sum(rate(http_requests_total{job=\"app\"}[300s:60s] @100))"
}

it("do not need @modifier time range is changed.") {
val query = "http_requests_total{job=\"app\"}[5m] offset 5m @500"
val lp = Parser.queryToLogicalPlan(query, 10000, 1000)
// the selector time is changed to [500s, 500s]
lp.asInstanceOf[RawSeries].rangeSelector shouldEqual IntervalSelector(500000, 500000)
val res = LogicalPlanParser.convertToQuery(lp)
// Converted query has time in seconds
res shouldEqual "http_requests_total{job=\"app\"}[300s] offset 300s"
}

it("should generate query from SubqueryWithWindowing having offset") {
val query = """sum_over_time(http_requests_total{job="app"}[5m:1m] offset 5m)"""
val lp = Parser.queryToLogicalPlan(query, 1000, 1000, Parser.Antlr)
Expand Down

0 comments on commit 3450204

Please sign in to comment.