From 78c18a9ecbea12b5a9e9fd71b2ea01bbbf6ed1fa Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Tue, 10 Oct 2023 15:55:05 -0700 Subject: [PATCH] rename the confusing variables. --- .../queryplanner/DefaultPlanner.scala | 10 +++---- .../queryplanner/SingleClusterPlanner.scala | 26 +++++++++---------- .../scala/filodb.core/query/RangeVector.scala | 4 +++ .../filodb/query/ProtoConvertersSpec.scala | 2 +- 4 files changed, 23 insertions(+), 19 deletions(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala index 932d23a961..38041c994e 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala @@ -342,13 +342,13 @@ trait DefaultPlanner { sqww: SubqueryWithWindowing ) : PlanResult = { // absent over time is essentially sum(last(series)) sent through AbsentFunctionMapper - val realStartMs = sqww.atMs.getOrElse(sqww.startMs) - val realEndMs = sqww.atMs.getOrElse(sqww.endMs) - val realStep = sqww.atMs.map(_ => 0L).getOrElse(sqww.stepMs) + val realScanStartMs = sqww.atMs.getOrElse(sqww.startMs) + val realScanEndMs = sqww.atMs.getOrElse(sqww.endMs) + val realScanStep = sqww.atMs.map(_ => 0L).getOrElse(sqww.stepMs) innerExecPlan.plans.foreach(plan => { plan.addRangeVectorTransformer(PeriodicSamplesMapper( - realStartMs, realStep, realEndMs, + realScanStartMs, realScanStep, realScanEndMs, window, Some(InternalRangeFunction.lpToInternalFunc(RangeFunctionId.Last)), qContext, @@ -374,7 +374,7 @@ trait DefaultPlanner { val plans = addAbsentFunctionMapper( aggregatePlanResult, Seq(), - RangeParams(realStartMs / 1000, realStep / 1000, realEndMs / 1000), + RangeParams(realScanStartMs / 1000, realScanStep / 1000, realScanEndMs / 1000), qContext ).plans diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala index bf2d392b30..37cc2ba69e 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala @@ -584,14 +584,14 @@ class SingleClusterPlanner(val dataset: Dataset, val execRangeFn = if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) Last else InternalRangeFunction.lpToInternalFunc(logicalPlanWithoutBucket.function) - val realStartMs = lp.atMs.getOrElse(logicalPlanWithoutBucket.startMs) - val realEndMs = lp.atMs.getOrElse(logicalPlanWithoutBucket.endMs) - val realStepMs: Long = lp.atMs.map(_ => 0L).getOrElse(lp.stepMs) + val realScanStartMs = lp.atMs.getOrElse(logicalPlanWithoutBucket.startMs) + val realScanEndMs = lp.atMs.getOrElse(logicalPlanWithoutBucket.endMs) + val realScanStepMs: Long = lp.atMs.map(_ => 0L).getOrElse(lp.stepMs) val paramsExec = materializeFunctionArgs(logicalPlanWithoutBucket.functionArgs, qContext) val window = if (execRangeFn == InternalRangeFunction.Timestamp) None else Some(logicalPlanWithoutBucket.window) - series.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realStartMs, - realStepMs, realEndMs, window, Some(execRangeFn), qContext, + series.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realScanStartMs, + realScanStepMs, realScanEndMs, window, Some(execRangeFn), qContext, logicalPlanWithoutBucket.stepMultipleNotationUsed, paramsExec, logicalPlanWithoutBucket.offsetMs, rawSource))) val result = if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) { @@ -602,7 +602,7 @@ class SingleClusterPlanner(val dataset: Dataset, val aggregatePlanResult = PlanResult(Seq(addAggregator(aggregate, qContext.copy(plannerParams = qContext.plannerParams.copy(skipAggregatePresent = true)), series))) addAbsentFunctionMapper(aggregatePlanResult, logicalPlanWithoutBucket.columnFilters, - RangeParams(realStartMs / 1000, realStepMs / 1000, realEndMs / 1000), qContext) + RangeParams(realScanStartMs / 1000, realScanStepMs / 1000, realScanEndMs / 1000), qContext) } else series @@ -657,17 +657,17 @@ class SingleClusterPlanner(val dataset: Dataset, } else (None, None, lp) - val realStartMs = lp.atMs.getOrElse(lp.startMs) - val realEndMs = lp.atMs.getOrElse(lp.endMs) - val realStepMs: Long = lp.atMs.map(_ => 0L).getOrElse(lp.stepMs) + val realScanStartMs = lp.atMs.getOrElse(lp.startMs) + val realScanEndMs = lp.atMs.getOrElse(lp.endMs) + val realScanStepMs: Long = lp.atMs.map(_ => 0L).getOrElse(lp.stepMs) val rawSeries = walkLogicalPlanTree(lpWithoutBucket.rawSeries, qContext, forceInProcess) - rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realStartMs, realStepMs, realEndMs, - None, None, qContext, stepMultipleNotationUsed = false, Nil, lp.offsetMs))) + rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realScanStartMs, realScanStepMs, + realScanEndMs, None, None, qContext, stepMultipleNotationUsed = false, Nil, lp.offsetMs))) if (nameFilter.isDefined && nameFilter.head.endsWith("_bucket") && leFilter.isDefined) { - val paramsExec = StaticFuncArgs(leFilter.head.toDouble, RangeParams(realStartMs/1000, realStepMs/1000, - realEndMs/1000)) + val paramsExec = StaticFuncArgs(leFilter.head.toDouble, RangeParams(realScanStartMs/1000, realScanStepMs/1000, + realScanEndMs/1000)) rawSeries.plans.foreach(_.addRangeVectorTransformer(InstantVectorFunctionMapper(HistogramBucket, Seq(paramsExec)))) } diff --git a/core/src/main/scala/filodb.core/query/RangeVector.scala b/core/src/main/scala/filodb.core/query/RangeVector.scala index c6c9db9847..d9c4d18ea1 100644 --- a/core/src/main/scala/filodb.core/query/RangeVector.scala +++ b/core/src/main/scala/filodb.core/query/RangeVector.scala @@ -220,6 +220,10 @@ final class RepeatValueVector(rangeVectorKey: RangeVectorKey, } val recordSchema: RecordSchema = schema + + // There is potential for optimization. + // The parent transformer does not need to iterate all rows. + // It can transform one data because data at all steps are identical. It just need to return a RepeatValueVector. override def rows(): RangeVectorCursor = { import NoCloseCursor._ // If rowReader is empty, iterate nothing. diff --git a/query/src/test/scala/filodb/query/ProtoConvertersSpec.scala b/query/src/test/scala/filodb/query/ProtoConvertersSpec.scala index 30f0efb384..a84c43ed90 100644 --- a/query/src/test/scala/filodb/query/ProtoConvertersSpec.scala +++ b/query/src/test/scala/filodb/query/ProtoConvertersSpec.scala @@ -735,7 +735,7 @@ class ProtoConvertersSpec extends AnyFunSpec with Matchers { } - it("should convert RepeatValueVector double to proto and back") {2 + it("should convert RepeatValueVector double to proto and back") { val recSchema = new RecordSchema(Seq(ColumnInfo("time", ColumnType.TimestampColumn), ColumnInfo("value", ColumnType.DoubleColumn))) val keysMap = Map("key1".utf8 -> "val1".utf8,