
filodb(core) include the start time for range functions such as rate.
Yu Zhang committed Sep 9, 2023
1 parent 7adc382 commit f68b193
Showing 8 changed files with 60 additions and 59 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/filodb.core/store/ChunkSetInfo.scala
@@ -449,7 +449,7 @@ extends Iterator[ChunkSetInfoReader] {
    // advance window pointers and reset read index
    if (curWindowEnd == -1L) {
      curWindowEnd = start
-     curWindowStart = start - Math.max(window - 1, 0) // window cannot be below 0, ie start should never be > end
+     curWindowStart = start - Math.max(window, 0) // window cannot be below 0, ie start should never be > end
    } else {
      curWindowEnd += step
      curWindowStart += step
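A minimal sketch of the window semantics this hunk adopts (illustrative names, not FiloDB's actual API): the window start becomes end - window, and both endpoints are treated as inclusive, so a sample sitting exactly at the start is counted.

```scala
// Illustrative sketch only, not FiloDB's real classes.
object WindowBounds {
  // New semantics: the window is [start, end], both endpoints inclusive.
  def windowStart(end: Long, window: Long): Long = end - math.max(window, 0L)

  def inWindow(ts: Long, end: Long, window: Long): Boolean =
    ts >= windowStart(end, window) && ts <= end

  def main(args: Array[String]): Unit = {
    // A sample exactly at the window start is now inside the window.
    println(inWindow(ts = 100L, end = 400L, window = 300L)) // true
  }
}
```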
19 changes: 10 additions & 9 deletions query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala
@@ -155,16 +155,17 @@ final case class PeriodicSamplesMapper(startMs: Long,
   * pronounced if [1i] notation was used and step == lookback.
   */
  private def extendLookback(rv: RangeVector, window: Long): Long = {
+   window
    // TODO There is a code path is used by Histogram bucket extraction path where
    // underlying vector need not be a RawDataRangeVector. For those cases, we may
    // not able to reliably extend lookback.
    // Much more thought and work needed - so punting the bug
-   val pubInt = rv match {
-     case rvrd: RawDataRangeVector if (functionId.exists(_.onCumulCounter) && stepMultipleNotationUsed)
-       => rvrd.publishInterval.getOrElse(0L)
-     case _ => 0L
-   }
-   window + pubInt
+   // val pubInt = rv match {
+   //   case rvrd: RawDataRangeVector if (functionId.exists(_.onCumulCounter) && stepMultipleNotationUsed)
+   //     => rvrd.publishInterval.getOrElse(0L)
+   //   case _ => 0L
+   // }
+   // window + pubInt
  }

// Transform source double or long to double schema
@@ -343,7 +344,7 @@ class SlidingWindowIterator(raw: RangeVectorCursor,
  override def hasNext: Boolean = curWindowEnd <= end
  override def next(): TransientRow = {
    val curWindowStart = curWindowEnd - window
-   // current window is: (curWindowStart, curWindowEnd]. Excludes start, includes end.
+   // current window is: [curWindowStart, curWindowEnd]. Includes start, end.

    // Add elements to window until end of current window has reached
    while (rows.hasNext && rows.head.timestamp <= curWindowEnd) {
@@ -377,7 +378,7 @@ class SlidingWindowIterator(raw: RangeVectorCursor,
   */
  private def shouldAddCurToWindow(curWindowStart: Long, cur: TransientRow): Boolean = {
    // cur is inside current window
-   cur.timestamp > curWindowStart
+   cur.timestamp >= curWindowStart
  }

@@ -389,7 +390,7 @@ class SlidingWindowIterator(raw: RangeVectorCursor,
  /**
   * @param curWindowStart start time of the current window
   */
  private def shouldRemoveWindowHead(curWindowStart: Long): Boolean = {
-   (!windowQueue.isEmpty) && windowQueue.head.timestamp <= curWindowStart
+   (!windowQueue.isEmpty) && windowQueue.head.timestamp < curWindowStart
  }
}

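A hedged sketch of the two predicates after this change (illustrative types, not the real iterator): a sample is added once its timestamp is at or after the window start, and the queue head is evicted only once it falls strictly before the start.

```scala
import scala.collection.mutable

// Illustrative sketch; FiloDB's real SlidingWindowIterator works on TransientRow.
final case class Sample(timestamp: Long, value: Double)

final class InclusiveWindow {
  val queue = mutable.Queue.empty[Sample]

  // Inclusive start: a sample exactly at curWindowStart belongs to the window.
  def shouldAdd(curWindowStart: Long, s: Sample): Boolean =
    s.timestamp >= curWindowStart

  // Evict only samples strictly before the window start.
  def shouldRemoveHead(curWindowStart: Long): Boolean =
    queue.nonEmpty && queue.head.timestamp < curWindowStart
}

object InclusiveWindowDemo {
  def main(args: Array[String]): Unit = {
    val w = new InclusiveWindow
    w.queue.enqueue(Sample(300L, 1.0))
    println(w.shouldRemoveHead(curWindowStart = 300L)) // false: boundary sample is kept
    println(w.shouldAdd(300L, Sample(300L, 2.0)))      // true: boundary sample is added
  }
}
```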
@@ -189,7 +189,7 @@ abstract class ChunkedRateFunctionBase extends CounterChunkedRangeFunction[Trans
    if (highestTime > lowestTime) {
      // NOTE: It seems in order to match previous code, we have to adjust the windowStart by -1 so it's "inclusive"
      val result = RateFunctions.extrapolatedRate(
-       windowStart - 1, windowEnd, numSamples,
+       windowStart, windowEnd, numSamples,
        lowestTime, lowestValue,
        highestTime, highestValue,
        isCounter, isRate)
@@ -286,7 +286,7 @@ abstract class HistogramRateFunctionBase extends CounterChunkedRangeFunction[Tra
      val rateArray = new Array[Double](lowestValue.numBuckets)
      cforRange { 0 until rateArray.size } { b =>
        rateArray(b) = RateFunctions.extrapolatedRate(
-         windowStart - 1, windowEnd, numSamples,
+         windowStart, windowEnd, numSamples,
          lowestTime, lowestValue.bucketValue(b),
          highestTime, highestValue.bucketValue(b),
          isCounter, isRate)
@@ -330,7 +330,7 @@ class RateOverDeltaChunkedFunctionD extends ChunkedDoubleRangeFunction {
    sumFunc.addTimeDoubleChunks(doubleVectAcc, doubleVect, doubleReader, startRowNum, endRowNum)

  override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit =
-   sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart - 1)) * 1000)
+   sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000)
  override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ???
}

@@ -347,7 +347,7 @@ class RateOverDeltaChunkedFunctionL extends ChunkedLongRangeFunction {
    sumFunc.addTimeChunks(longVectAcc, longVect, longReader, startRowNum, endRowNum)

  override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit =
-   sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart - 1)) * 1000)
+   sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000)

  override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ???
}
@@ -363,7 +363,7 @@ class RateOverDeltaChunkedFunctionH(var h: bv.MutableHistogram = bv.Histogram.em
    cforRange {
      0 until rateArray.size
    } { b =>
-     rateArray(b) = hFunc.h.bucketValue(b) / (windowEnd - (windowStart - 1)) * 1000
+     rateArray(b) = hFunc.h.bucketValue(b) / (windowEnd - windowStart) * 1000
    }
    sampleToEmit.setValues(windowEnd, bv.MutableHistogram(hFunc.h.buckets, rateArray))
  }
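Restating the arithmetic in these rate-over-delta hunks as a self-contained sketch (simplified signature; the real functions consume chunked readers): with an inclusive start the window length is simply windowEnd - windowStart, so the old `- 1` widening of the denominator disappears, and the `* 1000` converts a per-millisecond rate into per-second.

```scala
// Simplified sketch of the rate-over-delta arithmetic, not the real API.
object RateOverDeltaSketch {
  /** Sum of deltas divided by the inclusive window length (ms), scaled to per-second. */
  def rate(sumOfDeltas: Double, windowStart: Long, windowEnd: Long): Double =
    sumOfDeltas / (windowEnd - windowStart) * 1000

  def main(args: Array[String]): Unit = {
    // 50 units observed over a 10s (10000 ms) window => 5 units/second.
    println(rate(50.0, windowStart = 0L, windowEnd = 10000L)) // 5.0
  }
}
```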
@@ -239,9 +239,9 @@ class MultiSchemaPartitionsExecSpec extends AnyFunSpec with Matchers with ScalaF
      ColumnFilter("job", Filter.Equals("myCoolService".utf8)))
    val execPlan = MultiSchemaPartitionsExec(QueryContext(), dummyDispatcher,
      dsRef, 0, filters, AllChunkScan, "_metric_")
-   val start = now - (numRawSamples-100) * reportingInterval
+   val start = now - (numRawSamples-100) * reportingInterval + 1
    val step = 0
-   val end = now - (numRawSamples-100) * reportingInterval
+   val end = now - (numRawSamples-100) * reportingInterval + 1
    execPlan.addRangeVectorTransformer(new PeriodicSamplesMapper(start, step, end, Some(reportingInterval * 3),
      Some(InternalRangeFunction.SumOverTime), QueryContext()))
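A plausible reading of the +1 shift (an inference, not stated in the commit): with the start boundary now inclusive, a query instant aligned exactly on a sample timestamp would pick up one extra sample at full lookback distance; nudging the instant by 1 ms restores the previous sample count. A sketch with invented numbers:

```scala
// Hypothetical illustration of why the test shifts the query instant by 1 ms.
object BoundaryShift {
  def samplesInWindow(samples: Seq[Long], end: Long, lookback: Long): Int =
    samples.count(ts => ts >= end - lookback && ts <= end) // inclusive [end - lookback, end]

  def main(args: Array[String]): Unit = {
    val interval = 10000L
    val samples  = (0L to 100000L by interval)             // a sample every 10s
    val t        = 50000L                                  // aligned exactly on a sample
    println(samplesInWindow(samples, t, 3 * interval))     // 4: includes the sample at t - 30000
    println(samplesInWindow(samples, t + 1, 3 * interval)) // 3: boundary sample falls outside
  }
}
```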
@@ -98,9 +98,9 @@ class PeriodicSamplesMapperSpec extends AnyFunSpec with Matchers with ScalaFutur
    val rv = timeValueRVPk(samples, partKey)

    val expectedResults = List(
-     500000L -> 125d,
-     900000L -> 100d,
-     1300000 -> 400d
+     500000L -> 100d,
+     900000L -> 80d,
+     1300000 -> 320d
    )

    // step == lookback here
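Why expectations like these shift, shown with invented data (not the spec's actual fixture): the inclusive start pulls the boundary sample into each window, and separately the values here drop because the old code extended the lookback by the publish interval (see extendLookback above), which this commit removes. The sketch below isolates the boundary effect alone:

```scala
// Hedged demo: sum over an exclusive-start window (old) vs inclusive-start window (new).
object WindowSemantics {
  def sumExclusive(samples: Map[Long, Double], end: Long, w: Long): Double =
    samples.collect { case (ts, v) if ts > end - w && ts <= end => v }.sum

  def sumInclusive(samples: Map[Long, Double], end: Long, w: Long): Double =
    samples.collect { case (ts, v) if ts >= end - w && ts <= end => v }.sum

  def main(args: Array[String]): Unit = {
    val samples = Map(100000L -> 25d, 200000L -> 25d, 300000L -> 25d,
                      400000L -> 25d, 500000L -> 25d)
    println(sumExclusive(samples, 500000L, 400000L)) // 100.0: sample at 100000 excluded
    println(sumInclusive(samples, 500000L, 400000L)) // 125.0: sample at 100000 included
  }
}
```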
12 changes: 6 additions & 6 deletions query/src/test/scala/filodb/query/exec/WindowIteratorSpec.scala
@@ -197,7 +197,7 @@ class WindowIteratorSpec extends RawDataWindowingSpec {
    val windowResults = Seq(
      150000->1.0,
      250000->5.0,
-     350000->9.0,
+     350000->12.0,
      450000->13.0,
      750000->17.0
    )
@@ -482,7 +482,7 @@ class WindowIteratorSpec extends RawDataWindowingSpec {
    val windowResults = Seq(
      150000 -> 1.0,
      250000 -> 2.5,
-     350000 -> 4.5,
+     350000 -> 4.0,
      450000 -> 6.5
    )

@@ -518,7 +518,7 @@ class WindowIteratorSpec extends RawDataWindowingSpec {
    val windowResults = Seq(
      150000 -> 1.0,
      250000 -> 2.0,
-     350000 -> 2.0,
+     350000 -> 3.0,
      450000 -> 2.0
    )

@@ -563,7 +563,7 @@ class WindowIteratorSpec extends RawDataWindowingSpec {
    val avgWindowResults = Seq(
      150000 -> 4.0,
      250000 -> 4.875,
-     350000 -> 3.2,
+     350000 -> 3.533333333333333,
      450000 -> 6.916666666666667,
      750000 -> 4.2592592592592595
    )
@@ -579,7 +579,7 @@ class WindowIteratorSpec extends RawDataWindowingSpec {
    val countWindowResults = Seq(
      150000 -> 5.0,
      250000 -> 8.0,
-     350000 -> 10.0,
+     350000 -> 15.0,
      450000 -> 12.0,
      750000 -> 27.0
    )
@@ -608,7 +608,7 @@ class WindowIteratorSpec extends RawDataWindowingSpec {
    val windowResults = Seq(
      150000 -> 1.0,
      250000 -> 2.0,
-     350000 -> 4.0,
+     350000 -> 3.0,
      450000 -> 6.0
    )

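The same boundary effect in count form, as a hedged demo with invented samples: a point exactly at end - lookback is now counted, so per-window counts (and the sums and averages built on them) all move.

```scala
// Hedged demo: counting samples in an inclusive lookback window.
object CountOverTime {
  def count(timestamps: Seq[Long], end: Long, lookback: Long): Int =
    timestamps.count(ts => ts >= end - lookback && ts <= end)

  def main(args: Array[String]): Unit = {
    val ts = Seq(50000L, 150000L, 250000L, 350000L)
    // The sample exactly at end - lookback (150000) is now counted.
    println(count(ts, end = 350000L, lookback = 200000L)) // 3
  }
}
```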
@@ -265,12 +265,12 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
      val slidingIt = slidingWindowIt(data, rv, new SumOverTimeFunction(), windowSize, step)
      val aggregated = slidingIt.map(_.getDouble(1)).toBuffer
      slidingIt.close()
-     // drop first sample because of exclusive start
-     aggregated shouldEqual data.sliding(windowSize, step).map(_.drop(1).sum).toBuffer
+     // do not drop first sample because of inclusive start
+     aggregated shouldEqual data.sliding(windowSize, step).map(_.sum).toBuffer

      val chunkedIt = chunkedWindowIt(data, rv, new SumOverTimeChunkedFunctionD(), windowSize, step)
      val aggregated2 = chunkedIt.map(_.getDouble(1)).toBuffer
-     aggregated2 shouldEqual data.sliding(windowSize, step).map(_.drop(1).sum).toBuffer
+     aggregated2 shouldEqual data.sliding(windowSize, step).map(_.sum).toBuffer
    }
  }

@@ -282,7 +282,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
      info(s"iteration $x windowSize=$windowSize step=$step")

      val chunkedIt = chunkedWindowItHist(data, rv, new SumOverTimeChunkedFunctionH(), windowSize, step)
-     chunkedIt.zip(data.sliding(windowSize, step).map(_.drop(1))).foreach { case (aggRow, rawDataWindow) =>
+     chunkedIt.zip(data.sliding(windowSize, step)).foreach { case (aggRow, rawDataWindow) =>
        val aggHist = aggRow.getHistogram(1)
        val sumRawHist = rawDataWindow.map(_(3).asInstanceOf[bv.LongHistogram])
          .foldLeft(emptyAggHist) { case (agg, h) => agg.add(h); agg }
@@ -302,7 +302,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {

      val row = new TransientHistMaxRow()
      val chunkedIt = chunkedWindowItHist(data, rv, new SumAndMaxOverTimeFuncHD(3), windowSize, step, row)
-     chunkedIt.zip(data.sliding(windowSize, step).map(_.drop(1))).foreach { case (aggRow, rawDataWindow) =>
+     chunkedIt.zip(data.sliding(windowSize, step)).foreach { case (aggRow, rawDataWindow) =>
        val aggHist = aggRow.getHistogram(1)
        val sumRawHist = rawDataWindow.map(_(4).asInstanceOf[bv.LongHistogram])
          .foldLeft(emptyAggHist) { case (agg, h) => agg.add(h); agg }
@@ -327,22 +327,22 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
      val minSlidingIt = slidingWindowIt(data, rv, new MinMaxOverTimeFunction(Ordering[Double].reverse), windowSize, step)
      val aggregated = minSlidingIt.map(_.getDouble(1)).toBuffer
      minSlidingIt.close()
-     // drop first sample because of exclusive start
-     aggregated shouldEqual data.sliding(windowSize, step).map(_.drop(1).min).toBuffer
+     // do not drop first sample because of inclusive start
+     aggregated shouldEqual data.sliding(windowSize, step).map(_.min).toBuffer

      val minChunkedIt = chunkedWindowIt(data, rv, new MinOverTimeChunkedFunctionD(), windowSize, step)
      val aggregated2 = minChunkedIt.map(_.getDouble(1)).toBuffer
-     aggregated2 shouldEqual data.sliding(windowSize, step).map(_.drop(1).min).toBuffer
+     aggregated2 shouldEqual data.sliding(windowSize, step).map(_.min).toBuffer

      val maxSlidingIt = slidingWindowIt(data, rv, new MinMaxOverTimeFunction(Ordering[Double]), windowSize, step)
      val aggregated3 = maxSlidingIt.map(_.getDouble(1)).toBuffer
      maxSlidingIt.close()
-     // drop first sample because of exclusive start
-     aggregated3 shouldEqual data.sliding(windowSize, step).map(_.drop(1).max).toBuffer
+     // do not drop first sample because of inclusive start
+     aggregated3 shouldEqual data.sliding(windowSize, step).map(_.max).toBuffer

      val maxChunkedIt = chunkedWindowIt(data, rv, new MaxOverTimeChunkedFunctionD(), windowSize, step)
      val aggregated4 = maxChunkedIt.map(_.getDouble(1)).toBuffer
-     aggregated4 shouldEqual data.sliding(windowSize, step).map(_.drop(1).max).toBuffer
+     aggregated4 shouldEqual data.sliding(windowSize, step).map(_.max).toBuffer
    }
  }

@@ -359,25 +359,25 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
      val countSliding = slidingWindowIt(data, rv, new CountOverTimeFunction(), windowSize, step)
      val aggregated1 = countSliding.map(_.getDouble(1)).toBuffer
      countSliding.close()
-     aggregated1 shouldEqual data.sliding(windowSize, step).map(_.length - 1).toBuffer
+     aggregated1 shouldEqual data.sliding(windowSize, step).map(_.length).toBuffer

      val countChunked = chunkedWindowIt(data, rv, new CountOverTimeChunkedFunction(), windowSize, step)
      val aggregated2 = countChunked.map(_.getDouble(1)).toBuffer
-     aggregated2 shouldEqual data.sliding(windowSize, step).map(_.length - 1).toBuffer
+     aggregated2 shouldEqual data.sliding(windowSize, step).map(_.length).toBuffer

      val avgSliding = slidingWindowIt(data, rv, new AvgOverTimeFunction(), windowSize, step)
      val aggregated3 = avgSliding.map(_.getDouble(1)).toBuffer
      avgSliding.close()
-     aggregated3 shouldEqual data.sliding(windowSize, step).map(a => avg(a drop 1)).toBuffer
+     aggregated3 shouldEqual data.sliding(windowSize, step).map(a => avg(a)).toBuffer

      // In sample_data2, there are no NaN's, that's why using avg function is fine
      val avgChunked = chunkedWindowIt(data, rv, new AvgOverTimeChunkedFunctionD(), windowSize, step)
      val aggregated4 = avgChunked.map(_.getDouble(1)).toBuffer
-     aggregated4 shouldEqual data.sliding(windowSize, step).map(a => avg(a drop 1)).toBuffer
+     aggregated4 shouldEqual data.sliding(windowSize, step).map(a => avg(a)).toBuffer

      val changesChunked = chunkedWindowIt(data, rv, new ChangesChunkedFunctionD(), windowSize, step)
      val aggregated5 = changesChunked.map(_.getDouble(1)).toBuffer
-     aggregated5.drop(0) shouldEqual data.sliding(windowSize, step).map(_.length - 2).drop(0).toBuffer
+     aggregated5 shouldEqual data.sliding(windowSize, step).map(_.length - 1).toBuffer
    }
  }

@@ -394,22 +394,22 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
      val varSlidingIt = slidingWindowIt(data, rv, new StdVarOverTimeFunction(), windowSize, step)
      val aggregated2 = varSlidingIt.map(_.getDouble(1)).toBuffer
      varSlidingIt.close()
-     aggregated2 shouldEqual data.sliding(windowSize, step).map(a => stdVar(a drop 1)).toBuffer
+     aggregated2 shouldEqual data.sliding(windowSize, step).map(a => stdVar(a)).toBuffer

      val stdDevSlidingIt = slidingWindowIt(data, rv, new StdDevOverTimeFunction(), windowSize, step)
      val aggregated3 = stdDevSlidingIt.map(_.getDouble(1)).toBuffer
      stdDevSlidingIt.close()
-     aggregated3 shouldEqual data.sliding(windowSize, step).map(d => Math.sqrt(stdVar(d drop 1))).toBuffer
+     aggregated3 shouldEqual data.sliding(windowSize, step).map(d => Math.sqrt(stdVar(d))).toBuffer

      val varFunc = new StdVarOverTimeChunkedFunctionD()
      val windowIt4 = chunkedWindowIt(data, rv, varFunc, windowSize, step)
      val aggregated4 = windowIt4.map(_.getDouble(1)).toBuffer
-     aggregated4 shouldEqual data.sliding(windowSize, step).map(a => stdVar(a drop 1)).toBuffer
+     aggregated4 shouldEqual data.sliding(windowSize, step).map(a => stdVar(a)).toBuffer

      val devFunc = new StdDevOverTimeChunkedFunctionD()
      val windowIt5 = chunkedWindowIt(data, rv, devFunc, windowSize, step)
      val aggregated5 = windowIt5.map(_.getDouble(1)).toBuffer
-     aggregated5 shouldEqual data.sliding(windowSize, step).map(d => Math.sqrt(stdVar(d drop 1))).toBuffer
+     aggregated5 shouldEqual data.sliding(windowSize, step).map(d => Math.sqrt(stdVar(d))).toBuffer
    }
  }

@@ -424,7 +424,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
    val chunkedIt = new ChunkedWindowIteratorD(rv, 100000, 20000, 150000, 30000,
      new ChangesChunkedFunctionD(), querySession)
    val aggregated = chunkedIt.map(x => (x.getLong(0), x.getDouble(1))).toList
-   aggregated shouldEqual List((100000, 0.0), (120000, 2.0), (140000, 2.0))
+   aggregated shouldEqual List((100000, 0.0), (120000, 2.0), (140000, 3.0))
  }

it("should correctly calculate quantileovertime") {
@@ -481,7 +481,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
      (Seq(StaticFuncArgs(0.5, rangeParams))), windowSize, step)

    val aggregated2 = minChunkedIt.map(_.getDouble(1)).toBuffer
-   aggregated2 shouldEqual data.sliding(windowSize, step).map(_.drop(1)).map(median).toBuffer
+   aggregated2 shouldEqual data.sliding(windowSize, step).map(median).toBuffer
}
}

@@ -504,7 +504,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
    val changesChunked = chunkedWindowIt(data, rv, new ChangesChunkedFunctionD(), windowSize, step)
    val aggregated2 = changesChunked.map(_.getDouble(1)).toBuffer

-   data.sliding(windowSize, step).map(_.length - 2).toBuffer.zip(aggregated2).foreach {
+   data.sliding(windowSize, step).map(_.length - 1).toBuffer.zip(aggregated2).foreach {
case (val1, val2) => if (val1 == -1) {
val2.isNaN shouldEqual (true) // window does not have any element so changes will be NaN
} else {
@@ -649,7 +649,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {

    val chunkedIt = chunkedWindowIt(data, rv, new ZScoreChunkedFunctionD(), windowSize, step)
    val aggregated = chunkedIt.map(_.getDouble(1)).toBuffer
-   aggregated shouldEqual data.sliding(windowSize, step).map(d => z_score(d drop 1)).toBuffer
+   aggregated shouldEqual data.sliding(windowSize, step).map(d => z_score(d)).toBuffer
}
}

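The changes() expectations move from length - 2 to length - 1: with the start now inclusive, a window of n samples keeps all n of them, and n strictly distinct consecutive samples contain n - 1 transitions. A hedged sketch (assuming distinct consecutive values, which these fixtures appear to use):

```scala
object ChangesSketch {
  // changes() counts value transitions between consecutive samples in the window.
  def changes(values: Seq[Double]): Double =
    if (values.isEmpty) Double.NaN
    else values.sliding(2).count(w => w.length == 2 && w(0) != w(1)).toDouble

  def main(args: Array[String]): Unit = {
    // 4 samples, all distinct => 3 changes, i.e. window length - 1.
    println(changes(Seq(1.0, 2.0, 3.0, 4.0))) // 3.0
  }
}
```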
@@ -186,9 +186,9 @@ class PeriodicRateFunctionsSpec extends RawDataWindowingSpec with ScalaFutures {
    val rv = timeValueRVPk(samples, partKey)

    val expectedResults = List(
-     500000L -> 125d,
-     900000L -> 100d,
-     1300000 -> 400d
+     500000L -> 100d,
+     900000L -> 80d,
+     1300000 -> 320d,
    )

    // step == lookback here
@@ -233,9 +233,9 @@ class PeriodicRateFunctionsSpec extends RawDataWindowingSpec with ScalaFutures {
    val rv = timeValueRVPk(samples, partKey)

    val expectedResults = List(
-     500000L -> 125d,
-     900000L -> 100d,
-     1300000 -> 400d
+     500000L -> 100d,
+     900000L -> 80d,
+     1300000 -> 320d,
    )

    // step == lookback here
@@ -280,9 +280,9 @@ class PeriodicRateFunctionsSpec extends RawDataWindowingSpec with ScalaFutures {
    val rv = timeValueRVPk(samples, partKey)

    val expectedResults = List(
-     500000L -> 125d,
-     900000L -> 100d,
-     1300000 -> 400d
+     500000L -> 100d,
+     900000L -> 80d,
+     1300000 -> 320d,
    )

    // step == lookback here
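For reference, a hedged sketch of the extrapolated rate these specs exercise (simplified; the real RateFunctions.extrapolatedRate takes more parameters, numSamples, isCounter and isRate, and applies counter-reset and extrapolation-limit logic not reproduced here): the delta between the first and last samples is extrapolated across the inclusive window, so removing the `- 1` start adjustment shifts every expected value.

```scala
// Simplified, illustrative extrapolated-rate calculation (per second).
object ExtrapolatedRateSketch {
  def extrapolatedRate(windowStart: Long, windowEnd: Long,
                       firstTime: Long, firstValue: Double,
                       lastTime: Long, lastValue: Double): Double = {
    val sampledInterval = (lastTime - firstTime).toDouble            // ms covered by samples
    val delta           = lastValue - firstValue                     // observed increase
    val extrapolated    = delta * ((windowEnd - windowStart) / sampledInterval)
    extrapolated / (windowEnd - windowStart) * 1000                  // per-second rate
  }

  def main(args: Array[String]): Unit = {
    // A counter rising by 40 across a fully sampled 400s window => 0.1/s.
    println(extrapolatedRate(100000L, 500000L, 100000L, 0.0, 500000L, 40.0)) // 0.1
  }
}
```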
