fix(query): histogram queries with PeriodicSeriesWithWindowing logical plan don't apply le label filter (#1715)

* Add a RangeVectorTransformer that applies the le filter for PeriodicSeriesWithWindowing plans
sandeep6189 authored Feb 16, 2024
1 parent aecfbeb commit 7aa80af
Showing 4 changed files with 102 additions and 36 deletions.
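The gist of the change, sketched below using the fixtures of the updated SingleClusterPlannerSpec (engine and promQlQueryParams come from that spec; the behavior summary is inferred from the new tests, not quoted from the codebase): previously, with translatePromToFilodbHistogram enabled, the planner stripped the _bucket suffix and the le matcher from such a query but never re-applied the bucket selection, so the le filter was silently dropped. Now the selection is re-applied as an extra transformer stage.

    // Query shape that exercises the fix:
    val t = TimeStepParams(700, 1000, 10000)
    val lp = Parser.queryRangeToLogicalPlan(
      """sum(rate(my_hist_bucket{job="prometheus",le="0.5"}[2m])) by (job)""", t)
    val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams))
    // After the fix, each MultiSchemaPartitionsExec leaf carries three transformers:
    //   PeriodicSamplesMapper -> InstantVectorFunctionMapper(HistogramBucket) -> AggregateMapReduce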
SingleClusterPlanner.scala
@@ -618,13 +618,16 @@ class SingleClusterPlanner(val dataset: Dataset,
}
}

+// scalastyle:off method.length
override private[queryplanner] def materializePeriodicSeriesWithWindowing(qContext: QueryContext,
lp: PeriodicSeriesWithWindowing,
forceInProcess: Boolean): PlanResult = {

-  val logicalPlanWithoutBucket = if (queryConfig.translatePromToFilodbHistogram) {
-    removeBucket(Right(lp))._3.right.get
-  } else lp
+  val (nameFilter: Option[String], leFilter: Option[String], logicalPlanWithoutBucket: PeriodicSeriesWithWindowing) =
+    if (queryConfig.translatePromToFilodbHistogram) {
+      val result = removeBucket(Right(lp))
+      (result._1, result._2, result._3.right.get)
+    } else (None, None, lp)

val series = walkLogicalPlanTree(logicalPlanWithoutBucket.series, qContext, forceInProcess)
val rawSource = logicalPlanWithoutBucket.series.isRaw
@@ -645,6 +648,18 @@ class SingleClusterPlanner(val dataset: Dataset,
realScanStepMs, realScanEndMs, window, Some(execRangeFn), qContext,
logicalPlanWithoutBucket.stepMultipleNotationUsed,
paramsExec, logicalPlanWithoutBucket.offsetMs, rawSource)))

+  // Add the le filter transformer to select the required bucket
+  (nameFilter, leFilter) match {
+    case (Some(filter), Some(le)) if filter.endsWith("_bucket") => {
+      val paramsExec = StaticFuncArgs(le.toDouble, RangeParams(realScanStartMs / 1000,
+        realScanStepMs / 1000, realScanEndMs / 1000))
+      series.plans.foreach(_.addRangeVectorTransformer(InstantVectorFunctionMapper(HistogramBucket,
+        Seq(paramsExec))))
+    }
+    case _ => // NOP
+  }

val result = if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) {
val aggregate = Aggregate(AggregationOperator.Sum, logicalPlanWithoutBucket, Nil,
AggregateClause.byOpt(Seq("job")))
@@ -662,9 +677,9 @@ class SingleClusterPlanner(val dataset: Dataset,
result.plans.foreach(p => p.addRangeVectorTransformer(RepeatTransformer(lp.startMs, lp.stepMs, lp.endMs,
p.queryWithPlanName(qContext))))
}

result
}
+// scalastyle:on method.length

override private[queryplanner] def removeBucket(lp: Either[PeriodicSeries, PeriodicSeriesWithWindowing]) = {
val rawSeries = lp match {
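For illustration, a minimal self-contained sketch of the guard the new match expression encodes (the object and method names here are hypothetical, not part of the commit):

    object LeFilterGuardSketch extends App {
      // Only a metric name ending in "_bucket" paired with an le value should
      // trigger the HistogramBucket transformer; anything else falls through.
      def leBucketValue(nameFilter: Option[String], leFilter: Option[String]): Option[Double] =
        (nameFilter, leFilter) match {
          case (Some(name), Some(le)) if name.endsWith("_bucket") => Some(le.toDouble)
          case _ => None // NOP, mirroring the planner's fall-through case
        }

      assert(leBucketValue(Some("my_hist_bucket"), Some("0.5")).contains(0.5))
      assert(leBucketValue(Some("my_hist_count"), None).isEmpty)
      assert(leBucketValue(None, None).isEmpty)
      println("guard behaves as expected")
    }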
HighAvailabilityPlannerSpec.scala
@@ -84,11 +84,10 @@ class HighAvailabilityPlannerSpec extends AnyFunSpec with Matchers {

reduceAggregateExec.children.foreach { l1 =>
l1.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
-l1.rangeVectorTransformers.size shouldEqual 2
-l1.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
+val l1Exec = l1.asInstanceOf[MultiSchemaPartitionsExec]
+SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l1Exec)
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].startMs shouldEqual (0)
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].endMs shouldEqual (10000)
-l1.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
}
}

@@ -220,11 +219,10 @@ class HighAvailabilityPlannerSpec extends AnyFunSpec with Matchers {
1010000
l1.asInstanceOf[MultiSchemaPartitionsExec].chunkMethod.asInstanceOf[TimeRangeChunkScan].endTime shouldEqual
2000000
-l1.rangeVectorTransformers.size shouldEqual 2
-l1.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
+val l1Exec = l1.asInstanceOf[MultiSchemaPartitionsExec]
+SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l1Exec)
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].startMs shouldEqual (1060000)
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].endMs shouldEqual (2000000)
-l1.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
}
val queryParams = child2.queryContext.origQueryParams.
asInstanceOf[PromQlQueryParams]
@@ -354,11 +352,10 @@ class HighAvailabilityPlannerSpec extends AnyFunSpec with Matchers {
(1080000-lookBack)
l1.asInstanceOf[MultiSchemaPartitionsExec].chunkMethod.asInstanceOf[TimeRangeChunkScan].endTime shouldEqual
2000000
-l1.rangeVectorTransformers.size shouldEqual 2
-l1.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
+val l1Exec = l1.asInstanceOf[MultiSchemaPartitionsExec]
+SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l1Exec)
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].startMs shouldEqual 1080000
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].endMs shouldEqual 2000000
-l1.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
}

val queryParams = child2.asInstanceOf[PromQlRemoteExec].queryContext.origQueryParams.
@@ -434,11 +431,10 @@ class HighAvailabilityPlannerSpec extends AnyFunSpec with Matchers {

reduceAggregateExec.children.foreach { l1 =>
l1.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
-l1.rangeVectorTransformers.size shouldEqual 2
-l1.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
+val l1Exec = l1.asInstanceOf[MultiSchemaPartitionsExec]
+SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l1Exec)
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].startMs shouldEqual from * 1000
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].endMs shouldEqual to * 1000
-l1.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
}
}

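These spec updates all follow one pattern: a fixed `size shouldEqual 2` assertion would now fail whenever the metric is a _bucket query, which gains a third transformer, so the checks move into a shared suffix-aware helper. A hypothetical two-liner capturing the rule the helper asserts:

    object TransformerCountSketch extends App {
      // Sketch only: the metric suffix decides the expected transformer count.
      def expectedTransformerCount(metricName: String): Int =
        if (metricName.endsWith("_count") || metricName.endsWith("_total")) 2 else 3

      assert(expectedTransformerCount("my_hist_bucket") == 3) // gains InstantVectorFunctionMapper
      assert(expectedTransformerCount("my_hist_count") == 2)  // PeriodicSamplesMapper + AggregateMapReduce
    }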
SingleClusterPlannerSpec.scala
@@ -22,10 +22,35 @@ import org.scalatest.matchers.should.Matchers
import filodb.query.LogicalPlan.getRawSeriesFilters
import filodb.query.exec.aggregator.{CountRowAggregator, SumRowAggregator}
import org.scalatest.exceptions.TestFailedException
+import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper


import scala.concurrent.duration._

+object SingleClusterPlannerSpec {
+
+  def validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(lp: MultiSchemaPartitionsExec)
+  : Unit = {
+    // Checking for NOT `_count`/`_total` instead of `_bucket`, because the _bucket suffix is removed
+    // for histogram queries.
+    val metricName = lp.filters.find(x => x.column == lp.metricColumn).head.filter.valuesStrings.head.toString
+    val isBucketQuery = !(metricName.endsWith("_count") || metricName.endsWith("_total"))
+    // If lp is a bucket query with an le filter, an additional rangeVectorTransformer is added to select the
+    // required bucket. This is not the case for histogram queries with a _count, _sum, _min or _max suffix,
+    // since those values are stored outside the histogram buckets and carry no le filter.
+    if (isBucketQuery) {
+      lp.rangeVectorTransformers.size shouldEqual 3
+      lp.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
+      lp.rangeVectorTransformers(1).isInstanceOf[InstantVectorFunctionMapper] shouldEqual true
+      lp.rangeVectorTransformers(2).isInstanceOf[AggregateMapReduce] shouldEqual true
+    }
+    else {
+      lp.rangeVectorTransformers.size shouldEqual 2
+      lp.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
+      lp.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
+    }
+  }
+}

class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFutures with PlanValidationSpec {

implicit val system = ActorSystem()
@@ -118,9 +143,8 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFutures with PlanValidationSpec {
l1.isInstanceOf[LocalPartitionReduceAggregateExec] shouldEqual true
l1.children.foreach { l2 =>
l2.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
-l2.rangeVectorTransformers.size shouldEqual 2
-l2.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
-l2.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
+val l2Exec = l2.asInstanceOf[MultiSchemaPartitionsExec]
+SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l2Exec)
}
}
}
@@ -140,9 +164,8 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFutures with PlanValidationSpec {
l2.isInstanceOf[LocalPartitionReduceAggregateExec] shouldEqual true
l2.children.foreach { l3 =>
l3.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
-l3.rangeVectorTransformers.size shouldEqual 2
-l3.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
-l3.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
+val l3Exec = l3.asInstanceOf[MultiSchemaPartitionsExec]
+SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l3Exec)
}
}
}
@@ -1881,9 +1904,8 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFutures with PlanValidationSpec {
l1.isInstanceOf[LocalPartitionReduceAggregateExec] shouldEqual true
l1.children.foreach { l2 =>
l2.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
-l2.rangeVectorTransformers.size shouldEqual 2
-l2.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
-l2.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
+val l2Exec = l2.asInstanceOf[MultiSchemaPartitionsExec]
+SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l2Exec)
}
}
}
@@ -2202,6 +2224,43 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFutures with PlanValidationSpec {
isInstanceOf[StaticFuncArgs] shouldEqual(true)
}

it("should add le label filter correctly for histogram query with PeriodicSeriesWithWindowing logical plan") {
val t = TimeStepParams(700, 1000, 10000)
val lp = Parser.queryRangeToLogicalPlan("""sum(rate(my_hist_bucket{job="prometheus",le="0.5"}[2m])) by (job)""", t)

val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams))
val multiSchemaPartitionsExec = execPlan.children.head.asInstanceOf[MultiSchemaPartitionsExec]
// _bucket should be removed from name
multiSchemaPartitionsExec.filters.filter(_.column == "__name__").head.filter.valuesStrings.
head.equals("my_hist") shouldEqual true
// le filter should be removed
multiSchemaPartitionsExec.filters.filter(_.column == "le").isEmpty shouldEqual true
multiSchemaPartitionsExec.rangeVectorTransformers(1).isInstanceOf[InstantVectorFunctionMapper].
shouldEqual(true)
multiSchemaPartitionsExec.rangeVectorTransformers(1).asInstanceOf[InstantVectorFunctionMapper].funcParams.head.
isInstanceOf[StaticFuncArgs] shouldEqual (true)

multiSchemaPartitionsExec.rangeVectorTransformers(1).asInstanceOf[InstantVectorFunctionMapper].funcParams.head.
asInstanceOf[StaticFuncArgs].scalar shouldEqual 0.5

val expected =
"""T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(700,1000,10000))
|-E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#433386961],raw)
|--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(job))
|---T~InstantVectorFunctionMapper(function=HistogramBucket)
|----FA1~StaticFuncArgs(0.5,RangeParams(700,1000,10000))
|----T~PeriodicSamplesMapper(start=700000, step=1000000, end=10000000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=8, chunkMethod=TimeRangeChunkScan(580000,10000000), filters=List(ColumnFilter(job,Equals(prometheus)), ColumnFilter(__name__,Equals(my_hist))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#433386961],raw)
|--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(job))
|---T~InstantVectorFunctionMapper(function=HistogramBucket)
|----FA1~StaticFuncArgs(0.5,RangeParams(700,1000,10000))
|----T~PeriodicSamplesMapper(start=700000, step=1000000, end=10000000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=24, chunkMethod=TimeRangeChunkScan(580000,10000000), filters=List(ColumnFilter(job,Equals(prometheus)), ColumnFilter(__name__,Equals(my_hist))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#433386961],raw)"""
.stripMargin

validatePlan(execPlan, expected)
}

it("should NOT convert to histogram bucket query when _bucket is not a suffix") {
val t = TimeStepParams(700, 1000, 10000)
val lp = Parser.queryRangeToLogicalPlan("""my_bucket_counter{job="prometheus"}""", t)
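To make the expected plan above concrete: once InstantVectorFunctionMapper(HistogramBucket) runs, each windowed histogram sample collapses to the single bucket whose upper bound matches the le argument. A conceptual sketch only; this is not FiloDB's actual histogram representation or API:

    object HistogramBucketSketch extends App {
      // Buckets modeled as (le upper bound, cumulative count) pairs, a
      // deliberate simplification used purely for illustration.
      def bucketValue(buckets: Seq[(Double, Double)], le: Double): Option[Double] =
        buckets.collectFirst { case (upper, count) if upper == le => count }

      val rated = Seq(0.25 -> 3.0, 0.5 -> 7.0, 1.0 -> 9.0)
      assert(bucketValue(rated, 0.5).contains(7.0)) // le="0.5" selects this bucket
      assert(bucketValue(rated, 0.75).isEmpty)      // no bucket with that boundary
      println(s"le=0.5 -> ${bucketValue(rated, 0.5)}")
    }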
SingleClusterPlannerSplitSpec.scala
@@ -17,7 +17,7 @@ import filodb.core.store.TimeRangeChunkScan
import filodb.prometheus.ast.{TimeStepParams, WindowConstants}
import filodb.prometheus.parse.Parser
import filodb.query._
-import filodb.query.exec._
+import filodb.query.exec.{MultiSchemaPartitionsExec, _}

class SingleClusterPlannerSplitSpec extends AnyFunSpec with Matchers with ScalaFutures {

@@ -139,9 +139,8 @@ class SingleClusterPlannerSplitSpec extends AnyFunSpec with Matchers with ScalaFutures {
l1.isInstanceOf[LocalPartitionReduceAggregateExec] shouldEqual true
l1.children.foreach { l2 =>
l2.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
-l2.rangeVectorTransformers.size shouldEqual 2
-l2.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
-l2.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
+val l2Exec = l2.asInstanceOf[MultiSchemaPartitionsExec]
+SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l2Exec)
}
}
}
@@ -169,9 +168,8 @@ class SingleClusterPlannerSplitSpec extends AnyFunSpec with Matchers with ScalaFutures {
l2.isInstanceOf[LocalPartitionReduceAggregateExec] shouldEqual true
l2.children.foreach { l3 =>
l3.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
-l3.rangeVectorTransformers.size shouldEqual 2
-l3.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
-l3.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
+val l3Exec = l3.asInstanceOf[MultiSchemaPartitionsExec]
+SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l3Exec)
}
}
}
@@ -372,13 +370,11 @@ class SingleClusterPlannerSplitSpec extends AnyFunSpec with Matchers with ScalaFutures {
l1.isInstanceOf[LocalPartitionReduceAggregateExec] shouldEqual true
l1.children.foreach { l2 =>
l2.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
-l2.rangeVectorTransformers.size shouldEqual 2
-l2.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
-l2.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
+val l2Exec = l2.asInstanceOf[MultiSchemaPartitionsExec]
+SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l2Exec)
}
}
}

}

it("should generate SplitExec wrapper with appropriate splits " +
