filodb(core) include the start time for range functions such as rate. #1665

Merged: 1 commit, Nov 16, 2023
2 changes: 1 addition & 1 deletion core/src/main/scala/filodb.core/store/ChunkSetInfo.scala
@@ -449,7 +449,7 @@ extends Iterator[ChunkSetInfoReader] {
// advance window pointers and reset read index
if (curWindowEnd == -1L) {
curWindowEnd = start
-      curWindowStart = start - Math.max(window - 1, 0) // window cannot be below 0, ie start should never be > end
+      curWindowStart = start - Math.max(window, 0) // window cannot be below 0, ie start should never be > end
} else {
curWindowEnd += step
curWindowStart += step
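To make the boundary change concrete, here is a minimal standalone sketch (not FiloDB code; all values are made up) contrasting the old and new window-start arithmetic:

```scala
object WindowBoundsSketch extends App {
  val end    = 1000000L // window end, in ms
  val window = 15000L   // lookback, in ms

  val oldStart = end - Math.max(window - 1, 0) // old: 985001, start effectively exclusive
  val newStart = end - Math.max(window, 0)     // new: 985000, start inclusive

  // A sample stamped exactly `window` ms before `end` now falls inside the window:
  def inNewWindow(ts: Long): Boolean = ts >= newStart && ts <= end
  println(inNewWindow(985000L)) // true
  println(inNewWindow(984999L)) // false
}
```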
19 changes: 10 additions & 9 deletions query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala
@@ -155,16 +155,17 @@ final case class PeriodicSamplesMapper(startMs: Long,
* pronounced if [1i] notation was used and step == lookback.
*/
private def extendLookback(rv: RangeVector, window: Long): Long = {
Review comment from the PR author:

Since we include the start time, we do not need to extend the lookback window anymore.
For example: with a 15s window, there is no need to extend it to 15001 ms:
[2023-09-12 15:47:51,816] INFO query-sched-1 default f.query.Query$ [] - functionId=Some(Rate) stepMultipleNotationUsed=false window=15000 rv=RawDataRangeVector(/shard:3/b2[schema=prom-counter _metric_=my_counter,tags={_ns_: Test-data-1694557937_p1, _ws_: aci-telemetry, code: 817, instance: inst-4, mode: nice, service: service-2, version: 3.3.13}] [grp16],TimeSeriesPartition(shard=3,partId=112){b2[schema=prom-counter _metric_=my_counter,tags={_ns_: Test-data-1694557937_p1, _ws_: aci-telemetry, code: 817, instance: inst-4, mode: nice, service: service-2, version: 3.3.13}]},TimeRangeChunkScan(1694557922000,1694557937000),[I@65c327b0,0,200000000,dcd2633e-6b01-4d89-8d73-124b18cacdfd)

+    window
     // TODO There is a code path used by the Histogram bucket extraction path where the
     // underlying vector need not be a RawDataRangeVector. For those cases, we may
     // not be able to reliably extend lookback.
     // Much more thought and work needed - so punting the bug
-    val pubInt = rv match {
-      case rvrd: RawDataRangeVector if (functionId.exists(_.onCumulCounter) && stepMultipleNotationUsed)
-        => rvrd.publishInterval.getOrElse(0L)
-      case _ => 0L
-    }
-    window + pubInt
+    // val pubInt = rv match {
+    //   case rvrd: RawDataRangeVector if (functionId.exists(_.onCumulCounter) && stepMultipleNotationUsed)
+    //     => rvrd.publishInterval.getOrElse(0L)
+    //   case _ => 0L
+    // }
+    // window + pubInt
}

// Transform source double or long to double schema
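A small sketch of the example from the comment above, assuming a 15s publish interval (the end timestamp is taken from the log line; the rest is illustrative): with an inclusive start, a 15000 ms window ending at a sample already contains the previous sample published 15000 ms earlier, so the extra 1 ms of padding is unnecessary.

```scala
object LookbackExtensionSketch extends App {
  val publishInterval = 15000L
  val end             = 1694557937000L // window end, from the log above
  val window          = 15000L         // no need for 15001

  val samples  = Seq(end - publishInterval, end)
  val inWindow = samples.filter(ts => ts >= end - window && ts <= end)
  println(inWindow.size) // 2 -> two points, enough to compute a rate
}
```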
@@ -343,7 +344,7 @@ class SlidingWindowIterator(raw: RangeVectorCursor,
override def hasNext: Boolean = curWindowEnd <= end
override def next(): TransientRow = {
val curWindowStart = curWindowEnd - window
-    // current window is: (curWindowStart, curWindowEnd]. Excludes start, includes end.
+    // current window is: [curWindowStart, curWindowEnd]. Includes start, end.

// Add elements to window until end of current window has reached
while (rows.hasNext && rows.head.timestamp <= curWindowEnd) {
@@ -377,7 +378,7 @@
*/
private def shouldAddCurToWindow(curWindowStart: Long, cur: TransientRow): Boolean = {
// cur is inside current window
-    cur.timestamp > curWindowStart
+    cur.timestamp >= curWindowStart
}

/**
@@ -389,7 +390,7 @@
* @param curWindowStart start time of the current window
*/
private def shouldRemoveWindowHead(curWindowStart: Long): Boolean = {
-    (!windowQueue.isEmpty) && windowQueue.head.timestamp <= curWindowStart
+    (!windowQueue.isEmpty) && windowQueue.head.timestamp < curWindowStart
}
}

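Condensed, the two predicate changes above amount to the following membership rules. This is a hypothetical model of the iterator's logic, not the iterator itself:

```scala
object SlidingWindowPredicatesSketch extends App {
  val window = 100000L

  // shouldAddCurToWindow: was `ts > curWindowStart`, now inclusive of the start.
  def inWindow(ts: Long, curWindowEnd: Long): Boolean =
    ts >= curWindowEnd - window && ts <= curWindowEnd

  // shouldRemoveWindowHead: was `head <= curWindowStart`, now strictly less-than,
  // so a sample sitting exactly on the boundary is no longer evicted.
  def evictHead(headTs: Long, curWindowStart: Long): Boolean = headTs < curWindowStart

  val timestamps = Seq(250000L, 300000L, 350000L)
  println(timestamps.filter(inWindow(_, 350000L))) // List(250000, 300000, 350000)
  println(evictHead(250000L, 250000L))             // false: the boundary sample is kept
}
```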
@@ -189,7 +189,7 @@ abstract class ChunkedRateFunctionBase extends CounterChunkedRangeFunction[Trans
if (highestTime > lowestTime) {
// NOTE: It seems in order to match previous code, we have to adjust the windowStart by -1 so it's "inclusive"
val result = RateFunctions.extrapolatedRate(
-        windowStart - 1, windowEnd, numSamples,
+        windowStart, windowEnd, numSamples,
lowestTime, lowestValue,
highestTime, highestValue,
isCounter, isRate)
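For intuition, here is a simplified, Prometheus-style model of extrapolated rate. It is not FiloDB's implementation (the real RateFunctions.extrapolatedRate also takes numSamples, isCounter and isRate), but it shows why the lower bound matters: the slope between the first and last samples is scaled up to the window span, then divided by the window length in seconds. With windowStart now itself the inclusive lower bound, the -1 adjustment is no longer needed.

```scala
object ExtrapolatedRateSketch extends App {
  def extrapolatedRate(windowStart: Long, windowEnd: Long,
                       lowestTime: Long, lowestValue: Double,
                       highestTime: Long, highestValue: Double): Double = {
    val delta     = highestValue - lowestValue
    val sampledMs = (highestTime - lowestTime).toDouble
    val windowMs  = (windowEnd - windowStart).toDouble // the full inclusive span
    delta * (windowMs / sampledMs) / (windowMs / 1000.0)
  }

  // Samples sitting exactly on both window edges: no extrapolation, 1.0 per second.
  println(extrapolatedRate(0L, 60000L, 0L, 0.0, 60000L, 60.0))
}
```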
@@ -286,7 +286,7 @@ abstract class HistogramRateFunctionBase extends CounterChunkedRangeFunction[Tra
val rateArray = new Array[Double](lowestValue.numBuckets)
cforRange { 0 until rateArray.size } { b =>
rateArray(b) = RateFunctions.extrapolatedRate(
-        windowStart - 1, windowEnd, numSamples,
+        windowStart, windowEnd, numSamples,
lowestTime, lowestValue.bucketValue(b),
highestTime, highestValue.bucketValue(b),
isCounter, isRate)
@@ -330,7 +330,7 @@ class RateOverDeltaChunkedFunctionD extends ChunkedDoubleRangeFunction {
sumFunc.addTimeDoubleChunks(doubleVectAcc, doubleVect, doubleReader, startRowNum, endRowNum)

override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit =
-    sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart - 1)) * 1000)
+    sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000)
override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ???
}

@@ -347,7 +347,7 @@ class RateOverDeltaChunkedFunctionL extends ChunkedLongRangeFunction {
sumFunc.addTimeChunks(longVectAcc, longVect, longReader, startRowNum, endRowNum)

override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit =
-    sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart - 1)) * 1000)
+    sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000)

override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ???
}
@@ -363,7 +363,7 @@ class RateOverDeltaChunkedFunctionH(var h: bv.MutableHistogram = bv.Histogram.em
cforRange {
0 until rateArray.size
} { b =>
-      rateArray(b) = hFunc.h.bucketValue(b) / (windowEnd - (windowStart - 1)) * 1000
+      rateArray(b) = hFunc.h.bucketValue(b) / (windowEnd - windowStart) * 1000
}
sampleToEmit.setValues(windowEnd, bv.MutableHistogram(hFunc.h.buckets, rateArray))
}
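The arithmetic effect on the delta-based rate functions is small but real: the old denominator, windowEnd - (windowStart - 1), was one millisecond longer than the window. A worked example with made-up numbers:

```scala
object RateDenominatorSketch extends App {
  val sum         = 50.0    // sum of deltas in the window
  val windowStart = 50000L
  val windowEnd   = 60000L  // a 10 s window

  val oldRate = sum / (windowEnd - (windowStart - 1)) * 1000 // 50 / 10001 * 1000 = 4.9995...
  val newRate = sum / (windowEnd - windowStart) * 1000       // 50 / 10000 * 1000 = 5.0
  println(s"old=$oldRate new=$newRate")
}
```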
@@ -239,9 +239,9 @@ class MultiSchemaPartitionsExecSpec extends AnyFunSpec with Matchers with ScalaF
ColumnFilter("job", Filter.Equals("myCoolService".utf8)))
val execPlan = MultiSchemaPartitionsExec(QueryContext(), dummyDispatcher,
dsRef, 0, filters, AllChunkScan, "_metric_")
-    val start = now - (numRawSamples-100) * reportingInterval
+    val start = now - (numRawSamples-100) * reportingInterval + 1
val step = 0
-    val end = now - (numRawSamples-100) * reportingInterval
+    val end = now - (numRawSamples-100) * reportingInterval + 1
execPlan.addRangeVectorTransformer(new PeriodicSamplesMapper(start, step, end, Some(reportingInterval * 3),
Some(InternalRangeFunction.SumOverTime), QueryContext()))

@@ -98,9 +98,9 @@ class PeriodicSamplesMapperSpec extends AnyFunSpec with Matchers with ScalaFutur
val rv = timeValueRVPk(samples, partKey)

val expectedResults = List(
-      500000L -> 125d,
-      900000L -> 100d,
-      1300000 -> 400d
+      500000L -> 100d,
+      900000L -> 80d,
+      1300000 -> 320d
)

// step == lookback here
12 changes: 6 additions & 6 deletions query/src/test/scala/filodb/query/exec/WindowIteratorSpec.scala
@@ -197,7 +197,7 @@ class WindowIteratorSpec extends RawDataWindowingSpec {
val windowResults = Seq(
150000->1.0,
250000->5.0,
-      350000->9.0,
+      350000->12.0,
450000->13.0,
750000->17.0
)
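The 9.0 to 12.0 shift at t=350000 is consistent with a fixture sample of value 3.0 stamped exactly at 250000 (an assumption; the spec's sample data is not shown in this diff): with an inclusive start, that boundary sample is counted by the window ending at 350000 as well. A toy reconstruction:

```scala
object BoundarySampleSketch extends App {
  // Hypothetical samples; only the boundary behavior matters here.
  val samples = Seq(250000L -> 3.0, 300000L -> 4.0, 350000L -> 5.0)

  def sumOverTime(end: Long, window: Long, startInclusive: Boolean): Double =
    samples.collect {
      case (ts, v) if ts <= end && (if (startInclusive) ts >= end - window else ts > end - window) => v
    }.sum

  println(sumOverTime(350000L, 100000L, startInclusive = false)) // 9.0  (old, exclusive start)
  println(sumOverTime(350000L, 100000L, startInclusive = true))  // 12.0 (new, inclusive start)
}
```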
@@ -482,7 +482,7 @@ class WindowIteratorSpec extends RawDataWindowingSpec {
val windowResults = Seq(
150000 -> 1.0,
250000 -> 2.5,
-      350000 -> 4.5,
+      350000 -> 4.0,
450000 -> 6.5
)

@@ -518,7 +518,7 @@
val windowResults = Seq(
150000 -> 1.0,
250000 -> 2.0,
-      350000 -> 2.0,
+      350000 -> 3.0,
450000 -> 2.0
)

@@ -563,7 +563,7 @@
val avgWindowResults = Seq(
150000 -> 4.0,
250000 -> 4.875,
-      350000 -> 3.2,
+      350000 -> 3.533333333333333,
450000 -> 6.916666666666667,
750000 -> 4.2592592592592595
)
@@ -579,7 +579,7 @@
val countWindowResults = Seq(
150000 -> 5.0,
250000 -> 8.0,
-      350000 -> 10.0,
+      350000 -> 15.0,
450000 -> 12.0,
750000 -> 27.0
)
@@ -608,7 +608,7 @@
val windowResults = Seq(
150000 -> 1.0,
250000 -> 2.0,
-      350000 -> 4.0,
+      350000 -> 3.0,
450000 -> 6.0
)

@@ -265,12 +265,12 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
val slidingIt = slidingWindowIt(data, rv, new SumOverTimeFunction(), windowSize, step)
val aggregated = slidingIt.map(_.getDouble(1)).toBuffer
slidingIt.close()
-    // drop first sample because of exclusive start
-    aggregated shouldEqual data.sliding(windowSize, step).map(_.drop(1).sum).toBuffer
+    // do not drop first sample because of inclusive start
+    aggregated shouldEqual data.sliding(windowSize, step).map(_.sum).toBuffer

val chunkedIt = chunkedWindowIt(data, rv, new SumOverTimeChunkedFunctionD(), windowSize, step)
val aggregated2 = chunkedIt.map(_.getDouble(1)).toBuffer
-    aggregated2 shouldEqual data.sliding(windowSize, step).map(_.drop(1).sum).toBuffer
+    aggregated2 shouldEqual data.sliding(windowSize, step).map(_.sum).toBuffer
}
}

@@ -282,7 +282,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
info(s"iteration $x windowSize=$windowSize step=$step")

val chunkedIt = chunkedWindowItHist(data, rv, new SumOverTimeChunkedFunctionH(), windowSize, step)
-    chunkedIt.zip(data.sliding(windowSize, step).map(_.drop(1))).foreach { case (aggRow, rawDataWindow) =>
+    chunkedIt.zip(data.sliding(windowSize, step)).foreach { case (aggRow, rawDataWindow) =>
val aggHist = aggRow.getHistogram(1)
val sumRawHist = rawDataWindow.map(_(3).asInstanceOf[bv.LongHistogram])
.foldLeft(emptyAggHist) { case (agg, h) => agg.add(h); agg }
@@ -302,7 +302,7 @@

val row = new TransientHistMaxRow()
val chunkedIt = chunkedWindowItHist(data, rv, new SumAndMaxOverTimeFuncHD(3), windowSize, step, row)
-    chunkedIt.zip(data.sliding(windowSize, step).map(_.drop(1))).foreach { case (aggRow, rawDataWindow) =>
+    chunkedIt.zip(data.sliding(windowSize, step)).foreach { case (aggRow, rawDataWindow) =>
val aggHist = aggRow.getHistogram(1)
val sumRawHist = rawDataWindow.map(_(4).asInstanceOf[bv.LongHistogram])
.foldLeft(emptyAggHist) { case (agg, h) => agg.add(h); agg }
@@ -327,22 +327,22 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
val minSlidingIt = slidingWindowIt(data, rv, new MinMaxOverTimeFunction(Ordering[Double].reverse), windowSize, step)
val aggregated = minSlidingIt.map(_.getDouble(1)).toBuffer
minSlidingIt.close()
-    // drop first sample because of exclusive start
-    aggregated shouldEqual data.sliding(windowSize, step).map(_.drop(1).min).toBuffer
+    // do not drop first sample because of inclusive start
+    aggregated shouldEqual data.sliding(windowSize, step).map(_.min).toBuffer

val minChunkedIt = chunkedWindowIt(data, rv, new MinOverTimeChunkedFunctionD(), windowSize, step)
val aggregated2 = minChunkedIt.map(_.getDouble(1)).toBuffer
-    aggregated2 shouldEqual data.sliding(windowSize, step).map(_.drop(1).min).toBuffer
+    aggregated2 shouldEqual data.sliding(windowSize, step).map(_.min).toBuffer

val maxSlidingIt = slidingWindowIt(data, rv, new MinMaxOverTimeFunction(Ordering[Double]), windowSize, step)
val aggregated3 = maxSlidingIt.map(_.getDouble(1)).toBuffer
maxSlidingIt.close()
-    // drop first sample because of exclusive start
-    aggregated3 shouldEqual data.sliding(windowSize, step).map(_.drop(1).max).toBuffer
+    // do not drop first sample because of inclusive start
+    aggregated3 shouldEqual data.sliding(windowSize, step).map(_.max).toBuffer

val maxChunkedIt = chunkedWindowIt(data, rv, new MaxOverTimeChunkedFunctionD(), windowSize, step)
val aggregated4 = maxChunkedIt.map(_.getDouble(1)).toBuffer
-    aggregated4 shouldEqual data.sliding(windowSize, step).map(_.drop(1).max).toBuffer
+    aggregated4 shouldEqual data.sliding(windowSize, step).map(_.max).toBuffer
}
}

@@ -359,25 +359,25 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
val countSliding = slidingWindowIt(data, rv, new CountOverTimeFunction(), windowSize, step)
val aggregated1 = countSliding.map(_.getDouble(1)).toBuffer
countSliding.close()
-    aggregated1 shouldEqual data.sliding(windowSize, step).map(_.length - 1).toBuffer
+    aggregated1 shouldEqual data.sliding(windowSize, step).map(_.length).toBuffer

val countChunked = chunkedWindowIt(data, rv, new CountOverTimeChunkedFunction(), windowSize, step)
val aggregated2 = countChunked.map(_.getDouble(1)).toBuffer
-    aggregated2 shouldEqual data.sliding(windowSize, step).map(_.length - 1).toBuffer
+    aggregated2 shouldEqual data.sliding(windowSize, step).map(_.length).toBuffer

val avgSliding = slidingWindowIt(data, rv, new AvgOverTimeFunction(), windowSize, step)
val aggregated3 = avgSliding.map(_.getDouble(1)).toBuffer
avgSliding.close()
-    aggregated3 shouldEqual data.sliding(windowSize, step).map(a => avg(a drop 1)).toBuffer
+    aggregated3 shouldEqual data.sliding(windowSize, step).map(a => avg(a)).toBuffer

// In sample_data2, there are no NaN's, that's why using avg function is fine
val avgChunked = chunkedWindowIt(data, rv, new AvgOverTimeChunkedFunctionD(), windowSize, step)
val aggregated4 = avgChunked.map(_.getDouble(1)).toBuffer
-    aggregated4 shouldEqual data.sliding(windowSize, step).map(a => avg(a drop 1)).toBuffer
+    aggregated4 shouldEqual data.sliding(windowSize, step).map(a => avg(a)).toBuffer

val changesChunked = chunkedWindowIt(data, rv, new ChangesChunkedFunctionD(), windowSize, step)
val aggregated5 = changesChunked.map(_.getDouble(1)).toBuffer
-    aggregated5.drop(0) shouldEqual data.sliding(windowSize, step).map(_.length - 2).drop(0).toBuffer
+    aggregated5 shouldEqual data.sliding(windowSize, step).map(_.length - 1).toBuffer
}
}
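The changes() expectations move from length - 2 to length - 1 for the same reason: assuming all sampled values in a window are distinct (which the new expectation implies), k samples produce k - 1 changes, and the inclusive start restores the boundary sample. A quick check on hypothetical data:

```scala
object ChangesCountSketch extends App {
  def changes(values: Seq[Double]): Int =
    values.sliding(2).count { case Seq(a, b) => a != b }

  val window = Seq(1.0, 3.0, 4.0, 7.0)  // 4 distinct samples
  println(changes(window))              // 3 = length - 1 (new, inclusive start)
  println(changes(window.drop(1)))      // 2 = length - 2 (old, exclusive start)
}
```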

@@ -394,22 +394,22 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
val varSlidingIt = slidingWindowIt(data, rv, new StdVarOverTimeFunction(), windowSize, step)
val aggregated2 = varSlidingIt.map(_.getDouble(1)).toBuffer
varSlidingIt.close()
-    aggregated2 shouldEqual data.sliding(windowSize, step).map(a => stdVar(a drop 1)).toBuffer
+    aggregated2 shouldEqual data.sliding(windowSize, step).map(a => stdVar(a)).toBuffer

val stdDevSlidingIt = slidingWindowIt(data, rv, new StdDevOverTimeFunction(), windowSize, step)
val aggregated3 = stdDevSlidingIt.map(_.getDouble(1)).toBuffer
stdDevSlidingIt.close()
-    aggregated3 shouldEqual data.sliding(windowSize, step).map(d => Math.sqrt(stdVar(d drop 1))).toBuffer
+    aggregated3 shouldEqual data.sliding(windowSize, step).map(d => Math.sqrt(stdVar(d))).toBuffer

val varFunc = new StdVarOverTimeChunkedFunctionD()
val windowIt4 = chunkedWindowIt(data, rv, varFunc, windowSize, step)
val aggregated4 = windowIt4.map(_.getDouble(1)).toBuffer
-    aggregated4 shouldEqual data.sliding(windowSize, step).map(a => stdVar(a drop 1)).toBuffer
+    aggregated4 shouldEqual data.sliding(windowSize, step).map(a => stdVar(a)).toBuffer

val devFunc = new StdDevOverTimeChunkedFunctionD()
val windowIt5 = chunkedWindowIt(data, rv, devFunc, windowSize, step)
val aggregated5 = windowIt5.map(_.getDouble(1)).toBuffer
-    aggregated5 shouldEqual data.sliding(windowSize, step).map(d => Math.sqrt(stdVar(d drop 1))).toBuffer
+    aggregated5 shouldEqual data.sliding(windowSize, step).map(d => Math.sqrt(stdVar(d))).toBuffer
}
}

@@ -424,7 +424,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
val chunkedIt = new ChunkedWindowIteratorD(rv, 100000, 20000, 150000, 30000,
new ChangesChunkedFunctionD(), querySession)
val aggregated = chunkedIt.map(x => (x.getLong(0), x.getDouble(1))).toList
-    aggregated shouldEqual List((100000, 0.0), (120000, 2.0), (140000, 2.0))
+    aggregated shouldEqual List((100000, 0.0), (120000, 2.0), (140000, 3.0))
}

it("should correctly calculate quantileovertime") {
@@ -481,7 +481,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
(Seq(StaticFuncArgs(0.5, rangeParams))), windowSize, step)

val aggregated2 = minChunkedIt.map(_.getDouble(1)).toBuffer
-    aggregated2 shouldEqual data.sliding(windowSize, step).map(_.drop(1)).map(median).toBuffer
+    aggregated2 shouldEqual data.sliding(windowSize, step).map(median).toBuffer
}
}

@@ -504,7 +504,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
val changesChunked = chunkedWindowIt(data, rv, new ChangesChunkedFunctionD(), windowSize, step)
val aggregated2 = changesChunked.map(_.getDouble(1)).toBuffer

-    data.sliding(windowSize, step).map(_.length - 2).toBuffer.zip(aggregated2).foreach {
+    data.sliding(windowSize, step).map(_.length - 1).toBuffer.zip(aggregated2).foreach {
case (val1, val2) => if (val1 == -1) {
val2.isNaN shouldEqual (true) // window does not have any element so changes will be NaN
} else {
@@ -649,7 +649,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {

val chunkedIt = chunkedWindowIt(data, rv, new ZScoreChunkedFunctionD(), windowSize, step)
val aggregated = chunkedIt.map(_.getDouble(1)).toBuffer
-    aggregated shouldEqual data.sliding(windowSize, step).map(d => z_score(d drop 1)).toBuffer
+    aggregated shouldEqual data.sliding(windowSize, step).map(d => z_score(d)).toBuffer
}
}

@@ -186,9 +186,9 @@ class PeriodicRateFunctionsSpec extends RawDataWindowingSpec with ScalaFutures {
val rv = timeValueRVPk(samples, partKey)

val expectedResults = List(
-      500000L -> 125d,
-      900000L -> 100d,
-      1300000 -> 400d
+      500000L -> 100d,
+      900000L -> 80d,
+      1300000 -> 320d,
)

// step == lookback here
@@ -233,9 +233,9 @@ class PeriodicRateFunctionsSpec extends RawDataWindowingSpec with ScalaFutures {
val rv = timeValueRVPk(samples, partKey)

val expectedResults = List(
-      500000L -> 125d,
-      900000L -> 100d,
-      1300000 -> 400d
+      500000L -> 100d,
+      900000L -> 80d,
+      1300000 -> 320d,
)

// step == lookback here
@@ -280,9 +280,9 @@ class PeriodicRateFunctionsSpec extends RawDataWindowingSpec with ScalaFutures {
val rv = timeValueRVPk(samples, partKey)

val expectedResults = List(
-      500000L -> 125d,
-      900000L -> 100d,
-      1300000 -> 400d
+      500000L -> 100d,
+      900000L -> 80d,
+      1300000 -> 320d,
)

// step == lookback here