Skip to content

Commit

Permalink
Adding RangeVectorTransformer for le filter for PeriodicSeriesWithWin…
Browse files Browse the repository at this point in the history
…dowing
  • Loading branch information
sandeep6189 committed Feb 14, 2024
1 parent 32e4eeb commit d4cd73b
Showing 1 changed file with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -558,13 +558,16 @@ class SingleClusterPlanner(val dataset: Dataset,
}
}

// scalastyle:off method.length
override private[queryplanner] def materializePeriodicSeriesWithWindowing(qContext: QueryContext,
lp: PeriodicSeriesWithWindowing,
forceInProcess: Boolean): PlanResult = {

val logicalPlanWithoutBucket = if (queryConfig.translatePromToFilodbHistogram) {
removeBucket(Right(lp))._3.right.get
} else lp
val (nameFilter: Option[String], leFilter: Option[String], logicalPlanWithoutBucket: PeriodicSeriesWithWindowing) =
if (queryConfig.translatePromToFilodbHistogram) {
val result = removeBucket(Right(lp))
(result._1, result._2, result._3.right.get)
} else (None, None, lp)

val series = walkLogicalPlanTree(logicalPlanWithoutBucket.series, qContext, forceInProcess)
val rawSource = logicalPlanWithoutBucket.series.isRaw
Expand All @@ -585,6 +588,15 @@ class SingleClusterPlanner(val dataset: Dataset,
realScanStepMs, realScanEndMs, window, Some(execRangeFn), qContext,
logicalPlanWithoutBucket.stepMultipleNotationUsed,
paramsExec, logicalPlanWithoutBucket.offsetMs, rawSource)))

// Add the le filter transformer to select the required bucket
if (nameFilter.isDefined && nameFilter.head.endsWith("_bucket") && leFilter.isDefined) {
val paramsExec = StaticFuncArgs(leFilter.head.toDouble, RangeParams(realScanStartMs / 1000, realScanStepMs / 1000,
realScanEndMs / 1000))
series.plans.foreach(_.addRangeVectorTransformer(InstantVectorFunctionMapper(HistogramBucket,
Seq(paramsExec))))
}

val result = if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) {
val aggregate = Aggregate(AggregationOperator.Sum, logicalPlanWithoutBucket, Nil,
AggregateClause.byOpt(Seq("job")))
Expand All @@ -602,9 +614,9 @@ class SingleClusterPlanner(val dataset: Dataset,
result.plans.foreach(p => p.addRangeVectorTransformer(RepeatTransformer(lp.startMs, lp.stepMs, lp.endMs,
p.queryWithPlanName(qContext))))
}

result
}
// scalastyle:on method.length

override private[queryplanner] def removeBucket(lp: Either[PeriodicSeries, PeriodicSeriesWithWindowing]) = {
val rawSeries = lp match {
Expand Down

0 comments on commit d4cd73b

Please sign in to comment.