Skip to content

Commit

Permalink
feat(query) Implement @modifier. (#1678)
Browse files Browse the repository at this point in the history
* feat(query) Implement @modifier.

1. Override range param based on @modifier timestamp for PeriodicSamplesMapper.
2. Add RepeatTransformer and RepeatValueVector to populate data for all timestamps.
3. Add GRPC functions for RepeatValueVector.
4. Prevent @modifier query that spans both raw and downsampled data.

---------

Co-authored-by: Yu Zhang <[email protected]>
  • Loading branch information
yu-shipit and Yu Zhang authored Nov 17, 2023
1 parent fdb74d6 commit 0e49dc7
Show file tree
Hide file tree
Showing 16 changed files with 575 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ trait DefaultPlanner {
val paramsExec = materializeFunctionArgs(sqww.functionArgs, qContext)
val rangeVectorTransformer =
PeriodicSamplesMapper(
sqww.startMs, sqww.stepMs, sqww.endMs,
sqww.atMs.getOrElse(sqww.startMs), sqww.stepMs, sqww.atMs.getOrElse(sqww.endMs),
window,
Some(rangeFn),
qContext,
Expand All @@ -322,7 +322,11 @@ trait DefaultPlanner {
rawSource = false,
leftInclusiveWindow = true
)
innerExecPlan.plans.foreach { p => p.addRangeVectorTransformer(rangeVectorTransformer)}
innerExecPlan.plans.foreach { p => {
p.addRangeVectorTransformer(rangeVectorTransformer)
sqww.atMs.map(_ => p.addRangeVectorTransformer(RepeatTransformer(sqww.startMs, sqww.stepMs, sqww.endMs
, p.queryWithPlanName(qContext))))
}}
innerExecPlan
} else {
val innerPlan = sqww.innerPeriodicSeries
Expand All @@ -338,17 +342,24 @@ trait DefaultPlanner {
sqww: SubqueryWithWindowing
) : PlanResult = {
// absent over time is essentially sum(last(series)) sent through AbsentFunctionMapper
innerExecPlan.plans.foreach(
_.addRangeVectorTransformer(PeriodicSamplesMapper(
sqww.startMs, sqww.stepMs, sqww.endMs,
window,
Some(InternalRangeFunction.lpToInternalFunc(RangeFunctionId.Last)),
qContext,
stepMultipleNotationUsed = false,
Seq(),
offsetMs,
rawSource = false
))
val realScanStartMs = sqww.atMs.getOrElse(sqww.startMs)
val realScanEndMs = sqww.atMs.getOrElse(sqww.endMs)
val realScanStep = sqww.atMs.map(_ => 0L).getOrElse(sqww.stepMs)

innerExecPlan.plans.foreach(plan => {
plan.addRangeVectorTransformer(PeriodicSamplesMapper(
realScanStartMs, realScanStep, realScanEndMs,
window,
Some(InternalRangeFunction.lpToInternalFunc(RangeFunctionId.Last)),
qContext,
stepMultipleNotationUsed = false,
Seq(),
offsetMs,
rawSource = false
))
sqww.atMs.map(_ => plan.addRangeVectorTransformer(RepeatTransformer(sqww.startMs, sqww.stepMs, sqww.endMs,
plan.queryWithPlanName(qContext))))
}
)
val aggregate = Aggregate(AggregationOperator.Sum, innerPlan, Nil,
AggregateClause.byOpt(Seq("job")))
Expand All @@ -360,14 +371,17 @@ trait DefaultPlanner {
innerExecPlan)
)
)
addAbsentFunctionMapper(
val plans = addAbsentFunctionMapper(
aggregatePlanResult,
Seq(),
RangeParams(
sqww.startMs / 1000, sqww.stepMs / 1000, sqww.endMs / 1000
),
RangeParams(realScanStartMs / 1000, realScanStep / 1000, realScanEndMs / 1000),
qContext
)
).plans

if (sqww.atMs.nonEmpty) {
plans.foreach(p => p.addRangeVectorTransformer(RepeatTransformer(sqww.startMs, sqww.stepMs, sqww.endMs,
p.queryWithPlanName(qContext))))
}
aggregatePlanResult
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,25 @@ import filodb.query.exec._
PlanResult(Seq(execPlan))
}

private def getAtModifierTimestampsWithOffset(periodicSeriesPlan: PeriodicSeriesPlan): Seq[Long] = {
periodicSeriesPlan match {
case ps: PeriodicSeries => ps.atMs.map(at => at - ps.offsetMs.getOrElse(0L)).toSeq
case sww: SubqueryWithWindowing => sww.atMs.map(at => at - sww.offsetMs.getOrElse(0L)).toSeq
case psw: PeriodicSeriesWithWindowing => psw.atMs.map(at => at - psw.offsetMs.getOrElse(0L)).toSeq
case ts: TopLevelSubquery => ts.atMs.map(at => at - ts.originalOffsetMs.getOrElse(0L)).toSeq
case bj: BinaryJoin => getAtModifierTimestampsWithOffset(bj.lhs) ++ getAtModifierTimestampsWithOffset(bj.rhs)
case agg: Aggregate => getAtModifierTimestampsWithOffset(agg.vectors)
case aif: ApplyInstantFunction => getAtModifierTimestampsWithOffset(aif.vectors)
case amf: ApplyMiscellaneousFunction => getAtModifierTimestampsWithOffset(amf.vectors)
case asf: ApplySortFunction => getAtModifierTimestampsWithOffset(asf.vectors)
case aaf: ApplyAbsentFunction => getAtModifierTimestampsWithOffset(aaf.vectors)
case alf: ApplyLimitFunction => getAtModifierTimestampsWithOffset(alf.vectors)
case _: RawChunkMeta | _: ScalarBinaryOperation | _: ScalarFixedDoublePlan | _: ScalarTimeBasedPlan|
_: ScalarVaryingDoublePlan | _: ScalarVectorBinaryOperation | _: VectorPlan => Seq()
}
}


// scalastyle:off method.length
private def materializeRoutablePlan(qContext: QueryContext, periodicSeriesPlan: PeriodicSeriesPlan): ExecPlan = {
import LogicalPlan._
Expand All @@ -55,9 +74,24 @@ import filodb.query.exec._
val (maxOffset, minOffset) = (offsetMillis.max, offsetMillis.min)

val lookbackMs = LogicalPlanUtils.getLookBackMillis(periodicSeriesPlan).max

val startWithOffsetMs = periodicSeriesPlan.startMs - maxOffset
// For scalar binary operation queries like sum(rate(foo{job = "app"}[5m] offset 8d)) * 0.5
val endWithOffsetMs = periodicSeriesPlan.endMs - minOffset
val atModifierTimestampsWithOffset = getAtModifierTimestampsWithOffset(periodicSeriesPlan)

val isAtModifierValid = if (startWithOffsetMs - lookbackMs >= earliestRawTime) {
// should be in raw cluster.
atModifierTimestampsWithOffset.forall(at => at - lookbackMs >= earliestRawTime)
} else if (endWithOffsetMs - lookbackMs < earliestRawTime) {
// should be in down sample cluster.
atModifierTimestampsWithOffset.forall(at => at - lookbackMs < earliestRawTime)
} else {
atModifierTimestampsWithOffset.isEmpty
}
require(isAtModifierValid, s"@modifier $atModifierTimestampsWithOffset is not supported. Because it queries" +
s"both down sampled and raw data. Please adjust the query parameters if you want to use @modifier.")

if (maxOffset != minOffset
&& startWithOffsetMs - lookbackMs < earliestRawTime
&& endWithOffsetMs >= earliestRawTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,17 @@ class SingleClusterPlanner(val dataset: Dataset,
//For binary join LHS & RHS should have same times
val boundParams = periodicSeriesPlan.get.head match {
case p: PeriodicSeries => (p.startMs, p.stepMs, WindowConstants.staleDataLookbackMillis,
p.offsetMs.getOrElse(0L), p.endMs)
case w: PeriodicSeriesWithWindowing => (w.startMs, w.stepMs, w.window, w.offsetMs.getOrElse(0L), w.endMs)
p.offsetMs.getOrElse(0L), p.atMs.getOrElse(0), p.endMs)
case w: PeriodicSeriesWithWindowing => (w.startMs, w.stepMs, w.window, w.offsetMs.getOrElse(0L),
w.atMs.getOrElse(0L), w.endMs)
case _ => throw new UnsupportedOperationException(s"Invalid plan: ${periodicSeriesPlan.get.head}")
}

val newStartMs = boundToStartTimeToEarliestRetained(boundParams._1, boundParams._2, boundParams._3,
boundParams._4)
if (newStartMs <= boundParams._5) { // if there is an overlap between query and retention ranges
if (newStartMs <= boundParams._6) { // if there is an overlap between query and retention ranges
if (newStartMs != boundParams._1)
Some(LogicalPlanUtils.copyLogicalPlanWithUpdatedTimeRange(logicalPlan, TimeRange(newStartMs, boundParams._5)))
Some(LogicalPlanUtils.copyLogicalPlanWithUpdatedTimeRange(logicalPlan, TimeRange(newStartMs, boundParams._6)))
else Some(logicalPlan)
} else { // query is outside retention period, simply return empty result
None
Expand Down Expand Up @@ -583,23 +584,35 @@ class SingleClusterPlanner(val dataset: Dataset,
val execRangeFn = if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) Last
else InternalRangeFunction.lpToInternalFunc(logicalPlanWithoutBucket.function)

val realScanStartMs = lp.atMs.getOrElse(logicalPlanWithoutBucket.startMs)
val realScanEndMs = lp.atMs.getOrElse(logicalPlanWithoutBucket.endMs)
val realScanStepMs: Long = lp.atMs.map(_ => 0L).getOrElse(lp.stepMs)

val paramsExec = materializeFunctionArgs(logicalPlanWithoutBucket.functionArgs, qContext)
val window = if (execRangeFn == InternalRangeFunction.Timestamp) None else Some(logicalPlanWithoutBucket.window)
series.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(logicalPlanWithoutBucket.startMs,
logicalPlanWithoutBucket.stepMs, logicalPlanWithoutBucket.endMs, window, Some(execRangeFn), qContext,
series.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realScanStartMs,
realScanStepMs, realScanEndMs, window, Some(execRangeFn), qContext,
logicalPlanWithoutBucket.stepMultipleNotationUsed,
paramsExec, logicalPlanWithoutBucket.offsetMs, rawSource)))
if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) {
val result = if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) {
val aggregate = Aggregate(AggregationOperator.Sum, logicalPlanWithoutBucket, Nil,
AggregateClause.byOpt(Seq("job")))
// Add sum to aggregate all child responses
// If all children have NaN value, sum will yield NaN and AbsentFunctionMapper will yield 1
val aggregatePlanResult = PlanResult(Seq(addAggregator(aggregate, qContext.copy(plannerParams =
qContext.plannerParams.copy(skipAggregatePresent = true)), series)))
addAbsentFunctionMapper(aggregatePlanResult, logicalPlanWithoutBucket.columnFilters,
RangeParams(logicalPlanWithoutBucket.startMs / 1000, logicalPlanWithoutBucket.stepMs / 1000,
logicalPlanWithoutBucket.endMs / 1000), qContext)
RangeParams(realScanStartMs / 1000, realScanStepMs / 1000, realScanEndMs / 1000), qContext)

} else series

// repeat the same value for each step if '@' is specified
if (lp.atMs.nonEmpty) {
result.plans.foreach(p => p.addRangeVectorTransformer(RepeatTransformer(lp.startMs, lp.stepMs, lp.endMs,
p.queryWithPlanName(qContext))))
}

result
}

private def removeBucket(lp: Either[PeriodicSeries, PeriodicSeriesWithWindowing]) = {
Expand Down Expand Up @@ -644,17 +657,26 @@ class SingleClusterPlanner(val dataset: Dataset,

} else (None, None, lp)

val realScanStartMs = lp.atMs.getOrElse(lp.startMs)
val realScanEndMs = lp.atMs.getOrElse(lp.endMs)
val realScanStepMs: Long = lp.atMs.map(_ => 0L).getOrElse(lp.stepMs)

val rawSeries = walkLogicalPlanTree(lpWithoutBucket.rawSeries, qContext, forceInProcess)
rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(lp.startMs, lp.stepMs, lp.endMs,
None, None, qContext, stepMultipleNotationUsed = false, Nil, lp.offsetMs)))
rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realScanStartMs, realScanStepMs,
realScanEndMs, None, None, qContext, stepMultipleNotationUsed = false, Nil, lp.offsetMs)))

if (nameFilter.isDefined && nameFilter.head.endsWith("_bucket") && leFilter.isDefined) {
val paramsExec = StaticFuncArgs(leFilter.head.toDouble, RangeParams(lp.startMs/1000, lp.stepMs/1000,
lp.endMs/1000))
val paramsExec = StaticFuncArgs(leFilter.head.toDouble, RangeParams(realScanStartMs/1000, realScanStepMs/1000,
realScanEndMs/1000))
rawSeries.plans.foreach(_.addRangeVectorTransformer(InstantVectorFunctionMapper(HistogramBucket,
Seq(paramsExec))))
}
rawSeries
// repeat the same value for each step if '@' is specified
if (lp.atMs.nonEmpty) {
rawSeries.plans.foreach(p => p.addRangeVectorTransformer(RepeatTransformer(lp.startMs, lp.stepMs, lp.endMs,
p.queryWithPlanName(qContext))))
}
rawSeries
}

/**
Expand Down
Loading

0 comments on commit 0e49dc7

Please sign in to comment.