Skip to content

Commit

Permalink
rename the confusing variables.
Browse files Browse the repository at this point in the history
  • Loading branch information
Yu Zhang committed Oct 13, 2023
1 parent 69af9d3 commit 1dfc95a
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -342,13 +342,13 @@ trait DefaultPlanner {
sqww: SubqueryWithWindowing
) : PlanResult = {
// absent over time is essentially sum(last(series)) sent through AbsentFunctionMapper
val realStartMs = sqww.atMs.getOrElse(sqww.startMs)
val realEndMs = sqww.atMs.getOrElse(sqww.endMs)
val realStep = sqww.atMs.map(_ => 0L).getOrElse(sqww.stepMs)
val realScanStartMs = sqww.atMs.getOrElse(sqww.startMs)
val realScanEndMs = sqww.atMs.getOrElse(sqww.endMs)
val realScanStep = sqww.atMs.map(_ => 0L).getOrElse(sqww.stepMs)

innerExecPlan.plans.foreach(plan => {
plan.addRangeVectorTransformer(PeriodicSamplesMapper(
realStartMs, realStep, realEndMs,
realScanStartMs, realScanStep, realScanEndMs,
window,
Some(InternalRangeFunction.lpToInternalFunc(RangeFunctionId.Last)),
qContext,
Expand All @@ -374,7 +374,7 @@ trait DefaultPlanner {
val plans = addAbsentFunctionMapper(
aggregatePlanResult,
Seq(),
RangeParams(realStartMs / 1000, realStep / 1000, realEndMs / 1000),
RangeParams(realScanStartMs / 1000, realScanStep / 1000, realScanEndMs / 1000),
qContext
).plans

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,14 +584,14 @@ class SingleClusterPlanner(val dataset: Dataset,
val execRangeFn = if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) Last
else InternalRangeFunction.lpToInternalFunc(logicalPlanWithoutBucket.function)

val realStartMs = lp.atMs.getOrElse(logicalPlanWithoutBucket.startMs)
val realEndMs = lp.atMs.getOrElse(logicalPlanWithoutBucket.endMs)
val realStepMs: Long = lp.atMs.map(_ => 0L).getOrElse(lp.stepMs)
val realScanStartMs = lp.atMs.getOrElse(logicalPlanWithoutBucket.startMs)
val realScanEndMs = lp.atMs.getOrElse(logicalPlanWithoutBucket.endMs)
val realScanStepMs: Long = lp.atMs.map(_ => 0L).getOrElse(lp.stepMs)

val paramsExec = materializeFunctionArgs(logicalPlanWithoutBucket.functionArgs, qContext)
val window = if (execRangeFn == InternalRangeFunction.Timestamp) None else Some(logicalPlanWithoutBucket.window)
series.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realStartMs,
realStepMs, realEndMs, window, Some(execRangeFn), qContext,
series.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realScanStartMs,
realScanStepMs, realScanEndMs, window, Some(execRangeFn), qContext,
logicalPlanWithoutBucket.stepMultipleNotationUsed,
paramsExec, logicalPlanWithoutBucket.offsetMs, rawSource)))
val result = if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) {
Expand All @@ -602,7 +602,7 @@ class SingleClusterPlanner(val dataset: Dataset,
val aggregatePlanResult = PlanResult(Seq(addAggregator(aggregate, qContext.copy(plannerParams =
qContext.plannerParams.copy(skipAggregatePresent = true)), series)))
addAbsentFunctionMapper(aggregatePlanResult, logicalPlanWithoutBucket.columnFilters,
RangeParams(realStartMs / 1000, realStepMs / 1000, realEndMs / 1000), qContext)
RangeParams(realScanStartMs / 1000, realScanStepMs / 1000, realScanEndMs / 1000), qContext)

} else series

Expand Down Expand Up @@ -657,17 +657,17 @@ class SingleClusterPlanner(val dataset: Dataset,

} else (None, None, lp)

val realStartMs = lp.atMs.getOrElse(lp.startMs)
val realEndMs = lp.atMs.getOrElse(lp.endMs)
val realStepMs: Long = lp.atMs.map(_ => 0L).getOrElse(lp.stepMs)
val realScanStartMs = lp.atMs.getOrElse(lp.startMs)
val realScanEndMs = lp.atMs.getOrElse(lp.endMs)
val realScanStepMs: Long = lp.atMs.map(_ => 0L).getOrElse(lp.stepMs)

val rawSeries = walkLogicalPlanTree(lpWithoutBucket.rawSeries, qContext, forceInProcess)
rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realStartMs, realStepMs, realEndMs,
None, None, qContext, stepMultipleNotationUsed = false, Nil, lp.offsetMs)))
rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realScanStartMs, realScanStepMs,
realScanEndMs, None, None, qContext, stepMultipleNotationUsed = false, Nil, lp.offsetMs)))

if (nameFilter.isDefined && nameFilter.head.endsWith("_bucket") && leFilter.isDefined) {
val paramsExec = StaticFuncArgs(leFilter.head.toDouble, RangeParams(realStartMs/1000, realStepMs/1000,
realEndMs/1000))
val paramsExec = StaticFuncArgs(leFilter.head.toDouble, RangeParams(realScanStartMs/1000, realScanStepMs/1000,
realScanEndMs/1000))
rawSeries.plans.foreach(_.addRangeVectorTransformer(InstantVectorFunctionMapper(HistogramBucket,
Seq(paramsExec))))
}
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/filodb.core/query/RangeVector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ final class RepeatValueVector(rangeVectorKey: RangeVectorKey,
}

val recordSchema: RecordSchema = schema

// There is potential for optimization.
// The parent transformer does not need to iterate all rows.
// It can transform one data because data at all steps are identical. It just need to return a RepeatValueVector.
override def rows(): RangeVectorCursor = {
import NoCloseCursor._
// If rowReader is empty, iterate nothing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ class ProtoConvertersSpec extends AnyFunSpec with Matchers {
}


it("should convert RepeatValueVector double to proto and back") {2
it("should convert RepeatValueVector double to proto and back") {
val recSchema = new RecordSchema(Seq(ColumnInfo("time", ColumnType.TimestampColumn),
ColumnInfo("value", ColumnType.DoubleColumn)))
val keysMap = Map("key1".utf8 -> "val1".utf8,
Expand Down

0 comments on commit 1dfc95a

Please sign in to comment.