Skip to content

Commit

Permalink
add SumAndMinMaxOverTimeH range function
Browse files Browse the repository at this point in the history
  • Loading branch information
vishramachandran committed Nov 21, 2024
1 parent 12165e7 commit e0a8788
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,7 @@ object ProtoConverters {
case InternalRangeFunction.AvgWithSumAndCountOverTime =>
GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME
case InternalRangeFunction.SumAndMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME
case InternalRangeFunction.RateAndMinMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME
case InternalRangeFunction.LastSampleHistMaxMin => GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN
case InternalRangeFunction.Timestamp => GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP
case InternalRangeFunction.AbsentOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME
Expand Down Expand Up @@ -1017,6 +1018,7 @@ object ProtoConverters {
case GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME =>
InternalRangeFunction.AvgWithSumAndCountOverTime
case GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME => InternalRangeFunction.SumAndMaxOverTime
case GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME => InternalRangeFunction.RateAndMinMaxOverTime
case GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN => InternalRangeFunction.LastSampleHistMaxMin
case GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP => InternalRangeFunction.Timestamp
case GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME => InternalRangeFunction.AbsentOverTime
Expand Down
1 change: 1 addition & 0 deletions grpc/src/main/protobuf/query_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ enum InternalRangeFunction {
ABSENT_OVER_TIME = 26;
PRESENT_OVER_TIME = 27;
MEDIAN_ABSOLUTE_DEVIATION_OVER_TIME = 28;
RATE_AND_MIN_MAX_OVER_TIME = 29;
}

enum SortFunctionId {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import enumeratum.EnumEntry

import filodb.query.RangeFunctionId

// scalastyle:off number.of.types
// Used for internal representations of RangeFunctions
sealed abstract class InternalRangeFunction(val onCumulCounter: Boolean = false) extends EnumEntry

Expand Down Expand Up @@ -55,6 +56,7 @@ object InternalRangeFunction {

// Used only for histogram schemas with max column
case object SumAndMaxOverTime extends InternalRangeFunction
case object RateAndMinMaxOverTime extends InternalRangeFunction
case object LastSampleHistMaxMin extends InternalRangeFunction

case object Timestamp extends InternalRangeFunction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ final case class MultiSchemaPartitionsExec(queryContext: QueryContext,
// This code is responsible for putting exact IDs needed by any range functions.
val colIDs1 = getColumnIDs(sch, newColName.toSeq, rangeVectorTransformers)

val colIDs = isMaxMinColumnsEnabled(maxMinTenantFilter) match {
case true => addIDsForHistMaxMin(sch, colIDs1)
case _ => colIDs1
}
val colIDs = if (sch.data.columns.exists(_.name == "min") &&
sch.data.columns.exists(_.name == "max") &&
isMaxMinColumnsEnabled(maxMinTenantFilter)) addIDsForHistMaxMin(sch, colIDs1)
else colIDs1

// Modify transformers as needed for histogram w/ max, downsample, other schemas
val newxformers1 = newXFormersForDownsample(sch, rangeVectorTransformers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,50 @@ class SumAndMaxOverTimeFuncHD(maxColID: Int) extends ChunkedRangeFunction[Transi
}
}

class RateAndMinMaxOverTimeFuncHD(maxColId: Int, minColId: Int) extends ChunkedRangeFunction[TransientHistMaxMinRow] {
private val hFunc = new RateOverDeltaChunkedFunctionH
private val maxFunc = new MaxOverTimeChunkedFunctionD
private val minFunc = new MinOverTimeChunkedFunctionD

override final def reset(): Unit = {
hFunc.reset()
maxFunc.reset()
minFunc.reset()
}
final def apply(endTimestamp: Long, sampleToEmit: TransientHistMaxMinRow): Unit = {
sampleToEmit.setValues(endTimestamp, hFunc.h)
sampleToEmit.setDouble(2, maxFunc.max)
sampleToEmit.setDouble(3, minFunc.min)
}

import BinaryVector.BinaryVectorPtr

// scalastyle:off parameter.number
def addChunks(tsVectorAcc: MemoryReader, tsVector: BinaryVectorPtr, tsReader: bv.LongVectorDataReader,
valueVectorAcc: MemoryReader, valueVector: BinaryVectorPtr, valueReader: VectorDataReader,
startTime: Long, endTime: Long, info: ChunkSetInfoReader, queryConfig: QueryConfig): Unit = {
// Do BinarySearch for start/end pos only once for both columns == WIN!
val startRowNum = tsReader.binarySearch(tsVectorAcc, tsVector, startTime) & 0x7fffffff
val endRowNum = Math.min(tsReader.ceilingIndex(tsVectorAcc, tsVector, endTime), info.numRows - 1)

// At least one sample is present
if (startRowNum <= endRowNum) {
hFunc.addTimeChunks(valueVectorAcc, valueVector, valueReader, startRowNum, endRowNum)

// Get valueVector/reader for max column
val maxVectAcc = info.vectorAccessor(maxColId)
val maxVectPtr = info.vectorAddress(maxColId)
maxFunc.addTimeChunks(maxVectAcc, maxVectPtr, bv.DoubleVector(maxVectAcc, maxVectPtr), startRowNum, endRowNum)

// Get valueVector/reader for max column
val minVectAcc = info.vectorAccessor(minColId)
val minVectPtr = info.vectorAddress(minColId)
minFunc.addTimeChunks(minVectAcc, minVectPtr, bv.DoubleVector(minVectAcc, minVectPtr), startRowNum, endRowNum)
}
}
}


/**
* Computes Average Over Time using sum and count columns.
* Used in when calculating avg_over_time using downsampled data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ object RangeFunction {
f match {
case None => Some(LastSampleHistMaxMin)
case Some(SumOverTime) => Some(SumAndMaxOverTime)
case Some(Rate) => Some(RateAndMinMaxOverTime)
case other => other
}

Expand All @@ -383,6 +384,8 @@ object RangeFunction {
() => new LastSampleChunkedFunctionHMax(schema.colIDs(2), schema.colIDs(3))
case Some(SumAndMaxOverTime) => require(schema.columns(2).name == "max")
() => new SumAndMaxOverTimeFuncHD(schema.colIDs(2))
case Some(RateAndMinMaxOverTime) => require(schema.columns(2).name == "max" && schema.columns(3).name == "min")
() => new RateAndMinMaxOverTimeFuncHD(schema.colIDs(2), schema.colIDs(3))
case Some(Last) => () => new LastSampleChunkedFunctionH
case Some(SumOverTime) => () => new SumOverTimeChunkedFunctionH
case Some(Rate) if schema.columns(1).isCumulative
Expand Down

0 comments on commit e0a8788

Please sign in to comment.