Skip to content

Commit

Permalink
filodb(core) include the start time for range functions such as rate.
Browse files Browse the repository at this point in the history
  • Loading branch information
Yu Zhang committed Sep 9, 2023
1 parent 7adc382 commit 29eb63b
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/filodb.core/store/ChunkSetInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ extends Iterator[ChunkSetInfoReader] {
// advance window pointers and reset read index
if (curWindowEnd == -1L) {
curWindowEnd = start
curWindowStart = start - Math.max(window - 1, 0) // window cannot be below 0, ie start should never be > end
curWindowStart = start - Math.max(window, 0) // window cannot be below 0, ie start should never be > end
} else {
curWindowEnd += step
curWindowStart += step
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ class SlidingWindowIterator(raw: RangeVectorCursor,
override def hasNext: Boolean = curWindowEnd <= end
override def next(): TransientRow = {
val curWindowStart = curWindowEnd - window
// current window is: (curWindowStart, curWindowEnd]. Excludes start, includes end.
// current window is: [curWindowStart, curWindowEnd]. includes start, end.

// Add elements to window until end of current window has reached
while (rows.hasNext && rows.head.timestamp <= curWindowEnd) {
Expand Down Expand Up @@ -377,7 +377,7 @@ class SlidingWindowIterator(raw: RangeVectorCursor,
*/
private def shouldAddCurToWindow(curWindowStart: Long, cur: TransientRow): Boolean = {
// cur is inside current window
cur.timestamp > curWindowStart
cur.timestamp >= curWindowStart
}

/**
Expand All @@ -389,7 +389,7 @@ class SlidingWindowIterator(raw: RangeVectorCursor,
* @param curWindowStart start time of the current window
*/
private def shouldRemoveWindowHead(curWindowStart: Long): Boolean = {
(!windowQueue.isEmpty) && windowQueue.head.timestamp <= curWindowStart
(!windowQueue.isEmpty) && windowQueue.head.timestamp < curWindowStart
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ abstract class ChunkedRateFunctionBase extends CounterChunkedRangeFunction[Trans
if (highestTime > lowestTime) {
// NOTE: It seems in order to match previous code, we have to adjust the windowStart by -1 so it's "inclusive"
val result = RateFunctions.extrapolatedRate(
windowStart - 1, windowEnd, numSamples,
windowStart, windowEnd, numSamples,
lowestTime, lowestValue,
highestTime, highestValue,
isCounter, isRate)
Expand Down Expand Up @@ -286,7 +286,7 @@ abstract class HistogramRateFunctionBase extends CounterChunkedRangeFunction[Tra
val rateArray = new Array[Double](lowestValue.numBuckets)
cforRange { 0 until rateArray.size } { b =>
rateArray(b) = RateFunctions.extrapolatedRate(
windowStart - 1, windowEnd, numSamples,
windowStart, windowEnd, numSamples,
lowestTime, lowestValue.bucketValue(b),
highestTime, highestValue.bucketValue(b),
isCounter, isRate)
Expand Down Expand Up @@ -330,7 +330,7 @@ class RateOverDeltaChunkedFunctionD extends ChunkedDoubleRangeFunction {
sumFunc.addTimeDoubleChunks(doubleVectAcc, doubleVect, doubleReader, startRowNum, endRowNum)

override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit =
sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart - 1)) * 1000)
sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000)
override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ???
}

Expand All @@ -347,7 +347,7 @@ class RateOverDeltaChunkedFunctionL extends ChunkedLongRangeFunction {
sumFunc.addTimeChunks(longVectAcc, longVect, longReader, startRowNum, endRowNum)

override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit =
sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart - 1)) * 1000)
sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000)

override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ???
}
Expand All @@ -363,7 +363,7 @@ class RateOverDeltaChunkedFunctionH(var h: bv.MutableHistogram = bv.Histogram.em
cforRange {
0 until rateArray.size
} { b =>
rateArray(b) = hFunc.h.bucketValue(b) / (windowEnd - (windowStart - 1)) * 1000
rateArray(b) = hFunc.h.bucketValue(b) / (windowEnd - windowStart) * 1000
}
sampleToEmit.setValues(windowEnd, bv.MutableHistogram(hFunc.h.buckets, rateArray))
}
Expand Down

0 comments on commit 29eb63b

Please sign in to comment.