From ef82299de3011d64b2b707995e150ac0f815093a Mon Sep 17 00:00:00 2001
From: Sandeep Agarwalla
Date: Wed, 5 Apr 2023 15:31:16 -0700
Subject: [PATCH 1/4] Adding IRate support for Histograms

---
 .../query/exec/rangefn/RateFunctions.scala | 64 +++++++++++++++++++
 1 file changed, 64 insertions(+)

diff --git a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala
index 57cc2023f9..a503b23bf2 100644
--- a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala
+++ b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala
@@ -303,6 +303,70 @@ abstract class HistogramRateFunctionBase extends CounterChunkedRangeFunction[Tra
   def apply(endTimestamp: Long, sampleToEmit: TransientHistRow): Unit = ???
 }

+abstract class HistogramIRateFunctionBase extends CounterChunkedRangeFunction[TransientHistRow]{
+  var numSamples = 0
+  var previousToLastSampleTime = Long.MaxValue
+  var previousToLastSampleValue: bv.HistogramWithBuckets = bv.HistogramWithBuckets.empty
+  var lastSampleTime = 0L
+  var lastSampleValue: bv.HistogramWithBuckets = bv.HistogramWithBuckets.empty
+
+  def isCounter: Boolean
+
+  def isRate: Boolean
+
+  override def reset(): Unit = {
+    previousToLastSampleTime = 0L
+    previousToLastSampleValue = bv.HistogramWithBuckets.empty
+    lastSampleTime = 0L
+    lastSampleValue = bv.HistogramWithBuckets.empty
+    super.reset()
+  }
+
+  def addTimeChunks(acc: MemoryReader, vector: BinaryVectorPtr, reader: CounterVectorReader,
+                    previousToEndRowNum: Int, endRowNum: Int,
+                    previousToEndTime: Long, endTime: Long): Unit = reader match {
+    case histReader: bv.CounterHistogramReader =>
+
+      // For IRate, we need the last 2 samples in the given window
+      // Hence, we are using the endtime to see if there is any newer sample
+      if (endTime > lastSampleTime) {
+
+        previousToLastSampleTime = previousToEndTime
+        previousToLastSampleValue = histReader.correctedValue(previousToEndRowNum, correctionMeta)
+
+        lastSampleTime = endTime
+        lastSampleValue = histReader.correctedValue(endRowNum, correctionMeta)
+      }
+    case other: CounterVectorReader =>
+  }
+
+  override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientHistRow): Unit = {
+    if (previousToLastSampleTime > lastSampleTime) {
+      // NOTE: It seems in order to match previous code, we have to adjust the windowStart by -1 so it's "inclusive"
+      // TODO: handle case where schemas are different and we need to interpolate schemas
+      if (previousToLastSampleValue.buckets == lastSampleValue.buckets) {
+        val rateArray = new Array[Double](previousToLastSampleValue.numBuckets)
+        cforRange {
+          0 until rateArray.size
+        } { b =>
+          rateArray(b) = RateFunctions.extrapolatedRate(
+            windowStart - 1, windowEnd, 2,
+            previousToLastSampleTime, previousToLastSampleValue.bucketValue(b),
+            lastSampleTime, lastSampleValue.bucketValue(b),
+            isCounter, isRate)
+        }
+        sampleToEmit.setValues(windowEnd, bv.MutableHistogram(previousToLastSampleValue.buckets, rateArray))
+      } else {
+        sampleToEmit.setValues(windowEnd, bv.HistogramWithBuckets.empty)
+      }
+    } else {
+      sampleToEmit.setValues(windowEnd, bv.HistogramWithBuckets.empty)
+    }
+  }
+
+  def apply(endTimestamp: Long, sampleToEmit: TransientHistRow): Unit = ???
+}
+
 class HistRateFunction extends HistogramRateFunctionBase {
   def isCounter: Boolean = true
   def isRate: Boolean = true
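Note on [PATCH 1/4]: the new HistogramIRateFunctionBase keeps only the two newest drop-corrected samples seen in the window and feeds each bucket pair to RateFunctions.extrapolatedRate with numSamples = 2, so per bucket the result is essentially a two-point rate with Prometheus-style window extrapolation applied on top. A standalone sketch of that per-bucket arithmetic, using plain arrays and hypothetical names rather than FiloDB's bv.HistogramWithBuckets API:

    // Standalone sketch (hypothetical helper, not part of the patch): per-bucket instant
    // rate from the two newest counter-histogram samples. Bucket arrays are assumed to be
    // drop-corrected already; timestamps are in milliseconds, the result is per-second.
    object CounterHistIRateSketch {
      def bucketIRate(prevTs: Long, prevBuckets: Array[Double],
                      lastTs: Long, lastBuckets: Array[Double]): Array[Double] = {
        require(prevBuckets.length == lastBuckets.length, "bucket schemas must match")
        require(lastTs > prevTs, "need two samples with distinct timestamps")
        Array.tabulate(lastBuckets.length) { b =>
          // counter semantics: increase between the two newest samples, scaled to per-second
          (lastBuckets(b) - prevBuckets(b)) / (lastTs - prevTs) * 1000
        }
      }
    }
    // e.g. bucketIRate(10000L, Array(1.0, 4.0), 20000L, Array(3.0, 10.0)) gives Array(0.2, 0.6)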
From 5de20a7f7ff3631fe5b7f724b3921bc4cb74d06f Mon Sep 17 00:00:00 2001
From: Sandeep Agarwalla
Date: Wed, 5 Apr 2023 16:17:47 -0700
Subject: [PATCH 2/4] Adding support for irate for histograms

---
 .../query/exec/rangefn/RangeFunction.scala | 13 +++++++--
 .../query/exec/rangefn/RateFunctions.scala | 28 +++++++++++++++----
 2 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala b/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala
index fcd4b0095a..28d5dfd7b8 100644
--- a/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala
+++ b/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala
@@ -124,13 +124,20 @@ trait CounterChunkedRangeFunction[R <: MutableRowReader] extends ChunkedRangeFun
   // reset is called before first chunk. Reset correction metadata
   override def reset(): Unit = { correctionMeta = NoCorrection }

+  def calculateStartAndEndRowNum(tsVectorAcc: MemoryReader, tsVector: BinaryVectorPtr,
+                                 tsReader: bv.LongVectorDataReader, startTime: Long, endTime: Long,
+                                 info: ChunkSetInfoReader): (Int, Int) = {
+    val startRowNum = tsReader.binarySearch(tsVectorAcc, tsVector, startTime) & 0x7fffffff
+    val endRowNum = Math.min(tsReader.ceilingIndex(tsVectorAcc, tsVector, endTime), info.numRows - 1)
+    (startRowNum, endRowNum)
+  }
+
   // scalastyle:off parameter.number
   def addChunks(tsVectorAcc: MemoryReader, tsVector: BinaryVectorPtr, tsReader: bv.LongVectorDataReader,
                 valueVectorAcc: MemoryReader, valueVector: BinaryVectorPtr, valueReader: VectorDataReader,
                 startTime: Long, endTime: Long, info: ChunkSetInfoReader, queryConfig: QueryConfig): Unit = {
     val ccReader = valueReader.asInstanceOf[CounterVectorReader]
-    val startRowNum = tsReader.binarySearch(tsVectorAcc, tsVector, startTime) & 0x7fffffff
-    val endRowNum = Math.min(tsReader.ceilingIndex(tsVectorAcc, tsVector, endTime), info.numRows - 1)
+    val (startRowNum, endRowNum) = calculateStartAndEndRowNum(tsVectorAcc, tsVector, tsReader, startTime, endTime, info)

     // For each chunk:
     // Check if any dropoff from end of last chunk to beg of this chunk (unless it's the first chunk)
@@ -384,6 +391,8 @@ object RangeFunction {
     case Some(SumOverTime) => () => new SumOverTimeChunkedFunctionH
     case Some(Rate)     if schema.columns(1).isCumulative
                         => () => new HistRateFunction
+    case Some(Irate)    if schema.columns(1).isCumulative
+                        => () => new HistIRateFunction
     case Some(Increase) if schema.columns(1).isCumulative
                         => () => new HistIncreaseFunction
     case Some(Rate)     => () => new RateOverDeltaChunkedFunctionH
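Note on the RangeFunction.scala change above: it is a template-method refactor. addChunks keeps the chunk walk and the counter-correction handling, while the choice of which rows to visit moves into the overridable calculateStartAndEndRowNum hook, which the irate variant (next file) narrows to the last two rows of each chunk. A simplified illustration of the pattern with invented types (the real hook takes MemoryReader, BinaryVectorPtr and ChunkSetInfoReader as in the hunk above):

    // Simplified illustration (invented types, not the FiloDB signatures).
    abstract class WindowedChunkFunction {
      // Default behaviour: every row whose timestamp falls inside [start, end].
      def selectRows(timestamps: IndexedSeq[Long], start: Long, end: Long): (Int, Int) = {
        val startRow = timestamps.indexWhere(_ >= start)
        val endRow   = timestamps.lastIndexWhere(_ <= end)
        (startRow, endRow)
      }
    }

    class IRateRowSelection extends WindowedChunkFunction {
      // The irate variant only needs the two newest rows at or before the window end;
      // startRow may come back as -1 for a single-row chunk, which the caller must guard.
      override def selectRows(timestamps: IndexedSeq[Long], start: Long, end: Long): (Int, Int) = {
        val endRow = timestamps.lastIndexWhere(_ <= end)
        (endRow - 1, endRow)
      }
    }
    // e.g. new IRateRowSelection().selectRows(Vector(10L, 20L, 30L), 0L, 25L) == (0, 1)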
diff --git a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala
index a503b23bf2..953346e77b 100644
--- a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala
+++ b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala
@@ -1,9 +1,9 @@
 package filodb.query.exec.rangefn

 import spire.syntax.cfor._
-
 import filodb.core.query.{QueryConfig, TransientHistRow, TransientRow}
-import filodb.memory.format.{vectors => bv, BinaryVector, CounterVectorReader, MemoryReader, VectorDataReader}
+import filodb.core.store.ChunkSetInfoReader
+import filodb.memory.format.{BinaryVector, CounterVectorReader, MemoryReader, VectorDataReader, vectors => bv}
 import filodb.memory.format.BinaryVector.BinaryVectorPtr

 object RateFunctions {
@@ -322,6 +322,14 @@ abstract class HistogramIRateFunctionBase extends CounterChunkedRangeFunction[Tr
     super.reset()
   }

+  override def calculateStartAndEndRowNum(tsVectorAcc: MemoryReader, tsVector: BinaryVectorPtr,
+                                          tsReader: bv.LongVectorDataReader, startTime: Long, endTime: Long,
+                                          info: ChunkSetInfoReader): (Int, Int) = {
+    val endRowNum = Math.min(tsReader.ceilingIndex(tsVectorAcc, tsVector, endTime), info.numRows - 1)
+    // In Irate, we use the last two samples
+    (endRowNum-1, endRowNum)
+  }
+
   def addTimeChunks(acc: MemoryReader, vector: BinaryVectorPtr, reader: CounterVectorReader,
                     previousToEndRowNum: Int, endRowNum: Int,
                     previousToEndTime: Long, endTime: Long): Unit = reader match {
@@ -331,8 +339,11 @@ abstract class HistogramIRateFunctionBase extends CounterChunkedRangeFunction[Tr
       // Hence, we are using the endtime to see if there is any newer sample
       if (endTime > lastSampleTime) {

-        previousToLastSampleTime = previousToEndTime
-        previousToLastSampleValue = histReader.correctedValue(previousToEndRowNum, correctionMeta)
+        // TODO: Check what is the lowest endRowNum possible, assuming 0 for now
+        if (previousToEndRowNum > -1){
+          previousToLastSampleTime = previousToEndTime
+          previousToLastSampleValue = histReader.correctedValue(previousToEndRowNum, correctionMeta)
+        }

         lastSampleTime = endTime
         lastSampleValue = histReader.correctedValue(endRowNum, correctionMeta)
@@ -341,7 +352,9 @@ abstract class HistogramIRateFunctionBase extends CounterChunkedRangeFunction[Tr
   }

   override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientHistRow): Unit = {
-    if (previousToLastSampleTime > lastSampleTime) {
+
+    // check if previousToLastSampleTime is init and if it is < the lastSampleTime
+    if ( (previousToLastSampleTime > 0L) && (previousToLastSampleTime < lastSampleTime) ) {
       // NOTE: It seems in order to match previous code, we have to adjust the windowStart by -1 so it's "inclusive"
       // TODO: handle case where schemas are different and we need to interpolate schemas
       if (previousToLastSampleValue.buckets == lastSampleValue.buckets) {
@@ -367,6 +380,11 @@ abstract class HistogramIRateFunctionBase extends CounterChunkedRangeFunction[Tr
   def apply(endTimestamp: Long, sampleToEmit: TransientHistRow): Unit = ???
 }

+class HistIRateFunction extends HistogramIRateFunctionBase {
+  def isCounter: Boolean = true
+  def isRate: Boolean = true
+}
+
 class HistRateFunction extends HistogramRateFunctionBase {
   def isCounter: Boolean = true
   def isRate: Boolean = true
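Note on [PATCH 2/4]: the counter-histogram irate path hinges on two pieces of per-window state, previousToLastSampleTime and lastSampleTime, whose sentinel values decide whether a rate can be emitted at all. A toy model of that bookkeeping in plain Scala (not FiloDB code; it uses the sentinel convention the series settles on in [PATCH 4/4], previousToLastSampleTime starting at Long.MaxValue and lastSampleTime at 0L):

    // Toy model of the two-sample tracking done per window across chunks.
    final class TwoSampleTracker {
      var previousToLastTime: Long = Long.MaxValue
      var lastTime: Long = 0L

      // prevRowNum is -1 when a chunk holds a single sample, mirroring endRowNum - 1 above
      def observe(prevTime: Long, prevRowNum: Int, newTime: Long): Unit =
        if (newTime > lastTime) {              // only newer chunks advance the pair
          if (prevRowNum >= 0) previousToLastTime = prevTime
          lastTime = newTime
        }

      // true only once two distinct samples have been seen in the window
      def hasTwoSamples: Boolean = lastTime > previousToLastTime
    }
    // val t = new TwoSampleTracker
    // t.observe(0L, -1, 10000L); t.hasTwoSamples == false   (only one sample so far)
    // t.observe(10000L, 0, 20000L); t.hasTwoSamples == true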
From 205967b8edff4e0a3a1c4517f48fcda61511eec8 Mon Sep 17 00:00:00 2001
From: Sandeep Agarwalla
Date: Thu, 6 Apr 2023 17:04:36 -0700
Subject: [PATCH 3/4] Adding IRate delta implementation

---
 .../query/exec/rangefn/RateFunctions.scala | 60 ++++++++++++++++++-
 1 file changed, 59 insertions(+), 1 deletion(-)

diff --git a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala
index 953346e77b..3e1f0bef13 100644
--- a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala
+++ b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala
@@ -1,10 +1,12 @@
 package filodb.query.exec.rangefn

 import spire.syntax.cfor._
+
 import filodb.core.query.{QueryConfig, TransientHistRow, TransientRow}
 import filodb.core.store.ChunkSetInfoReader
-import filodb.memory.format.{BinaryVector, CounterVectorReader, MemoryReader, VectorDataReader, vectors => bv}
+import filodb.memory.format.{vectors => bv, BinaryVector, CounterVectorReader, MemoryReader, VectorDataReader}
 import filodb.memory.format.BinaryVector.BinaryVectorPtr
+import filodb.memory.format.vectors.LongVectorDataReader

 object RateFunctions {
@@ -460,3 +462,59 @@ class RateOverDeltaChunkedFunctionH(var h: bv.MutableHistogram = bv.Histogram.em
   def apply(endTimestamp: Long, sampleToEmit: TransientHistRow): Unit = ???
 }

+class IRateOverDeltaChunkedFunctionH(var lastSampleValue: bv.HistogramWithBuckets = bv.Histogram.empty,
+                                     var previousToLastSampleValue: bv.HistogramWithBuckets = bv.Histogram.empty,
+                                     var latestTime: Long = 0L)
+  extends ChunkedRangeFunction[TransientHistRow] {
+
+  // TODO: Add all the necessary checks
+  // TODO: track last sample time and previous to last sample time
+
+  override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientHistRow): Unit = {
+    // TODO: Add checks
+    val irateArray = new Array[Double](lastSampleValue.numBuckets)
+    cforRange {
+      0 until irateArray.size
+    } { b =>
+      val sumOfLastTwoSamples = (lastSampleValue.bucketValue(b) + previousToLastSampleValue.bucketValue(b))
+      irateArray(b) = sumOfLastTwoSamples / (windowEnd - (windowStart - 1)) * 1000
+    }
+    sampleToEmit.setValues(windowEnd, bv.MutableHistogram(lastSampleValue.buckets, irateArray))
+  }
+
+  // scalastyle:off parameter.number
+  override def addChunks(tsVectorAcc: MemoryReader, tsVector: BinaryVectorPtr, tsReader: LongVectorDataReader,
+                         valueVectorAcc: MemoryReader, valueVector: BinaryVectorPtr, valueReader: VectorDataReader,
+                         startTime: Long, endTime: Long,
+                         info: ChunkSetInfoReader, queryConfig: QueryConfig): Unit = {
+
+    val endRowNum = Math.min(tsReader.ceilingIndex(tsVectorAcc, tsVector, endTime), info.numRows - 1)
+    val previousToEndRowNum = endRowNum - 1
+
+    // At least one sample is present
+    if (endRowNum >= 0) {
+      addTimeChunks(valueVectorAcc, valueVector, valueReader, previousToEndRowNum, endRowNum,
+        tsReader(tsVectorAcc, tsVector, previousToEndRowNum), tsReader(tsVectorAcc, tsVector, endRowNum))
+    }
+  }
+
+  def addTimeChunks(acc: MemoryReader, vector: BinaryVectorPtr, reader: VectorDataReader,
+                    previousToEndRowNum: Int, endRowNum: Int,
+                    previousToEndTime: Long, endTime: Long): Unit = {
+
+    // we are tracking the last two sample's histogram buckets for irate
+    if (endTime > latestTime){
+      latestTime = endTime
+      lastSampleValue = reader.asHistReader.apply(endRowNum)
+      previousToLastSampleValue = reader.asHistReader.apply(previousToEndRowNum)
+    }
+  }
+
+  /**
+   * Return the computed result in the sampleToEmit
+   *
+   * @param endTimestamp the ending timestamp of the current window
+   */
+  def apply(endTimestamp: Long, sampleToEmit: TransientHistRow): Unit = ???
+
+}
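Note on [PATCH 3/4]: IRateOverDeltaChunkedFunctionH covers delta-temporality histograms, where each sample already carries bucket increments, so the function tracks the two newest samples and turns their sum into a per-second rate. A standalone sketch of the per-bucket arithmetic with hypothetical names (this patch still divides by the window length; [PATCH 4/4] below moves the denominator to the elapsed time between the two samples, which is what the sketch uses):

    // Standalone sketch (not FiloDB code): per-bucket instant rate for delta histograms.
    // Timestamps are in milliseconds; the result is per-second.
    object DeltaHistIRateSketch {
      def deltaBucketIRate(prevTs: Long, prevBuckets: Array[Double],
                           lastTs: Long, lastBuckets: Array[Double]): Array[Double] = {
        require(prevBuckets.length == lastBuckets.length && lastTs > prevTs)
        Array.tabulate(lastBuckets.length) { b =>
          // delta semantics: the two newest increments summed, scaled by elapsed time
          (lastBuckets(b) + prevBuckets(b)) / (lastTs - prevTs) * 1000
        }
      }
    }
    // e.g. deltaBucketIRate(10000L, Array(2.0, 5.0), 20000L, Array(4.0, 7.0)) gives Array(0.6, 1.2)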
From c44d9cdcd89ea1ccb15dba4dfc614c3d65a4814e Mon Sep 17 00:00:00 2001
From: Sandeep Agarwalla
Date: Thu, 6 Apr 2023 17:52:22 -0700
Subject: [PATCH 4/4] Adding code for Irate delta-histogram

---
 .../query/exec/rangefn/RateFunctions.scala | 59 ++++++++++++-------
 1 file changed, 38 insertions(+), 21 deletions(-)

diff --git a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala
index 3e1f0bef13..ce1b2f2e9d 100644
--- a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala
+++ b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala
@@ -317,7 +317,7 @@ abstract class HistogramIRateFunctionBase extends CounterChunkedRangeFunction[Tr
   def isRate: Boolean

   override def reset(): Unit = {
-    previousToLastSampleTime = 0L
+    previousToLastSampleTime = Long.MaxValue
     previousToLastSampleValue = bv.HistogramWithBuckets.empty
     lastSampleTime = 0L
     lastSampleValue = bv.HistogramWithBuckets.empty
@@ -341,8 +341,8 @@ abstract class HistogramIRateFunctionBase extends CounterChunkedRangeFunction[Tr
       // Hence, we are using the endtime to see if there is any newer sample
       if (endTime > lastSampleTime) {

-        // TODO: Check what is the lowest endRowNum possible, assuming 0 for now
-        if (previousToEndRowNum > -1){
+        // lowest rowNum possible in chunk is 0
+        if (previousToEndRowNum >= 0){
           previousToLastSampleTime = previousToEndTime
           previousToLastSampleValue = histReader.correctedValue(previousToEndRowNum, correctionMeta)
         }

         lastSampleTime = endTime
         lastSampleValue = histReader.correctedValue(endRowNum, correctionMeta)
@@ -355,9 +355,10 @@ abstract class HistogramIRateFunctionBase extends CounterChunkedRangeFunction[Tr

   override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientHistRow): Unit = {

-    // check if previousToLastSampleTime is init and if it is < the lastSampleTime
-    if ( (previousToLastSampleTime > 0L) && (previousToLastSampleTime < lastSampleTime) ) {
-      // NOTE: It seems in order to match previous code, we have to adjust the windowStart by -1 so it's "inclusive"
+    // previousToLastSampleTime is set to Long.MaxValue and lastSampleTime to 0L by default. The following
+    // check helps to determine whether we have at least two samples to calculate irate
+    if (lastSampleTime > previousToLastSampleTime) {
+
       // TODO: handle case where schemas are different and we need to interpolate schemas
       if (previousToLastSampleValue.buckets == lastSampleValue.buckets) {
         val rateArray = new Array[Double](previousToLastSampleValue.numBuckets)
@@ -365,7 +366,7 @@ abstract class HistogramIRateFunctionBase extends CounterChunkedRangeFunction[Tr
           0 until rateArray.size
         } { b =>
           rateArray(b) = RateFunctions.extrapolatedRate(
-            windowStart - 1, windowEnd, 2,
+            previousToLastSampleTime, lastSampleTime, 2,
             previousToLastSampleTime, previousToLastSampleValue.bucketValue(b),
             lastSampleTime, lastSampleValue.bucketValue(b),
             isCounter, isRate)
@@ -464,22 +465,33 @@ class RateOverDeltaChunkedFunctionH(var h: bv.MutableHistogram = bv.Histogram.em

 class IRateOverDeltaChunkedFunctionH(var lastSampleValue: bv.HistogramWithBuckets = bv.Histogram.empty,
                                      var previousToLastSampleValue: bv.HistogramWithBuckets = bv.Histogram.empty,
-                                     var latestTime: Long = 0L)
+                                     var previousToLastSampleTime: Long = Long.MaxValue,
+                                     var lastSampleTime: Long = 0L)
   extends ChunkedRangeFunction[TransientHistRow] {

-  // TODO: Add all the necessary checks
-  // TODO: track last sample time and previous to last sample time
+  override def reset(): Unit = {
+    previousToLastSampleTime = Long.MaxValue
+    previousToLastSampleValue = bv.HistogramWithBuckets.empty
+    lastSampleTime = 0L
+    lastSampleValue = bv.HistogramWithBuckets.empty
+    super.reset()
+  }

   override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientHistRow): Unit = {
-    // TODO: Add checks
-    val irateArray = new Array[Double](lastSampleValue.numBuckets)
-    cforRange {
-      0 until irateArray.size
-    } { b =>
-      val sumOfLastTwoSamples = (lastSampleValue.bucketValue(b) + previousToLastSampleValue.bucketValue(b))
-      irateArray(b) = sumOfLastTwoSamples / (windowEnd - (windowStart - 1)) * 1000
+
+    if (lastSampleTime > previousToLastSampleTime){
+      val irateArray = new Array[Double](lastSampleValue.numBuckets)
+      cforRange {
+        0 until irateArray.size
+      } { b =>
+        val sumOfLastTwoSamples = (lastSampleValue.bucketValue(b) + previousToLastSampleValue.bucketValue(b))
+        irateArray(b) = sumOfLastTwoSamples / (lastSampleTime - previousToLastSampleTime) * 1000
+      }
+      sampleToEmit.setValues(windowEnd, bv.MutableHistogram(lastSampleValue.buckets, irateArray))
+    }
+    else{
+      sampleToEmit.setValues(windowEnd, bv.HistogramWithBuckets.empty)
     }
-    sampleToEmit.setValues(windowEnd, bv.MutableHistogram(lastSampleValue.buckets, irateArray))
   }
@@ -503,10 +515,15 @@ class IRateOverDeltaChunkedFunctionH(var lastSampleValue: bv.HistogramWithBucket
                     previousToEndTime: Long, endTime: Long): Unit = {

     // we are tracking the last two sample's histogram buckets for irate
-    if (endTime > latestTime){
-      latestTime = endTime
+    if (endTime > lastSampleTime){
+      // track sample's time value to calculate the irate between these two times
+      lastSampleTime = endTime
       lastSampleValue = reader.asHistReader.apply(endRowNum)
-      previousToLastSampleValue = reader.asHistReader.apply(previousToEndRowNum)
+
+      if (previousToEndRowNum >= 0){
+        previousToLastSampleTime = previousToEndTime
+        previousToLastSampleValue = reader.asHistReader.apply(previousToEndRowNum)
+      }
     }
   }
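Note on [PATCH 4/4]: after this patch both variants share the same guard: with fewer than two samples in the window (or, for the counter variant, mismatched bucket schemas) an empty histogram is emitted instead of a fabricated rate, and the denominator is always the time between the two newest samples. A compact, self-contained sketch of that final shape (plain Scala, not FiloDB code; bucket arrays and millisecond timestamps stand in for bv.HistogramWithBuckets):

    // End-of-series sketch: emit a per-bucket irate, or nothing, from the newest two samples.
    object HistIRateGuardSketch {
      def irateOrEmpty(samples: Seq[(Long, Array[Double])], cumulative: Boolean): Array[Double] =
        samples.sortBy(_._1).takeRight(2) match {
          case Seq((t1, b1), (t2, b2)) if t2 > t1 && b1.length == b2.length =>
            Array.tabulate(b2.length) { b =>
              val delta = if (cumulative) b2(b) - b1(b)   // counter histograms: corrected increase
                          else b2(b) + b1(b)              // delta histograms: sum of the two increments
              delta / (t2 - t1) * 1000                    // ms timestamps -> per-second rate
            }
          case _ => Array.empty[Double]                   // 0 or 1 samples, or mismatched schemas
        }

      def main(args: Array[String]): Unit = {
        val counterSamples = Seq(10000L -> Array(1.0, 4.0), 20000L -> Array(3.0, 10.0))
        println(irateOrEmpty(counterSamples, cumulative = true).toList)          // List(0.2, 0.6)
        println(irateOrEmpty(counterSamples.take(1), cumulative = true).toList)  // List() -> empty histogram
      }
    }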