From 29eb63b6e06213b0b487a15d5c316ec1320d2d26 Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Fri, 8 Sep 2023 23:13:26 -0700 Subject: [PATCH] filodb(core) include the start time for range functions such as rate. --- .../main/scala/filodb.core/store/ChunkSetInfo.scala | 2 +- .../filodb/query/exec/PeriodicSamplesMapper.scala | 6 +++--- .../filodb/query/exec/rangefn/RateFunctions.scala | 10 +++++----- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/filodb.core/store/ChunkSetInfo.scala b/core/src/main/scala/filodb.core/store/ChunkSetInfo.scala index caa4a4264e..f54d1ef0e1 100644 --- a/core/src/main/scala/filodb.core/store/ChunkSetInfo.scala +++ b/core/src/main/scala/filodb.core/store/ChunkSetInfo.scala @@ -449,7 +449,7 @@ extends Iterator[ChunkSetInfoReader] { // advance window pointers and reset read index if (curWindowEnd == -1L) { curWindowEnd = start - curWindowStart = start - Math.max(window - 1, 0) // window cannot be below 0, ie start should never be > end + curWindowStart = start - Math.max(window, 0) // window cannot be below 0, ie start should never be > end } else { curWindowEnd += step curWindowStart += step diff --git a/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala b/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala index fb600d436b..7d90101bbb 100644 --- a/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala +++ b/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala @@ -343,7 +343,7 @@ class SlidingWindowIterator(raw: RangeVectorCursor, override def hasNext: Boolean = curWindowEnd <= end override def next(): TransientRow = { val curWindowStart = curWindowEnd - window - // current window is: (curWindowStart, curWindowEnd]. Excludes start, includes end. + // current window is: [curWindowStart, curWindowEnd]. includes start, end. // Add elements to window until end of current window has reached while (rows.hasNext && rows.head.timestamp <= curWindowEnd) { @@ -377,7 +377,7 @@ class SlidingWindowIterator(raw: RangeVectorCursor, */ private def shouldAddCurToWindow(curWindowStart: Long, cur: TransientRow): Boolean = { // cur is inside current window - cur.timestamp > curWindowStart + cur.timestamp >= curWindowStart } /** @@ -389,7 +389,7 @@ class SlidingWindowIterator(raw: RangeVectorCursor, * @param curWindowStart start time of the current window */ private def shouldRemoveWindowHead(curWindowStart: Long): Boolean = { - (!windowQueue.isEmpty) && windowQueue.head.timestamp <= curWindowStart + (!windowQueue.isEmpty) && windowQueue.head.timestamp < curWindowStart } } diff --git a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala index 57cc2023f9..ed72c17f29 100644 --- a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala +++ b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala @@ -189,7 +189,7 @@ abstract class ChunkedRateFunctionBase extends CounterChunkedRangeFunction[Trans if (highestTime > lowestTime) { // NOTE: It seems in order to match previous code, we have to adjust the windowStart by -1 so it's "inclusive" val result = RateFunctions.extrapolatedRate( - windowStart - 1, windowEnd, numSamples, + windowStart, windowEnd, numSamples, lowestTime, lowestValue, highestTime, highestValue, isCounter, isRate) @@ -286,7 +286,7 @@ abstract class HistogramRateFunctionBase extends CounterChunkedRangeFunction[Tra val rateArray = new Array[Double](lowestValue.numBuckets) cforRange { 0 until rateArray.size } { b => rateArray(b) = RateFunctions.extrapolatedRate( - windowStart - 1, windowEnd, numSamples, + windowStart, windowEnd, numSamples, lowestTime, lowestValue.bucketValue(b), highestTime, highestValue.bucketValue(b), isCounter, isRate) @@ -330,7 +330,7 @@ class RateOverDeltaChunkedFunctionD extends ChunkedDoubleRangeFunction { sumFunc.addTimeDoubleChunks(doubleVectAcc, doubleVect, doubleReader, startRowNum, endRowNum) override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit = - sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart - 1)) * 1000) + sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000) override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ??? } @@ -347,7 +347,7 @@ class RateOverDeltaChunkedFunctionL extends ChunkedLongRangeFunction { sumFunc.addTimeChunks(longVectAcc, longVect, longReader, startRowNum, endRowNum) override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit = - sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart - 1)) * 1000) + sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000) override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ??? } @@ -363,7 +363,7 @@ class RateOverDeltaChunkedFunctionH(var h: bv.MutableHistogram = bv.Histogram.em cforRange { 0 until rateArray.size } { b => - rateArray(b) = hFunc.h.bucketValue(b) / (windowEnd - (windowStart - 1)) * 1000 + rateArray(b) = hFunc.h.bucketValue(b) / (windowEnd - windowStart) * 1000 } sampleToEmit.setValues(windowEnd, bv.MutableHistogram(hFunc.h.buckets, rateArray)) }