fix(query): histogram queries with PeriodSeriesWithWindowing logical plan don't apply le label filter #1715

Merged 4 commits on Feb 16, 2024
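In short: when queryConfig.translatePromToFilodbHistogram is enabled, a query such as sum(rate(my_hist_bucket{job="prometheus",le="0.5"}[2m])) by (job) is rewritten against FiloDB's histogram storage: the _bucket suffix is stripped from the metric name and the le filter is dropped from the column filters. Previously the planner never re-applied the le selection, so such queries read the whole histogram instead of the requested bucket. This change captures the stripped le value and adds an InstantVectorFunctionMapper(HistogramBucket) transformer to the plan so the bucket is actually selected.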
@@ -618,13 +618,16 @@ class SingleClusterPlanner(val dataset: Dataset,
}
}

// scalastyle:off method.length
override private[queryplanner] def materializePeriodicSeriesWithWindowing(qContext: QueryContext,
lp: PeriodicSeriesWithWindowing,
forceInProcess: Boolean): PlanResult = {

val logicalPlanWithoutBucket = if (queryConfig.translatePromToFilodbHistogram) {
removeBucket(Right(lp))._3.right.get
} else lp
val (nameFilter: Option[String], leFilter: Option[String], logicalPlanWithoutBucket: PeriodicSeriesWithWindowing) =
if (queryConfig.translatePromToFilodbHistogram) {
val result = removeBucket(Right(lp))
(result._1, result._2, result._3.right.get)
} else (None, None, lp)

val series = walkLogicalPlanTree(logicalPlanWithoutBucket.series, qContext, forceInProcess)
val rawSource = logicalPlanWithoutBucket.series.isRaw
@@ -645,6 +648,18 @@ class SingleClusterPlanner(val dataset: Dataset,
realScanStepMs, realScanEndMs, window, Some(execRangeFn), qContext,
logicalPlanWithoutBucket.stepMultipleNotationUsed,
paramsExec, logicalPlanWithoutBucket.offsetMs, rawSource)))

// Add the le filter transformer to select the required bucket
(nameFilter, leFilter) match {
case (Some(filter), Some(le)) if filter.endsWith("_bucket") => {
val paramsExec = StaticFuncArgs(le.toDouble, RangeParams(realScanStartMs / 1000,
realScanStepMs / 1000, realScanEndMs / 1000))
series.plans.foreach(_.addRangeVectorTransformer(InstantVectorFunctionMapper(HistogramBucket,
Seq(paramsExec))))
}
case _ => // NOP
}

val result = if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) {
val aggregate = Aggregate(AggregationOperator.Sum, logicalPlanWithoutBucket, Nil,
AggregateClause.byOpt(Seq("job")))
@@ -662,9 +677,9 @@ class SingleClusterPlanner(val dataset: Dataset,
result.plans.foreach(p => p.addRangeVectorTransformer(RepeatTransformer(lp.startMs, lp.stepMs, lp.endMs,
p.queryWithPlanName(qContext))))
}

result
}
// scalastyle:on method.length
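
Downstream at execution time, the HistogramBucket instant-vector function applied by this transformer picks, from each histogram sample, the single bucket whose upper bound matches the le argument. A toy, self-contained sketch of that selection semantics (illustrative only; these names are hypothetical and this is not FiloDB's implementation):

// Toy model: one histogram sample as a map from bucket upper bound (le) to
// cumulative count. Selecting a bucket returns its cumulative count, or NaN
// when no bucket with that exact upper bound exists.
object HistogramBucketSketch {
  def selectBucket(le: Double, sample: Map[Double, Double]): Double =
    sample.getOrElse(le, Double.NaN)

  def main(args: Array[String]): Unit = {
    val sample = Map(0.1 -> 4.0, 0.5 -> 9.0, 1.0 -> 12.0)
    println(selectBucket(0.5, sample)) // 9.0: the le="0.5" bucket
    println(selectBucket(0.7, sample)) // NaN: no bucket with that boundary
  }
}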

override private[queryplanner] def removeBucket(lp: Either[PeriodicSeries, PeriodicSeriesWithWindowing]) = {
val rawSeries = lp match {
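The removeBucket diff is truncated in this view. Conceptually, it splits a Prometheus-style bucket selector into the original metric name, the le value, and a rewritten plan with neither. A minimal sketch of that splitting (hypothetical names; the real helper operates on logical plans and ColumnFilters, not plain maps):

object RemoveBucketSketch {
  // Strip a _bucket suffix from the metric name and pull out the le filter so
  // the planner can re-apply it later as a HistogramBucket transformer.
  def stripBucket(metric: String, labels: Map[String, String])
      : (Option[String], Option[String], String, Map[String, String]) =
    if (metric.endsWith("_bucket"))
      (Some(metric), labels.get("le"), metric.stripSuffix("_bucket"), labels - "le")
    else
      (None, None, metric, labels)
}

// stripBucket("my_hist_bucket", Map("job" -> "prometheus", "le" -> "0.5")) returns
// (Some("my_hist_bucket"), Some("0.5"), "my_hist", Map("job" -> "prometheus"))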
@@ -84,11 +84,10 @@ class HighAvailabilityPlannerSpec extends AnyFunSpec with Matchers {

reduceAggregateExec.children.foreach { l1 =>
l1.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
l1.rangeVectorTransformers.size shouldEqual 2
l1.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
val l1Exec = l1.asInstanceOf[MultiSchemaPartitionsExec]
SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l1Exec)
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].startMs shouldEqual (0)
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].endMs shouldEqual (10000)
l1.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
}
}

@@ -220,11 +219,10 @@ class HighAvailabilityPlannerSpec extends AnyFunSpec with Matchers {
1010000
l1.asInstanceOf[MultiSchemaPartitionsExec].chunkMethod.asInstanceOf[TimeRangeChunkScan].endTime shouldEqual
2000000
l1.rangeVectorTransformers.size shouldEqual 2
l1.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
val l1Exec = l1.asInstanceOf[MultiSchemaPartitionsExec]
SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l1Exec)
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].startMs shouldEqual (1060000)
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].endMs shouldEqual (2000000)
l1.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
}
val queryParams = child2.queryContext.origQueryParams.
asInstanceOf[PromQlQueryParams]
@@ -354,11 +352,10 @@ class HighAvailabilityPlannerSpec extends AnyFunSpec with Matchers {
(1080000-lookBack)
l1.asInstanceOf[MultiSchemaPartitionsExec].chunkMethod.asInstanceOf[TimeRangeChunkScan].endTime shouldEqual
2000000
l1.rangeVectorTransformers.size shouldEqual 2
l1.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
val l1Exec = l1.asInstanceOf[MultiSchemaPartitionsExec]
SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l1Exec)
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].startMs shouldEqual 1080000
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].endMs shouldEqual 2000000
l1.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
}

val queryParams = child2.asInstanceOf[PromQlRemoteExec].queryContext.origQueryParams.
@@ -434,11 +431,10 @@ class HighAvailabilityPlannerSpec extends AnyFunSpec with Matchers {

reduceAggregateExec.children.foreach { l1 =>
l1.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
l1.rangeVectorTransformers.size shouldEqual 2
l1.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
val l1Exec = l1.asInstanceOf[MultiSchemaPartitionsExec]
SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l1Exec)
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].startMs shouldEqual from *1000
l1.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper].endMs shouldEqual to * 1000
l1.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
}
}

@@ -22,10 +22,35 @@ import org.scalatest.matchers.should.Matchers
import filodb.query.LogicalPlan.getRawSeriesFilters
import filodb.query.exec.aggregator.{CountRowAggregator, SumRowAggregator}
import org.scalatest.exceptions.TestFailedException
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper


import scala.concurrent.duration._

object SingleClusterPlannerSpec {

def validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(lp: MultiSchemaPartitionsExec)
: Unit = {
// Check for NOT `_count`/`_total` instead of `_bucket`, because the `_bucket` suffix is removed for histogram queries
val metricName = lp.filters.find(x => x.column == lp.metricColumn).head.filter.valuesStrings.head.toString
val isBucketQuery = !(metricName.endsWith("_count") || metricName.endsWith("_total"))
// If this is a bucket query with an le filter, we add an additional rangeVectorTransformer
// to select the required bucket. This is not the case for histogram queries with a _count, _sum, _min, or _max
// suffix, since those values are stored outside the histogram buckets and have no le filters
if (isBucketQuery) {
lp.rangeVectorTransformers.size shouldEqual 3
lp.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
lp.rangeVectorTransformers(1).isInstanceOf[InstantVectorFunctionMapper] shouldEqual true
lp.rangeVectorTransformers(2).isInstanceOf[AggregateMapReduce] shouldEqual true
}
else {
lp.rangeVectorTransformers.size shouldEqual 2
lp.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
lp.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
}
}
}

class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFutures with PlanValidationSpec {

implicit val system = ActorSystem()
@@ -118,9 +143,8 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
l1.isInstanceOf[LocalPartitionReduceAggregateExec] shouldEqual true
l1.children.foreach { l2 =>
l2.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
l2.rangeVectorTransformers.size shouldEqual 2
l2.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
l2.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
val l2Exec = l2.asInstanceOf[MultiSchemaPartitionsExec]
SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l2Exec)
}
}
}
@@ -140,9 +164,8 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
l2.isInstanceOf[LocalPartitionReduceAggregateExec] shouldEqual true
l2.children.foreach { l3 =>
l3.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
l3.rangeVectorTransformers.size shouldEqual 2
l3.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
l3.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
val l3Exec = l3.asInstanceOf[MultiSchemaPartitionsExec]
SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l3Exec)
}
}
}
@@ -1881,9 +1904,8 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
l1.isInstanceOf[LocalPartitionReduceAggregateExec] shouldEqual true
l1.children.foreach { l2 =>
l2.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
l2.rangeVectorTransformers.size shouldEqual 2
l2.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
l2.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
val l2Exec = l2.asInstanceOf[MultiSchemaPartitionsExec]
SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l2Exec)
}
}
}
@@ -2202,6 +2224,43 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
isInstanceOf[StaticFuncArgs] shouldEqual(true)
}

it("should add le label filter correctly for histogram query with PeriodicSeriesWithWindowing logical plan") {
val t = TimeStepParams(700, 1000, 10000)
val lp = Parser.queryRangeToLogicalPlan("""sum(rate(my_hist_bucket{job="prometheus",le="0.5"}[2m])) by (job)""", t)

val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams))
val multiSchemaPartitionsExec = execPlan.children.head.asInstanceOf[MultiSchemaPartitionsExec]
// _bucket should be removed from name
multiSchemaPartitionsExec.filters.filter(_.column == "__name__").head.filter.valuesStrings.
head.equals("my_hist") shouldEqual true
// le filter should be removed
multiSchemaPartitionsExec.filters.filter(_.column == "le").isEmpty shouldEqual true
multiSchemaPartitionsExec.rangeVectorTransformers(1).isInstanceOf[InstantVectorFunctionMapper].
shouldEqual(true)
multiSchemaPartitionsExec.rangeVectorTransformers(1).asInstanceOf[InstantVectorFunctionMapper].funcParams.head.
isInstanceOf[StaticFuncArgs] shouldEqual (true)

multiSchemaPartitionsExec.rangeVectorTransformers(1).asInstanceOf[InstantVectorFunctionMapper].funcParams.head.
asInstanceOf[StaticFuncArgs].scalar shouldEqual 0.5

val expected =
"""T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(700,1000,10000))
|-E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#433386961],raw)
|--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(job))
|---T~InstantVectorFunctionMapper(function=HistogramBucket)
|----FA1~StaticFuncArgs(0.5,RangeParams(700,1000,10000))
|----T~PeriodicSamplesMapper(start=700000, step=1000000, end=10000000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=8, chunkMethod=TimeRangeChunkScan(580000,10000000), filters=List(ColumnFilter(job,Equals(prometheus)), ColumnFilter(__name__,Equals(my_hist))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#433386961],raw)
|--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(job))
|---T~InstantVectorFunctionMapper(function=HistogramBucket)
|----FA1~StaticFuncArgs(0.5,RangeParams(700,1000,10000))
|----T~PeriodicSamplesMapper(start=700000, step=1000000, end=10000000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=24, chunkMethod=TimeRangeChunkScan(580000,10000000), filters=List(ColumnFilter(job,Equals(prometheus)), ColumnFilter(__name__,Equals(my_hist))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#433386961],raw)"""
.stripMargin

validatePlan(execPlan, expected)
}
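
Reading the expected plan above: the T~InstantVectorFunctionMapper(function=HistogramBucket) node, fed by FA1~StaticFuncArgs(0.5,RangeParams(700,1000,10000)), is the re-applied le filter. It sits between the PeriodicSamplesMapper (the rate) and the AggregateMapReduce (the sum by job), matching the three-transformer ordering the validator helper asserts for bucket queries.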

it("should NOT convert to histogram bucket query when _bucket is not a suffix") {
val t = TimeStepParams(700, 1000, 10000)
val lp = Parser.queryRangeToLogicalPlan("""my_bucket_counter{job="prometheus"}""", t)
@@ -17,7 +17,7 @@ import filodb.core.store.TimeRangeChunkScan
import filodb.prometheus.ast.{TimeStepParams, WindowConstants}
import filodb.prometheus.parse.Parser
import filodb.query._
import filodb.query.exec._
import filodb.query.exec.{MultiSchemaPartitionsExec, _}

class SingleClusterPlannerSplitSpec extends AnyFunSpec with Matchers with ScalaFutures {

@@ -139,9 +139,8 @@ class SingleClusterPlannerSplitSpec extends AnyFunSpec with Matchers with ScalaF
l1.isInstanceOf[LocalPartitionReduceAggregateExec] shouldEqual true
l1.children.foreach { l2 =>
l2.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
l2.rangeVectorTransformers.size shouldEqual 2
l2.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
l2.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
val l2Exec = l2.asInstanceOf[MultiSchemaPartitionsExec]
SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l2Exec)
}
}
}
@@ -169,9 +168,8 @@ class SingleClusterPlannerSplitSpec extends AnyFunSpec with Matchers with ScalaF
l2.isInstanceOf[LocalPartitionReduceAggregateExec] shouldEqual true
l2.children.foreach { l3 =>
l3.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
l3.rangeVectorTransformers.size shouldEqual 2
l3.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
l3.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
val l3Exec = l3.asInstanceOf[MultiSchemaPartitionsExec]
SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l3Exec)
}
}
}
@@ -372,13 +370,11 @@ class SingleClusterPlannerSplitSpec extends AnyFunSpec with Matchers with ScalaF
l1.isInstanceOf[LocalPartitionReduceAggregateExec] shouldEqual true
l1.children.foreach { l2 =>
l2.isInstanceOf[MultiSchemaPartitionsExec] shouldEqual true
l2.rangeVectorTransformers.size shouldEqual 2
l2.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true
l2.rangeVectorTransformers(1).isInstanceOf[AggregateMapReduce] shouldEqual true
val l2Exec = l2.asInstanceOf[MultiSchemaPartitionsExec]
SingleClusterPlannerSpec.validateRangeVectorTransformersForPeriodicSeriesWithWindowingLogicalPlan(l2Exec)
}
}
}

}

it("should generate SplitExec wrapper with appropriate splits " +