diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf index 0396db93b6..8553e0025e 100644 --- a/core/src/main/resources/filodb-defaults.conf +++ b/core/src/main/resources/filodb-defaults.conf @@ -318,6 +318,9 @@ filodb { query { + # feature flag to enable start time inclusive in window lookback + inclusive-range = true + # feature flag for query result streaming streaming-query-results-enabled = false diff --git a/core/src/main/scala/filodb.core/store/ChunkSetInfo.scala b/core/src/main/scala/filodb.core/store/ChunkSetInfo.scala index 9154dc5eb7..96329750f1 100644 --- a/core/src/main/scala/filodb.core/store/ChunkSetInfo.scala +++ b/core/src/main/scala/filodb.core/store/ChunkSetInfo.scala @@ -435,7 +435,8 @@ class WindowedChunkIterator(rv: RawDataRangeVector, start: Long, step: Long, end var curWindowEnd: Long = -1L, var curWindowStart: Long = -1L, private var readIndex: Int = 0, - windowInfos: Buffer[ChunkSetInfoReader] = Buffer.empty[ChunkSetInfoReader]) + windowInfos: Buffer[ChunkSetInfoReader] = Buffer.empty[ChunkSetInfoReader], + isInclusiveRange: Boolean = true) extends Iterator[ChunkSetInfoReader] { require(step > 0, s"Adjusted step $step not > 0") private val infos = rv.chunkInfos(start - window, end) @@ -455,7 +456,8 @@ extends Iterator[ChunkSetInfoReader] { // advance window pointers and reset read index if (curWindowEnd == -1L) { curWindowEnd = start - curWindowStart = start - Math.max(window, 0) // window cannot be below 0, ie start should never be > end + val windowDuration = if (isInclusiveRange) window else window - 1 + curWindowStart = start - Math.max(windowDuration, 0) // window cannot be below 0, ie start should never be > end } else { curWindowEnd += step curWindowStart += step diff --git a/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala b/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala index 9fcd50d05d..34ef0037ac 100644 --- a/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala +++ b/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala @@ -4,6 +4,7 @@ import com.typesafe.scalalogging.StrictLogging import monix.reactive.Observable import org.jctools.queues.SpscUnboundedArrayQueue +import filodb.core.GlobalConfig.systemConfig import filodb.core.metadata.Column.ColumnType import filodb.core.query._ import filodb.core.store.WindowedChunkIterator @@ -196,6 +197,10 @@ final case class PeriodicSamplesMapper(startMs: Long, } } +object FiloQueryConfig { + val isInclusiveRange = systemConfig.getBoolean("filodb.query.inclusive-range") +} + /** * A low-overhead iterator which works on one window at a time, optimally applying columnar techniques * to compute each window as fast as possible on multiple rows at a time. @@ -213,7 +218,8 @@ extends WrappedCursor(rv.rows()) with StrictLogging { // Lazily open the iterator and obtain the lock. This allows one thread to create the // iterator, but the lock is owned by the thread actually performing the iteration. private lazy val windowIt = { - val it = new WindowedChunkIterator(rv, start, step, end, window, querySession.qContext) + val it = new WindowedChunkIterator(rv, start, step, end, window, querySession.qContext, + isInclusiveRange = FiloQueryConfig.isInclusiveRange) // Need to hold the shared lock explicitly, because the window iterator needs to // pre-fetch chunks to determine the window. This pre-fetching can force the internal // iterator to close, which would release the lock too soon. @@ -378,7 +384,8 @@ class SlidingWindowIterator(raw: RangeVectorCursor, */ private def shouldAddCurToWindow(curWindowStart: Long, cur: TransientRow): Boolean = { // cur is inside current window - cur.timestamp >= curWindowStart + val windowStart = if (FiloQueryConfig.isInclusiveRange) curWindowStart else curWindowStart + 1 + cur.timestamp >= windowStart } /** @@ -390,7 +397,8 @@ class SlidingWindowIterator(raw: RangeVectorCursor, * @param curWindowStart start time of the current window */ private def shouldRemoveWindowHead(curWindowStart: Long): Boolean = { - (!windowQueue.isEmpty) && windowQueue.head.timestamp < curWindowStart + val windowStart = if (FiloQueryConfig.isInclusiveRange) curWindowStart else curWindowStart - 1 + (!windowQueue.isEmpty) && windowQueue.head.timestamp < windowStart } } diff --git a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala index ed72c17f29..7027d5a43c 100644 --- a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala +++ b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala @@ -5,6 +5,7 @@ import spire.syntax.cfor._ import filodb.core.query.{QueryConfig, TransientHistRow, TransientRow} import filodb.memory.format.{vectors => bv, BinaryVector, CounterVectorReader, MemoryReader, VectorDataReader} import filodb.memory.format.BinaryVector.BinaryVectorPtr +import filodb.query.exec.FiloQueryConfig object RateFunctions { @@ -188,8 +189,12 @@ abstract class ChunkedRateFunctionBase extends CounterChunkedRangeFunction[Trans override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit = { if (highestTime > lowestTime) { // NOTE: It seems in order to match previous code, we have to adjust the windowStart by -1 so it's "inclusive" + val curWindowStart = if (FiloQueryConfig.isInclusiveRange) + windowStart + else + windowStart - 1 val result = RateFunctions.extrapolatedRate( - windowStart, windowEnd, numSamples, + curWindowStart, windowEnd, numSamples, lowestTime, lowestValue, highestTime, highestValue, isCounter, isRate) @@ -284,9 +289,15 @@ abstract class HistogramRateFunctionBase extends CounterChunkedRangeFunction[Tra // TODO: handle case where schemas are different and we need to interpolate schemas if (highestValue.buckets == lowestValue.buckets) { val rateArray = new Array[Double](lowestValue.numBuckets) + + val curWindowStart = if (FiloQueryConfig.isInclusiveRange) + windowStart + else + windowStart - 1 + cforRange { 0 until rateArray.size } { b => rateArray(b) = RateFunctions.extrapolatedRate( - windowStart, windowEnd, numSamples, + curWindowStart, windowEnd, numSamples, lowestTime, lowestValue.bucketValue(b), highestTime, highestValue.bucketValue(b), isCounter, isRate) @@ -329,8 +340,14 @@ class RateOverDeltaChunkedFunctionD extends ChunkedDoubleRangeFunction { endRowNum: Int): Unit = sumFunc.addTimeDoubleChunks(doubleVectAcc, doubleVect, doubleReader, startRowNum, endRowNum) - override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit = - sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000) + override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit = { + val curWindowStart = if (FiloQueryConfig.isInclusiveRange) + windowStart + else + windowStart - 1 + sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (curWindowStart)) * 1000) + } + override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ??? } @@ -346,8 +363,13 @@ class RateOverDeltaChunkedFunctionL extends ChunkedLongRangeFunction { endRowNum: Int): Unit = sumFunc.addTimeChunks(longVectAcc, longVect, longReader, startRowNum, endRowNum) - override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit = - sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (windowStart)) * 1000) + override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientRow): Unit = { + val curWindowStart = if (FiloQueryConfig.isInclusiveRange) + windowStart + else + windowStart - 1 + sampleToEmit.setValues(windowEnd, sumFunc.sum / (windowEnd - (curWindowStart)) * 1000) + } override def apply(endTimestamp: Long, sampleToEmit: TransientRow): Unit = ??? } @@ -376,5 +398,4 @@ class RateOverDeltaChunkedFunctionH(var h: bv.MutableHistogram = bv.Histogram.em hFunc.addTimeChunks(vectAcc, vectPtr, reader, startRowNum, endRowNum) def apply(endTimestamp: Long, sampleToEmit: TransientHistRow): Unit = ??? -} - +} \ No newline at end of file