Skip to content

Commit

Permalink
cherry-pick: misc(query): feature flag to exclude start time for rang…
Browse files Browse the repository at this point in the history
…e functions (#1735)
  • Loading branch information
sherali42 authored Mar 7, 2024
2 parents d1b45eb + a565021 commit 31cc8f3
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 13 deletions.
3 changes: 3 additions & 0 deletions core/src/main/resources/filodb-defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ filodb {

query {

# feature flag to enable start time inclusive in window lookback
inclusive-range = true

# feature flag for query result streaming
streaming-query-results-enabled = false

Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/filodb.core/store/ChunkSetInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,8 @@ class WindowedChunkIterator(rv: RawDataRangeVector, start: Long, step: Long, end
var curWindowEnd: Long = -1L,
var curWindowStart: Long = -1L,
private var readIndex: Int = 0,
windowInfos: Buffer[ChunkSetInfoReader] = Buffer.empty[ChunkSetInfoReader])
windowInfos: Buffer[ChunkSetInfoReader] = Buffer.empty[ChunkSetInfoReader],
isInclusiveRange: Boolean = true)
extends Iterator[ChunkSetInfoReader] {
require(step > 0, s"Adjusted step $step not > 0")
private val infos = rv.chunkInfos(start - window, end)
Expand All @@ -455,7 +456,8 @@ extends Iterator[ChunkSetInfoReader] {
// advance window pointers and reset read index
if (curWindowEnd == -1L) {
curWindowEnd = start
curWindowStart = start - Math.max(window, 0) // window cannot be below 0, ie start should never be > end
val windowDuration = if (isInclusiveRange) window else window - 1
curWindowStart = start - Math.max(windowDuration, 0) // window cannot be below 0, ie start should never be > end
} else {
curWindowEnd += step
curWindowStart += step
Expand Down
14 changes: 11 additions & 3 deletions query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.typesafe.scalalogging.StrictLogging
import monix.reactive.Observable
import org.jctools.queues.SpscUnboundedArrayQueue

import filodb.core.GlobalConfig.systemConfig
import filodb.core.metadata.Column.ColumnType
import filodb.core.query._
import filodb.core.store.WindowedChunkIterator
Expand Down Expand Up @@ -196,6 +197,10 @@ final case class PeriodicSamplesMapper(startMs: Long,
}
}

object FiloQueryConfig {
val isInclusiveRange = systemConfig.getBoolean("filodb.query.inclusive-range")
}

/**
* A low-overhead iterator which works on one window at a time, optimally applying columnar techniques
* to compute each window as fast as possible on multiple rows at a time.
Expand All @@ -213,7 +218,8 @@ extends WrappedCursor(rv.rows()) with StrictLogging {
// Lazily open the iterator and obtain the lock. This allows one thread to create the
// iterator, but the lock is owned by the thread actually performing the iteration.
private lazy val windowIt = {
val it = new WindowedChunkIterator(rv, start, step, end, window, querySession.qContext)
val it = new WindowedChunkIterator(rv, start, step, end, window, querySession.qContext,
isInclusiveRange = FiloQueryConfig.isInclusiveRange)
// Need to hold the shared lock explicitly, because the window iterator needs to
// pre-fetch chunks to determine the window. This pre-fetching can force the internal
// iterator to close, which would release the lock too soon.
Expand Down Expand Up @@ -378,7 +384,8 @@ class SlidingWindowIterator(raw: RangeVectorCursor,
*/
private def shouldAddCurToWindow(curWindowStart: Long, cur: TransientRow): Boolean = {
// cur is inside current window
cur.timestamp >= curWindowStart
val windowStart = if (FiloQueryConfig.isInclusiveRange) curWindowStart else curWindowStart + 1
cur.timestamp >= windowStart
}

/**
Expand All @@ -390,7 +397,8 @@ class SlidingWindowIterator(raw: RangeVectorCursor,
* @param curWindowStart start time of the current window
*/
private def shouldRemoveWindowHead(curWindowStart: Long): Boolean = {
(!windowQueue.isEmpty) && windowQueue.head.timestamp < curWindowStart
val windowStart = if (FiloQueryConfig.isInclusiveRange) curWindowStart else curWindowStart - 1
(!windowQueue.isEmpty) && windowQueue.head.timestamp < windowStart
}
}

Expand Down
37 changes: 29 additions & 8 deletions query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import spire.syntax.cfor._
import filodb.core.query.{QueryConfig, TransientHistRow, TransientRow}
import filodb.memory.format.{vectors => bv, BinaryVector, CounterVectorReader, MemoryReader, VectorDataReader}
import filodb.memory.format.BinaryVector.BinaryVectorPtr
import filodb.query.exec.FiloQueryConfig

object RateFunctions {

Expand Down Expand Up @@ -188,8 +189,12 @@ abstract class ChunkedRateFunctionBase extends CounterChunkedRangeFunction[Trans
override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit = {
if (highestTime > lowestTime) {
// NOTE: It seems in order to match previous code, we have to adjust the windowStart by -1 so it's "inclusive"
val curWindowStart = if (FiloQueryConfig.isInclusiveRange)
windowStart
else
windowStart - 1
val result = RateFunctions.extrapolatedRate(
windowStart, windowEnd, numSamples,
curWindowStart, windowEnd, numSamples,
lowestTime, lowestValue,
highestTime, highestValue,
isCounter, isRate)
Expand Down Expand Up @@ -284,9 +289,15 @@ abstract class HistogramRateFunctionBase extends CounterChunkedRangeFunction[Tra
// TODO: handle case where schemas are different and we need to interpolate schemas
if (highestValue.buckets == lowestValue.buckets) {
val rateArray = new Array[Double](lowestValue.numBuckets)

val curWindowStart = if (FiloQueryConfig.isInclusiveRange)
windowStart
else
windowStart - 1

cforRange { 0 until rateArray.size } { b =>
rateArray(b) = RateFunctions.extrapolatedRate(
windowStart, windowEnd, numSamples,
curWindowStart, windowEnd, numSamples,
lowestTime, lowestValue.bucketValue(b),
highestTime, highestValue.bucketValue(b),
isCounter, isRate)
Expand Down Expand Up @@ -329,8 +340,14 @@ class RateOverDeltaChunkedFunctionD extends ChunkedDoubleRangeFunction {
endRowNum: Int): Unit =
sumFunc.addTimeDoubleChunks(doubleVectAcc, doubleVect, doubleReader, startRowNum, endRowNum)

override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit =
sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000)
override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit = {
val curWindowStart = if (FiloQueryConfig.isInclusiveRange)
windowStart
else
windowStart - 1
sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (curWindowStart)) * 1000)
}

override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ???
}

Expand All @@ -346,8 +363,13 @@ class RateOverDeltaChunkedFunctionL extends ChunkedLongRangeFunction {
endRowNum: Int): Unit =
sumFunc.addTimeChunks(longVectAcc, longVect, longReader, startRowNum, endRowNum)

override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit =
sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000)
override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit = {
val curWindowStart = if (FiloQueryConfig.isInclusiveRange)
windowStart
else
windowStart - 1
sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (curWindowStart)) * 1000)
}

override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ???
}
Expand Down Expand Up @@ -376,5 +398,4 @@ class RateOverDeltaChunkedFunctionH(var h: bv.MutableHistogram = bv.Histogram.em
hFunc.addTimeChunks(vectAcc, vectPtr, reader, startRowNum, endRowNum)

def apply(endTimestamp: Long, sampleToEmit: TransientHistRow): Unit = ???
}

}

0 comments on commit 31cc8f3

Please sign in to comment.