From f68b19309d577188ac69532c2f6fa1923ae39d9a Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Fri, 8 Sep 2023 23:13:26 -0700 Subject: [PATCH] filodb(core) include the start time for range functions such as rate. --- .../filodb.core/store/ChunkSetInfo.scala | 2 +- .../query/exec/PeriodicSamplesMapper.scala | 19 ++++---- .../query/exec/rangefn/RateFunctions.scala | 10 ++-- .../exec/MultiSchemaPartitionsExecSpec.scala | 4 +- .../exec/PeriodicSamplesMapperSpec.scala | 6 +-- .../query/exec/WindowIteratorSpec.scala | 12 ++--- .../rangefn/AggrOverTimeFunctionsSpec.scala | 48 +++++++++---------- .../rangefn/PeriodicRateFunctionsSpec.scala | 18 +++---- 8 files changed, 60 insertions(+), 59 deletions(-) diff --git a/core/src/main/scala/filodb.core/store/ChunkSetInfo.scala b/core/src/main/scala/filodb.core/store/ChunkSetInfo.scala index caa4a4264e..f54d1ef0e1 100644 --- a/core/src/main/scala/filodb.core/store/ChunkSetInfo.scala +++ b/core/src/main/scala/filodb.core/store/ChunkSetInfo.scala @@ -449,7 +449,7 @@ extends Iterator[ChunkSetInfoReader] { // advance window pointers and reset read index if (curWindowEnd == -1L) { curWindowEnd = start - curWindowStart = start - Math.max(window - 1, 0) // window cannot be below 0, ie start should never be > end + curWindowStart = start - Math.max(window, 0) // window cannot be below 0, ie start should never be > end } else { curWindowEnd += step curWindowStart += step diff --git a/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala b/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala index fb600d436b..9fcd50d05d 100644 --- a/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala +++ b/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala @@ -155,16 +155,17 @@ final case class PeriodicSamplesMapper(startMs: Long, * pronounced if [1i] notation was used and step == lookback. 
*/ private def extendLookback(rv: RangeVector, window: Long): Long = { + window // TODO There is a code path is used by Histogram bucket extraction path where // underlying vector need not be a RawDataRangeVector. For those cases, we may // not able to reliably extend lookback. // Much more thought and work needed - so punting the bug - val pubInt = rv match { - case rvrd: RawDataRangeVector if (functionId.exists(_.onCumulCounter) && stepMultipleNotationUsed) - => rvrd.publishInterval.getOrElse(0L) - case _ => 0L - } - window + pubInt +// val pubInt = rv match { +// case rvrd: RawDataRangeVector if (functionId.exists(_.onCumulCounter) && stepMultipleNotationUsed) +// => rvrd.publishInterval.getOrElse(0L) +// case _ => 0L +// } +// window + pubInt } // Transform source double or long to double schema @@ -343,7 +344,7 @@ class SlidingWindowIterator(raw: RangeVectorCursor, override def hasNext: Boolean = curWindowEnd <= end override def next(): TransientRow = { val curWindowStart = curWindowEnd - window - // current window is: (curWindowStart, curWindowEnd]. Excludes start, includes end. + // current window is: [curWindowStart, curWindowEnd]. Includes both start and end. 
// Add elements to window until end of current window has reached while (rows.hasNext && rows.head.timestamp <= curWindowEnd) { @@ -377,7 +378,7 @@ class SlidingWindowIterator(raw: RangeVectorCursor, */ private def shouldAddCurToWindow(curWindowStart: Long, cur: TransientRow): Boolean = { // cur is inside current window - cur.timestamp > curWindowStart + cur.timestamp >= curWindowStart } /** @@ -389,7 +390,7 @@ class SlidingWindowIterator(raw: RangeVectorCursor, * @param curWindowStart start time of the current window */ private def shouldRemoveWindowHead(curWindowStart: Long): Boolean = { - (!windowQueue.isEmpty) && windowQueue.head.timestamp <= curWindowStart + (!windowQueue.isEmpty) && windowQueue.head.timestamp < curWindowStart } } diff --git a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala index 57cc2023f9..ed72c17f29 100644 --- a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala +++ b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala @@ -189,7 +189,7 @@ abstract class ChunkedRateFunctionBase extends CounterChunkedRangeFunction[Trans if (highestTime > lowestTime) { - // NOTE: It seems in order to match previous code, we have to adjust the windowStart by -1 so it's "inclusive" + // NOTE: windowStart is passed unadjusted (no -1) because the window start is now inclusive val result = RateFunctions.extrapolatedRate( - windowStart - 1, windowEnd, numSamples, + windowStart, windowEnd, numSamples, lowestTime, lowestValue, highestTime, highestValue, isCounter, isRate) @@ -286,7 +286,7 @@ abstract class HistogramRateFunctionBase extends CounterChunkedRangeFunction[Tra val rateArray = new Array[Double](lowestValue.numBuckets) cforRange { 0 until rateArray.size } { b => rateArray(b) = RateFunctions.extrapolatedRate( - windowStart - 1, windowEnd, numSamples, + windowStart, windowEnd, numSamples, lowestTime, lowestValue.bucketValue(b), highestTime, highestValue.bucketValue(b), isCounter, isRate) @@ -330,7 +330,7 @@ class 
RateOverDeltaChunkedFunctionD extends ChunkedDoubleRangeFunction { sumFunc.addTimeDoubleChunks(doubleVectAcc, doubleVect, doubleReader, startRowNum, endRowNum) override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit = - sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart - 1)) * 1000) + sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000) override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ??? } @@ -347,7 +347,7 @@ class RateOverDeltaChunkedFunctionL extends ChunkedLongRangeFunction { sumFunc.addTimeChunks(longVectAcc, longVect, longReader, startRowNum, endRowNum) override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit = - sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart - 1)) * 1000) + sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000) override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ??? 
} @@ -363,7 +363,7 @@ class RateOverDeltaChunkedFunctionH(var h: bv.MutableHistogram = bv.Histogram.em cforRange { 0 until rateArray.size } { b => - rateArray(b) = hFunc.h.bucketValue(b) / (windowEnd - (windowStart - 1)) * 1000 + rateArray(b) = hFunc.h.bucketValue(b) / (windowEnd - windowStart) * 1000 } sampleToEmit.setValues(windowEnd, bv.MutableHistogram(hFunc.h.buckets, rateArray)) } diff --git a/query/src/test/scala/filodb/query/exec/MultiSchemaPartitionsExecSpec.scala b/query/src/test/scala/filodb/query/exec/MultiSchemaPartitionsExecSpec.scala index 092c309583..0be205167e 100644 --- a/query/src/test/scala/filodb/query/exec/MultiSchemaPartitionsExecSpec.scala +++ b/query/src/test/scala/filodb/query/exec/MultiSchemaPartitionsExecSpec.scala @@ -239,9 +239,9 @@ class MultiSchemaPartitionsExecSpec extends AnyFunSpec with Matchers with ScalaF ColumnFilter("job", Filter.Equals("myCoolService".utf8))) val execPlan = MultiSchemaPartitionsExec(QueryContext(), dummyDispatcher, dsRef, 0, filters, AllChunkScan, "_metric_") - val start = now - (numRawSamples-100) * reportingInterval + val start = now - (numRawSamples-100) * reportingInterval + 1 val step = 0 - val end = now - (numRawSamples-100) * reportingInterval + val end = now - (numRawSamples-100) * reportingInterval + 1 execPlan.addRangeVectorTransformer(new PeriodicSamplesMapper(start, step, end, Some(reportingInterval * 3), Some(InternalRangeFunction.SumOverTime), QueryContext())) diff --git a/query/src/test/scala/filodb/query/exec/PeriodicSamplesMapperSpec.scala b/query/src/test/scala/filodb/query/exec/PeriodicSamplesMapperSpec.scala index f831391670..10057565db 100644 --- a/query/src/test/scala/filodb/query/exec/PeriodicSamplesMapperSpec.scala +++ b/query/src/test/scala/filodb/query/exec/PeriodicSamplesMapperSpec.scala @@ -98,9 +98,9 @@ class PeriodicSamplesMapperSpec extends AnyFunSpec with Matchers with ScalaFutur val rv = timeValueRVPk(samples, partKey) val expectedResults = List( - 500000L -> 125d, - 900000L 
->100d, - 1300000 -> 400d + 500000L -> 100d, + 900000L -> 80d, + 1300000 -> 320d ) // step == lookback here diff --git a/query/src/test/scala/filodb/query/exec/WindowIteratorSpec.scala b/query/src/test/scala/filodb/query/exec/WindowIteratorSpec.scala index c1f815cbe8..691ec097af 100644 --- a/query/src/test/scala/filodb/query/exec/WindowIteratorSpec.scala +++ b/query/src/test/scala/filodb/query/exec/WindowIteratorSpec.scala @@ -197,7 +197,7 @@ class WindowIteratorSpec extends RawDataWindowingSpec { val windowResults = Seq( 150000->1.0, 250000->5.0, - 350000->9.0, + 350000->12.0, 450000->13.0, 750000->17.0 ) @@ -482,7 +482,7 @@ class WindowIteratorSpec extends RawDataWindowingSpec { val windowResults = Seq( 150000 -> 1.0, 250000 -> 2.5, - 350000 -> 4.5, + 350000 -> 4.0, 450000 -> 6.5 ) @@ -518,7 +518,7 @@ class WindowIteratorSpec extends RawDataWindowingSpec { val windowResults = Seq( 150000 -> 1.0, 250000 -> 2.0, - 350000 -> 2.0, + 350000 -> 3.0, 450000 -> 2.0 ) @@ -563,7 +563,7 @@ class WindowIteratorSpec extends RawDataWindowingSpec { val avgWindowResults = Seq( 150000 -> 4.0, 250000 -> 4.875, - 350000 -> 3.2, + 350000 -> 3.533333333333333, 450000 -> 6.916666666666667, 750000 -> 4.2592592592592595 ) @@ -579,7 +579,7 @@ class WindowIteratorSpec extends RawDataWindowingSpec { val countWindowResults = Seq( 150000 -> 5.0, 250000 -> 8.0, - 350000 -> 10.0, + 350000 -> 15.0, 450000 -> 12.0, 750000 -> 27.0 ) @@ -608,7 +608,7 @@ class WindowIteratorSpec extends RawDataWindowingSpec { val windowResults = Seq( 150000 -> 1.0, 250000 -> 2.0, - 350000 -> 4.0, + 350000 -> 3.0, 450000 -> 6.0 ) diff --git a/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala b/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala index 4a8fea1d7a..4bae38c27f 100644 --- a/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala +++ b/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala @@ -265,12 
+265,12 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec { val slidingIt = slidingWindowIt(data, rv, new SumOverTimeFunction(), windowSize, step) val aggregated = slidingIt.map(_.getDouble(1)).toBuffer slidingIt.close() - // drop first sample because of exclusive start - aggregated shouldEqual data.sliding(windowSize, step).map(_.drop(1).sum).toBuffer + // do not drop first sample because of inclusive start + aggregated shouldEqual data.sliding(windowSize, step).map(_.sum).toBuffer val chunkedIt = chunkedWindowIt(data, rv, new SumOverTimeChunkedFunctionD(), windowSize, step) val aggregated2 = chunkedIt.map(_.getDouble(1)).toBuffer - aggregated2 shouldEqual data.sliding(windowSize, step).map(_.drop(1).sum).toBuffer + aggregated2 shouldEqual data.sliding(windowSize, step).map(_.sum).toBuffer } } @@ -282,7 +282,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec { info(s"iteration $x windowSize=$windowSize step=$step") val chunkedIt = chunkedWindowItHist(data, rv, new SumOverTimeChunkedFunctionH(), windowSize, step) - chunkedIt.zip(data.sliding(windowSize, step).map(_.drop(1))).foreach { case (aggRow, rawDataWindow) => + chunkedIt.zip(data.sliding(windowSize, step)).foreach { case (aggRow, rawDataWindow) => val aggHist = aggRow.getHistogram(1) val sumRawHist = rawDataWindow.map(_(3).asInstanceOf[bv.LongHistogram]) .foldLeft(emptyAggHist) { case (agg, h) => agg.add(h); agg } @@ -302,7 +302,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec { val row = new TransientHistMaxRow() val chunkedIt = chunkedWindowItHist(data, rv, new SumAndMaxOverTimeFuncHD(3), windowSize, step, row) - chunkedIt.zip(data.sliding(windowSize, step).map(_.drop(1))).foreach { case (aggRow, rawDataWindow) => + chunkedIt.zip(data.sliding(windowSize, step)).foreach { case (aggRow, rawDataWindow) => val aggHist = aggRow.getHistogram(1) val sumRawHist = rawDataWindow.map(_(4).asInstanceOf[bv.LongHistogram]) .foldLeft(emptyAggHist) { case (agg, h) => 
agg.add(h); agg } @@ -327,22 +327,22 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec { val minSlidingIt = slidingWindowIt(data, rv, new MinMaxOverTimeFunction(Ordering[Double].reverse), windowSize, step) val aggregated = minSlidingIt.map(_.getDouble(1)).toBuffer minSlidingIt.close() - // drop first sample because of exclusive start - aggregated shouldEqual data.sliding(windowSize, step).map(_.drop(1).min).toBuffer + // do not drop first sample because of inclusive start + aggregated shouldEqual data.sliding(windowSize, step).map(_.min).toBuffer val minChunkedIt = chunkedWindowIt(data, rv, new MinOverTimeChunkedFunctionD(), windowSize, step) val aggregated2 = minChunkedIt.map(_.getDouble(1)).toBuffer - aggregated2 shouldEqual data.sliding(windowSize, step).map(_.drop(1).min).toBuffer + aggregated2 shouldEqual data.sliding(windowSize, step).map(_.min).toBuffer val maxSlidingIt = slidingWindowIt(data, rv, new MinMaxOverTimeFunction(Ordering[Double]), windowSize, step) val aggregated3 = maxSlidingIt.map(_.getDouble(1)).toBuffer maxSlidingIt.close() - // drop first sample because of exclusive start - aggregated3 shouldEqual data.sliding(windowSize, step).map(_.drop(1).max).toBuffer + // do not drop first sample because of inclusive start + aggregated3 shouldEqual data.sliding(windowSize, step).map(_.max).toBuffer val maxChunkedIt = chunkedWindowIt(data, rv, new MaxOverTimeChunkedFunctionD(), windowSize, step) val aggregated4 = maxChunkedIt.map(_.getDouble(1)).toBuffer - aggregated4 shouldEqual data.sliding(windowSize, step).map(_.drop(1).max).toBuffer + aggregated4 shouldEqual data.sliding(windowSize, step).map(_.max).toBuffer } } @@ -359,25 +359,25 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec { val countSliding = slidingWindowIt(data, rv, new CountOverTimeFunction(), windowSize, step) val aggregated1 = countSliding.map(_.getDouble(1)).toBuffer countSliding.close() - aggregated1 shouldEqual data.sliding(windowSize, step).map(_.length - 
1).toBuffer + aggregated1 shouldEqual data.sliding(windowSize, step).map(_.length).toBuffer val countChunked = chunkedWindowIt(data, rv, new CountOverTimeChunkedFunction(), windowSize, step) val aggregated2 = countChunked.map(_.getDouble(1)).toBuffer - aggregated2 shouldEqual data.sliding(windowSize, step).map(_.length - 1).toBuffer + aggregated2 shouldEqual data.sliding(windowSize, step).map(_.length).toBuffer val avgSliding = slidingWindowIt(data, rv, new AvgOverTimeFunction(), windowSize, step) val aggregated3 = avgSliding.map(_.getDouble(1)).toBuffer avgSliding.close() - aggregated3 shouldEqual data.sliding(windowSize, step).map(a => avg(a drop 1)).toBuffer + aggregated3 shouldEqual data.sliding(windowSize, step).map(a => avg(a)).toBuffer // In sample_data2, there are no NaN's, that's why using avg function is fine val avgChunked = chunkedWindowIt(data, rv, new AvgOverTimeChunkedFunctionD(), windowSize, step) val aggregated4 = avgChunked.map(_.getDouble(1)).toBuffer - aggregated4 shouldEqual data.sliding(windowSize, step).map(a => avg(a drop 1)).toBuffer + aggregated4 shouldEqual data.sliding(windowSize, step).map(a => avg(a)).toBuffer val changesChunked = chunkedWindowIt(data, rv, new ChangesChunkedFunctionD(), windowSize, step) val aggregated5 = changesChunked.map(_.getDouble(1)).toBuffer - aggregated5.drop(0) shouldEqual data.sliding(windowSize, step).map(_.length - 2).drop(0).toBuffer + aggregated5 shouldEqual data.sliding(windowSize, step).map(_.length - 1).toBuffer } } @@ -394,22 +394,22 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec { val varSlidingIt = slidingWindowIt(data, rv, new StdVarOverTimeFunction(), windowSize, step) val aggregated2 = varSlidingIt.map(_.getDouble(1)).toBuffer varSlidingIt.close() - aggregated2 shouldEqual data.sliding(windowSize, step).map(a => stdVar(a drop 1)).toBuffer + aggregated2 shouldEqual data.sliding(windowSize, step).map(a => stdVar(a)).toBuffer val stdDevSlidingIt = slidingWindowIt(data, rv, new 
StdDevOverTimeFunction(), windowSize, step) val aggregated3 = stdDevSlidingIt.map(_.getDouble(1)).toBuffer stdDevSlidingIt.close() - aggregated3 shouldEqual data.sliding(windowSize, step).map(d => Math.sqrt(stdVar(d drop 1))).toBuffer + aggregated3 shouldEqual data.sliding(windowSize, step).map(d => Math.sqrt(stdVar(d))).toBuffer val varFunc = new StdVarOverTimeChunkedFunctionD() val windowIt4 = chunkedWindowIt(data, rv, varFunc, windowSize, step) val aggregated4 = windowIt4.map(_.getDouble(1)).toBuffer - aggregated4 shouldEqual data.sliding(windowSize, step).map(a => stdVar(a drop 1)).toBuffer + aggregated4 shouldEqual data.sliding(windowSize, step).map(a => stdVar(a)).toBuffer val devFunc = new StdDevOverTimeChunkedFunctionD() val windowIt5 = chunkedWindowIt(data, rv, devFunc, windowSize, step) val aggregated5 = windowIt5.map(_.getDouble(1)).toBuffer - aggregated5 shouldEqual data.sliding(windowSize, step).map(d => Math.sqrt(stdVar(d drop 1))).toBuffer + aggregated5 shouldEqual data.sliding(windowSize, step).map(d => Math.sqrt(stdVar(d))).toBuffer } } @@ -424,7 +424,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec { val chunkedIt = new ChunkedWindowIteratorD(rv, 100000, 20000, 150000, 30000, new ChangesChunkedFunctionD(), querySession) val aggregated = chunkedIt.map(x => (x.getLong(0), x.getDouble(1))).toList - aggregated shouldEqual List((100000, 0.0), (120000, 2.0), (140000, 2.0)) + aggregated shouldEqual List((100000, 0.0), (120000, 2.0), (140000, 3.0)) } it("should correctly calculate quantileovertime") { @@ -481,7 +481,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec { (Seq(StaticFuncArgs(0.5, rangeParams))), windowSize, step) val aggregated2 = minChunkedIt.map(_.getDouble(1)).toBuffer - aggregated2 shouldEqual data.sliding(windowSize, step).map(_.drop(1)).map(median).toBuffer + aggregated2 shouldEqual data.sliding(windowSize, step).map(median).toBuffer } } @@ -504,7 +504,7 @@ class AggrOverTimeFunctionsSpec extends 
RawDataWindowingSpec { val changesChunked = chunkedWindowIt(data, rv, new ChangesChunkedFunctionD(), windowSize, step) val aggregated2 = changesChunked.map(_.getDouble(1)).toBuffer - data.sliding(windowSize, step).map(_.length - 2).toBuffer.zip(aggregated2).foreach { + data.sliding(windowSize, step).map(_.length - 1).toBuffer.zip(aggregated2).foreach { case (val1, val2) => if (val1 == -1) { val2.isNaN shouldEqual (true) // window does not have any element so changes will be NaN } else { @@ -649,7 +649,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec { val chunkedIt = chunkedWindowIt(data, rv, new ZScoreChunkedFunctionD(), windowSize, step) val aggregated = chunkedIt.map(_.getDouble(1)).toBuffer - aggregated shouldEqual data.sliding(windowSize, step).map(d => z_score(d drop 1)).toBuffer + aggregated shouldEqual data.sliding(windowSize, step).map(d => z_score(d)).toBuffer } } diff --git a/query/src/test/scala/filodb/query/exec/rangefn/PeriodicRateFunctionsSpec.scala b/query/src/test/scala/filodb/query/exec/rangefn/PeriodicRateFunctionsSpec.scala index 0d3bdf4d7a..35fb168270 100644 --- a/query/src/test/scala/filodb/query/exec/rangefn/PeriodicRateFunctionsSpec.scala +++ b/query/src/test/scala/filodb/query/exec/rangefn/PeriodicRateFunctionsSpec.scala @@ -186,9 +186,9 @@ class PeriodicRateFunctionsSpec extends RawDataWindowingSpec with ScalaFutures { val rv = timeValueRVPk(samples, partKey) val expectedResults = List( - 500000L -> 125d, - 900000L -> 100d, - 1300000 -> 400d + 500000L -> 100d, + 900000L -> 80d, + 1300000 -> 320d, ) // step == lookback here @@ -233,9 +233,9 @@ class PeriodicRateFunctionsSpec extends RawDataWindowingSpec with ScalaFutures { val rv = timeValueRVPk(samples, partKey) val expectedResults = List( - 500000L -> 125d, - 900000L -> 100d, - 1300000 -> 400d + 500000L -> 100d, + 900000L -> 80d, + 1300000 -> 320d, ) // step == lookback here @@ -280,9 +280,9 @@ class PeriodicRateFunctionsSpec extends RawDataWindowingSpec with ScalaFutures 
{ val rv = timeValueRVPk(samples, partKey) val expectedResults = List( - 500000L -> 125d, - 900000L -> 100d, - 1300000 -> 400d + 500000L -> 100d, + 900000L -> 80d, + 1300000 -> 320d, ) // step == lookback here