misc(query): feature flag to exclude start time for range functions #1735

Merged · 2 commits · Mar 7, 2024
3 changes: 3 additions & 0 deletions core/src/main/resources/filodb-defaults.conf
@@ -318,6 +318,9 @@ filodb {

   query {

+    # feature flag to enable start time inclusive in window lookback
+    inclusive-range = true
+
     # feature flag for query result streaming
     streaming-query-results-enabled = false

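For context, a minimal sketch of how a flag like this can be read with the Typesafe Config library that FiloDB already uses for filodb-defaults.conf; the FiloQueryConfig object introduced later in this diff does essentially the same thing against GlobalConfig.systemConfig. The object name and the inline config string below are illustrative only.

```scala
import com.typesafe.config.ConfigFactory

// Hypothetical sketch, not FiloDB source: parse an equivalent config snippet
// directly instead of going through GlobalConfig.systemConfig.
object InclusiveRangeFlagSketch extends App {
  val config = ConfigFactory.parseString(
    """
      |filodb.query {
      |  # feature flag to enable start time inclusive in window lookback
      |  inclusive-range = true
      |}
    """.stripMargin)

  val isInclusiveRange: Boolean = config.getBoolean("filodb.query.inclusive-range")
  println(s"inclusive-range = $isInclusiveRange")   // true unless overridden
}
```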
6 changes: 4 additions & 2 deletions core/src/main/scala/filodb.core/store/ChunkSetInfo.scala
@@ -435,7 +435,8 @@ class WindowedChunkIterator(rv: RawDataRangeVector, start: Long, step: Long, end
     var curWindowEnd: Long = -1L,
     var curWindowStart: Long = -1L,
     private var readIndex: Int = 0,
-    windowInfos: Buffer[ChunkSetInfoReader] = Buffer.empty[ChunkSetInfoReader])
+    windowInfos: Buffer[ChunkSetInfoReader] = Buffer.empty[ChunkSetInfoReader],
+    isInclusiveRange: Boolean = true)
 extends Iterator[ChunkSetInfoReader] {
   require(step > 0, s"Adjusted step $step not > 0")
   private val infos = rv.chunkInfos(start - window, end)
@@ -455,7 +456,8 @@ extends Iterator[ChunkSetInfoReader] {
     // advance window pointers and reset read index
     if (curWindowEnd == -1L) {
       curWindowEnd = start
-      curWindowStart = start - Math.max(window, 0) // window cannot be below 0, ie start should never be > end
+      val windowDuration = if (isInclusiveRange) window else window - 1
+      curWindowStart = start - Math.max(windowDuration, 0) // window cannot be below 0, ie start should never be > end
     } else {
       curWindowEnd += step
       curWindowStart += step
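A hedged sketch (not the real iterator) that reproduces only the window-bound arithmetic from the WindowedChunkIterator change above, to show how isInclusiveRange shifts each lookback window. The object name and the sample timestamps are made up for illustration.

```scala
object SteppedWindowBoundsSketch extends App {
  // Mirrors the diff: an exclusive range shortens the effective lookback by 1 ms
  def windowBounds(start: Long, step: Long, end: Long, window: Long,
                   isInclusiveRange: Boolean): List[(Long, Long)] = {
    val windowDuration = if (isInclusiveRange) window else window - 1
    (start to end by step)
      .map(windowEnd => (windowEnd - math.max(windowDuration, 0L), windowEnd))
      .toList
  }

  // start = 10000 ms, step = 5000 ms, end = 20000 ms, window = 3000 ms
  println(windowBounds(10000, 5000, 20000, 3000, isInclusiveRange = true))
  // List((7000,10000), (12000,15000), (17000,20000))  -- a sample at 7000 falls inside the first window
  println(windowBounds(10000, 5000, 20000, 3000, isInclusiveRange = false))
  // List((7001,10000), (12001,15000), (17001,20000))  -- a sample at 7000 is now excluded
}
```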
14 changes: 11 additions & 3 deletions query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala
@@ -4,6 +4,7 @@ import com.typesafe.scalalogging.StrictLogging
 import monix.reactive.Observable
 import org.jctools.queues.SpscUnboundedArrayQueue

+import filodb.core.GlobalConfig.systemConfig
 import filodb.core.metadata.Column.ColumnType
 import filodb.core.query._
 import filodb.core.store.WindowedChunkIterator
@@ -196,6 +197,10 @@ final case class PeriodicSamplesMapper(startMs: Long,
   }
 }

+object FiloQueryConfig {
+  val isInclusiveRange = systemConfig.getBoolean("filodb.query.inclusive-range")
+}
+
 /**
  * A low-overhead iterator which works on one window at a time, optimally applying columnar techniques
  * to compute each window as fast as possible on multiple rows at a time.
@@ -213,7 +218,8 @@ extends WrappedCursor(rv.rows()) with StrictLogging {
   // Lazily open the iterator and obtain the lock. This allows one thread to create the
   // iterator, but the lock is owned by the thread actually performing the iteration.
   private lazy val windowIt = {
-    val it = new WindowedChunkIterator(rv, start, step, end, window, querySession.qContext)
+    val it = new WindowedChunkIterator(rv, start, step, end, window, querySession.qContext,
+      isInclusiveRange = FiloQueryConfig.isInclusiveRange)
     // Need to hold the shared lock explicitly, because the window iterator needs to
     // pre-fetch chunks to determine the window. This pre-fetching can force the internal
     // iterator to close, which would release the lock too soon.
@@ -378,7 +384,8 @@ class SlidingWindowIterator(raw: RangeVectorCursor,
    */
   private def shouldAddCurToWindow(curWindowStart: Long, cur: TransientRow): Boolean = {
     // cur is inside current window
-    cur.timestamp >= curWindowStart
+    val windowStart = if (FiloQueryConfig.isInclusiveRange) curWindowStart else curWindowStart + 1
+    cur.timestamp >= windowStart
   }

   /**
@@ -390,7 +397,8 @@
    * @param curWindowStart start time of the current window
    */
   private def shouldRemoveWindowHead(curWindowStart: Long): Boolean = {
-    (!windowQueue.isEmpty) && windowQueue.head.timestamp < curWindowStart
+    val windowStart = if (FiloQueryConfig.isInclusiveRange) curWindowStart else curWindowStart - 1
+    (!windowQueue.isEmpty) && windowQueue.head.timestamp < windowStart
   }
 }

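Another hedged sketch, this time of the SlidingWindowIterator boundary check shown above, with the flag passed as a plain parameter instead of being read from FiloQueryConfig so the behaviour at the window edge can be seen in isolation; the object name and numbers are arbitrary.

```scala
object SlidingWindowBoundarySketch extends App {
  // Same predicate as shouldAddCurToWindow in the diff, with the flag made explicit
  def shouldAddCurToWindow(curWindowStart: Long, sampleTs: Long,
                           isInclusiveRange: Boolean): Boolean = {
    val windowStart = if (isInclusiveRange) curWindowStart else curWindowStart + 1
    sampleTs >= windowStart
  }

  // A sample sitting exactly on the window start boundary:
  println(shouldAddCurToWindow(curWindowStart = 7000, sampleTs = 7000, isInclusiveRange = true))  // true
  println(shouldAddCurToWindow(curWindowStart = 7000, sampleTs = 7000, isInclusiveRange = false)) // false
  // Samples strictly inside the window are unaffected by the flag:
  println(shouldAddCurToWindow(curWindowStart = 7000, sampleTs = 7001, isInclusiveRange = false)) // true
}
```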
37 changes: 29 additions & 8 deletions query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala
@@ -5,6 +5,7 @@ import spire.syntax.cfor._
 import filodb.core.query.{QueryConfig, TransientHistRow, TransientRow}
 import filodb.memory.format.{vectors => bv, BinaryVector, CounterVectorReader, MemoryReader, VectorDataReader}
 import filodb.memory.format.BinaryVector.BinaryVectorPtr
+import filodb.query.exec.FiloQueryConfig

 object RateFunctions {

@@ -188,8 +189,12 @@ abstract class ChunkedRateFunctionBase extends CounterChunkedRangeFunction[Trans
   override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit = {
     if (highestTime > lowestTime) {
       // NOTE: It seems in order to match previous code, we have to adjust the windowStart by -1 so it's "inclusive"
+      val curWindowStart = if (FiloQueryConfig.isInclusiveRange)
+        windowStart
+      else
+        windowStart - 1
       val result = RateFunctions.extrapolatedRate(
-        windowStart, windowEnd, numSamples,
+        curWindowStart, windowEnd, numSamples,
         lowestTime, lowestValue,
         highestTime, highestValue,
         isCounter, isRate)
@@ -284,9 +289,15 @@ abstract class HistogramRateFunctionBase extends CounterChunkedRangeFunction[Tra
       // TODO: handle case where schemas are different and we need to interpolate schemas
       if (highestValue.buckets == lowestValue.buckets) {
         val rateArray = new Array[Double](lowestValue.numBuckets)
+
+        val curWindowStart = if (FiloQueryConfig.isInclusiveRange)
+          windowStart
+        else
+          windowStart - 1
+
         cforRange { 0 until rateArray.size } { b =>
           rateArray(b) = RateFunctions.extrapolatedRate(
-            windowStart, windowEnd, numSamples,
+            curWindowStart, windowEnd, numSamples,
             lowestTime, lowestValue.bucketValue(b),
             highestTime, highestValue.bucketValue(b),
             isCounter, isRate)
@@ -329,8 +340,14 @@ class RateOverDeltaChunkedFunctionD extends ChunkedDoubleRangeFunction {
                           endRowNum: Int): Unit =
     sumFunc.addTimeDoubleChunks(doubleVectAcc, doubleVect, doubleReader, startRowNum, endRowNum)

-  override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit =
-    sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000)
+  override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit = {
+    val curWindowStart = if (FiloQueryConfig.isInclusiveRange)
+      windowStart
+    else
+      windowStart - 1
+    sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (curWindowStart)) * 1000)
+  }

   override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ???
 }

@@ -346,8 +363,13 @@ class RateOverDeltaChunkedFunctionL extends ChunkedLongRangeFunction {
                     endRowNum: Int): Unit =
     sumFunc.addTimeChunks(longVectAcc, longVect, longReader, startRowNum, endRowNum)

-  override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit =
-    sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000)
+  override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit = {
+    val curWindowStart = if (FiloQueryConfig.isInclusiveRange)
+      windowStart
+    else
+      windowStart - 1
+    sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (curWindowStart)) * 1000)
+  }

   override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ???
 }
@@ -376,5 +398,4 @@ class RateOverDeltaChunkedFunctionH(var h: bv.MutableHistogram = bv.Histogram.em
hFunc.addTimeChunks(vectAcc, vectPtr, reader, startRowNum, endRowNum)

def apply(endTimestamp: Long, sampleToEmit: TransientHistRow): Unit = ???
}

}
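Finally, a hedged sketch of the rate-over-delta arithmetic changed above (RateOverDeltaChunkedFunctionD/L): when inclusive-range is false the boundary sample is excluded from the sum, but pulling windowStart back by 1 ms keeps the per-second denominator at the full window length. The object name and the sample numbers are illustrative only.

```scala
object RateOverDeltaDenominatorSketch extends App {
  // Same denominator adjustment as the diff; timestamps are in milliseconds
  def rateOverDelta(sum: Double, windowStart: Long, windowEnd: Long,
                    isInclusiveRange: Boolean): Double = {
    val curWindowStart = if (isInclusiveRange) windowStart else windowStart - 1
    sum / (windowEnd - curWindowStart) * 1000
  }

  // Inclusive range: window [700000, 1000000], denominator 300000 ms
  println(rateOverDelta(sum = 600.0, windowStart = 700000, windowEnd = 1000000, isInclusiveRange = true))  // 2.0
  // Exclusive range: the iterator hands the function windowStart = 700001; pulling it
  // back by 1 ms keeps the denominator at 300000 ms, so the rate is unchanged
  println(rateOverDelta(sum = 600.0, windowStart = 700001, windowEnd = 1000000, isInclusiveRange = false)) // 2.0
}
```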