Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding IRate support for Histograms #1551

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,20 @@ trait CounterChunkedRangeFunction[R <: MutableRowReader] extends ChunkedRangeFun
// reset is called before first chunk. Reset correction metadata
override def reset(): Unit = { correctionMeta = NoCorrection }

/**
 * Computes the first and last row numbers of this chunk that fall within the query window.
 * @return a (firstRow, lastRow) pair; lastRow is clamped to the chunk's final row
 */
def calculateStartAndEndRowNum(tsVectorAcc: MemoryReader, tsVector: BinaryVectorPtr,
                               tsReader: bv.LongVectorDataReader, startTime: Long, endTime: Long,
                               info: ChunkSetInfoReader): (Int, Int) = {
  // binarySearch flags an inexact match in the high bit; masking with 0x7fffffff leaves the
  // insertion point, i.e. the first row at or after startTime
  val firstRow = tsReader.binarySearch(tsVectorAcc, tsVector, startTime) & 0x7fffffff
  // last row at or before endTime, clamped so we never step past the chunk's row count
  val lastRow = Math.min(tsReader.ceilingIndex(tsVectorAcc, tsVector, endTime), info.numRows - 1)
  (firstRow, lastRow)
}

// scalastyle:off parameter.number
def addChunks(tsVectorAcc: MemoryReader, tsVector: BinaryVectorPtr, tsReader: bv.LongVectorDataReader,
valueVectorAcc: MemoryReader, valueVector: BinaryVectorPtr, valueReader: VectorDataReader,
startTime: Long, endTime: Long, info: ChunkSetInfoReader, queryConfig: QueryConfig): Unit = {
val ccReader = valueReader.asInstanceOf[CounterVectorReader]
val startRowNum = tsReader.binarySearch(tsVectorAcc, tsVector, startTime) & 0x7fffffff
val endRowNum = Math.min(tsReader.ceilingIndex(tsVectorAcc, tsVector, endTime), info.numRows - 1)
val (startRowNum, endRowNum) = calculateStartAndEndRowNum(tsVectorAcc, tsVector, tsReader, startTime, endTime, info)

// For each chunk:
// Check if any dropoff from end of last chunk to beg of this chunk (unless it's the first chunk)
Expand Down Expand Up @@ -384,6 +391,8 @@ object RangeFunction {
case Some(SumOverTime) => () => new SumOverTimeChunkedFunctionH
case Some(Rate) if schema.columns(1).isCumulative
=> () => new HistRateFunction
case Some(Irate) if schema.columns(1).isCumulative
=> () => new HistIRateFunction
case Some(Increase) if schema.columns(1).isCumulative
=> () => new HistIncreaseFunction
case Some(Rate) => () => new RateOverDeltaChunkedFunctionH
Expand Down
157 changes: 157 additions & 0 deletions query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package filodb.query.exec.rangefn
import spire.syntax.cfor._

import filodb.core.query.{QueryConfig, TransientHistRow, TransientRow}
import filodb.core.store.ChunkSetInfoReader
import filodb.memory.format.{vectors => bv, BinaryVector, CounterVectorReader, MemoryReader, VectorDataReader}
import filodb.memory.format.BinaryVector.BinaryVectorPtr
import filodb.memory.format.vectors.LongVectorDataReader

object RateFunctions {

Expand Down Expand Up @@ -303,6 +305,89 @@ abstract class HistogramRateFunctionBase extends CounterChunkedRangeFunction[Tra
def apply(endTimestamp: Long, sampleToEmit: TransientHistRow): Unit = ???
}

/**
 * Base for instant-rate (irate) computation over cumulative histogram columns.
 * Tracks the newest two histogram samples at or before the window end, across chunks,
 * then emits a per-bucket rate between just those two samples in apply().
 */
abstract class HistogramIRateFunctionBase extends CounterChunkedRangeFunction[TransientHistRow] {
  var numSamples = 0
  // Long.MaxValue sentinel: the lastSampleTime > previousToLastSampleTime check in apply()
  // can only pass once a genuine previous-to-last sample has been recorded
  var previousToLastSampleTime = Long.MaxValue
  var previousToLastSampleValue: bv.HistogramWithBuckets = bv.HistogramWithBuckets.empty
  var lastSampleTime = 0L
  var lastSampleValue: bv.HistogramWithBuckets = bv.HistogramWithBuckets.empty

  // true => apply counter reset correction semantics
  def isCounter: Boolean

  // true => divide the increase by elapsed time
  def isRate: Boolean

  // Clear per-window state; super.reset() clears counter-correction metadata
  override def reset(): Unit = {
    previousToLastSampleTime = Long.MaxValue
    previousToLastSampleValue = bv.HistogramWithBuckets.empty
    lastSampleTime = 0L
    lastSampleValue = bv.HistogramWithBuckets.empty
    super.reset()
  }

  // irate only needs the final two rows of each chunk, not every row in the window
  override def calculateStartAndEndRowNum(tsVectorAcc: MemoryReader, tsVector: BinaryVectorPtr,
                                          tsReader: bv.LongVectorDataReader, startTime: Long, endTime: Long,
                                          info: ChunkSetInfoReader): (Int, Int) = {
    val endRowNum = Math.min(tsReader.ceilingIndex(tsVectorAcc, tsVector, endTime), info.numRows - 1)
    // In Irate, we use the last two samples.
    // NOTE(review): when endRowNum == 0 the start row is -1; confirm the base trait's addChunks
    // never dereferences a negative row before addTimeChunks's guard below runs.
    (endRowNum - 1, endRowNum)
  }

  /**
   * Records the chunk's newest one or two samples into the last-two-samples state.
   * @param previousToEndRowNum may be -1 when the chunk contributes only a single row
   */
  def addTimeChunks(acc: MemoryReader, vector: BinaryVectorPtr, reader: CounterVectorReader,
                    previousToEndRowNum: Int, endRowNum: Int,
                    previousToEndTime: Long, endTime: Long): Unit = reader match {
    case histReader: bv.CounterHistogramReader =>
      // For IRate, we need the last 2 samples in the given window.
      // Only advance state if this chunk holds a sample newer than anything seen so far.
      if (endTime > lastSampleTime) {
        if (previousToEndRowNum >= 0) {
          // chunk has at least two in-window rows: both tracked samples come from this chunk
          previousToLastSampleTime = previousToEndTime
          previousToLastSampleValue = histReader.correctedValue(previousToEndRowNum, correctionMeta)
        } else if (lastSampleTime > 0L) {
          // chunk contributes a single newer row: the prior chunk's last sample becomes the
          // previous-to-last, so irate correctly spans the chunk boundary instead of using a
          // stale sample (or none at all)
          previousToLastSampleTime = lastSampleTime
          previousToLastSampleValue = lastSampleValue
        }
        lastSampleTime = endTime
        // NOTE(review): assumes correctedValue returns a histogram that stays valid after moving
        // to the next chunk — confirm readers do not reuse an internal buffer, else copy here
        lastSampleValue = histReader.correctedValue(endRowNum, correctionMeta)
      }
    case other: CounterVectorReader => // non-histogram counter readers carry nothing to track
  }

  override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientHistRow): Unit = {
    // Sentinels (previousToLastSampleTime = Long.MaxValue, lastSampleTime = 0) guarantee this
    // check only passes once two genuine samples were recorded
    if (lastSampleTime > previousToLastSampleTime) {
      // TODO: handle case where schemas are different and we need to interpolate schemas
      if (previousToLastSampleValue.buckets == lastSampleValue.buckets) {
        val rateArray = new Array[Double](previousToLastSampleValue.numBuckets)
        cforRange {
          0 until rateArray.size
        } { b =>
          // window bounds equal the two sample timestamps, so no extrapolation takes place
          rateArray(b) = RateFunctions.extrapolatedRate(
            previousToLastSampleTime, lastSampleTime, 2,
            previousToLastSampleTime, previousToLastSampleValue.bucketValue(b),
            lastSampleTime, lastSampleValue.bucketValue(b),
            isCounter, isRate)
        }
        sampleToEmit.setValues(windowEnd, bv.MutableHistogram(previousToLastSampleValue.buckets, rateArray))
      } else {
        // mismatched bucket schemes: emit empty rather than mixing incompatible buckets
        sampleToEmit.setValues(windowEnd, bv.HistogramWithBuckets.empty)
      }
    } else {
      // fewer than two samples in the window: irate is undefined
      sampleToEmit.setValues(windowEnd, bv.HistogramWithBuckets.empty)
    }
  }

  // single-timestamp form of the contract is unused for this chunked function
  def apply(endTimestamp: Long, sampleToEmit: TransientHistRow): Unit = ???
}

/**
 * Prometheus-style irate for cumulative histogram columns: counter semantics (reset
 * correction) with the increase between the last two samples divided by their elapsed time.
 */
class HistIRateFunction extends HistogramIRateFunctionBase {
  override def isCounter: Boolean = true
  override def isRate: Boolean = true
}

class HistRateFunction extends HistogramRateFunctionBase {
def isCounter: Boolean = true
def isRate: Boolean = true
Expand Down Expand Up @@ -378,3 +463,75 @@ class RateOverDeltaChunkedFunctionH(var h: bv.MutableHistogram = bv.Histogram.em
def apply(endTimestamp: Long, sampleToEmit: TransientHistRow): Unit = ???
}

/**
 * irate for delta (non-cumulative) histogram columns. Each stored sample is already the
 * increase since the previous sample, so the instantaneous rate is the newest sample's
 * bucket values divided by the time elapsed between the newest two samples.
 */
class IRateOverDeltaChunkedFunctionH(var lastSampleValue: bv.HistogramWithBuckets = bv.Histogram.empty,
                                     var previousToLastSampleValue: bv.HistogramWithBuckets = bv.Histogram.empty,
                                     var previousToLastSampleTime: Long = Long.MaxValue,
                                     var lastSampleTime: Long = 0L)
extends ChunkedRangeFunction[TransientHistRow] {

  // Clear per-window state; sentinels make apply()'s two-sample check fail until
  // two genuine samples have been recorded
  override def reset(): Unit = {
    previousToLastSampleTime = Long.MaxValue
    previousToLastSampleValue = bv.HistogramWithBuckets.empty
    lastSampleTime = 0L
    lastSampleValue = bv.HistogramWithBuckets.empty
    super.reset()
  }

  override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientHistRow): Unit = {
    // Only passes once two genuine samples exist (see sentinel defaults above)
    if (lastSampleTime > previousToLastSampleTime) {
      val irateArray = new Array[Double](lastSampleValue.numBuckets)
      cforRange {
        0 until irateArray.size
      } { b =>
        // For delta temporality the newest sample IS the increase over the latest sampling
        // interval, so irate = newest increase / interval (in seconds, hence * 1000).
        // Summing the last two samples here would double-count the prior interval's increase.
        irateArray(b) = lastSampleValue.bucketValue(b) / (lastSampleTime - previousToLastSampleTime) * 1000
      }
      sampleToEmit.setValues(windowEnd, bv.MutableHistogram(lastSampleValue.buckets, irateArray))
    } else {
      // fewer than two samples in the window: irate is undefined
      sampleToEmit.setValues(windowEnd, bv.HistogramWithBuckets.empty)
    }
  }

  // scalastyle:off parameter.number
  // Note: startTime/endTime are timestamps (Long), not vector pointers as previously declared;
  // BinaryVectorPtr is an alias of Long which is why the old signature compiled at all.
  override def addChunks(tsVectorAcc: MemoryReader, tsVector: BinaryVectorPtr, tsReader: LongVectorDataReader,
                         valueVectorAcc: MemoryReader, valueVector: BinaryVectorPtr, valueReader: VectorDataReader,
                         startTime: Long, endTime: Long,
                         info: ChunkSetInfoReader, queryConfig: QueryConfig): Unit = {
    val endRowNum = Math.min(tsReader.ceilingIndex(tsVectorAcc, tsVector, endTime), info.numRows - 1)
    val previousToEndRowNum = endRowNum - 1

    // At least one sample is present in this chunk
    if (endRowNum >= 0) {
      // Guard the timestamp read: row -1 must never be dereferenced. The sentinel time is
      // ignored downstream because addTimeChunks also checks previousToEndRowNum >= 0.
      val previousToEndTime =
        if (previousToEndRowNum >= 0) tsReader(tsVectorAcc, tsVector, previousToEndRowNum)
        else Long.MinValue
      addTimeChunks(valueVectorAcc, valueVector, valueReader, previousToEndRowNum, endRowNum,
        previousToEndTime, tsReader(tsVectorAcc, tsVector, endRowNum))
    }
  }
  // scalastyle:on parameter.number

  /**
   * Records the chunk's newest one or two samples into the last-two-samples state.
   * @param previousToEndRowNum may be -1 when the chunk contributes only a single row
   */
  def addTimeChunks(acc: MemoryReader, vector: BinaryVectorPtr, reader: VectorDataReader,
                    previousToEndRowNum: Int, endRowNum: Int,
                    previousToEndTime: Long, endTime: Long): Unit = {
    // Only advance state if this chunk holds a sample newer than anything seen so far
    if (endTime > lastSampleTime) {
      if (previousToEndRowNum >= 0) {
        // chunk has at least two in-window rows: both tracked samples come from this chunk
        previousToLastSampleTime = previousToEndTime
        previousToLastSampleValue = reader.asHistReader.apply(previousToEndRowNum)
      } else if (lastSampleTime > 0L) {
        // chunk contributes a single newer row: the prior chunk's last sample becomes the
        // previous-to-last, so irate correctly spans the chunk boundary
        previousToLastSampleTime = lastSampleTime
        previousToLastSampleValue = lastSampleValue
      }
      // NOTE(review): assumes asHistReader.apply returns a histogram that stays valid after
      // moving to the next chunk — confirm the reader does not reuse an internal buffer,
      // else copy the histogram here
      lastSampleTime = endTime
      lastSampleValue = reader.asHistReader.apply(endRowNum)
    }
  }

  /**
   * Return the computed result in the sampleToEmit
   *
   * @param endTimestamp the ending timestamp of the current window
   */
  def apply(endTimestamp: Long, sampleToEmit: TransientHistRow): Unit = ???

}