Skip to content

Commit

Permalink
add SumAndMinMaxOverTimeH range function
Browse files Browse the repository at this point in the history
  • Loading branch information
vishramachandran committed Nov 21, 2024
1 parent 12165e7 commit a3861f4
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ object ProtoConverters {
case InternalRangeFunction.LastOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_OVER_TIME
case InternalRangeFunction.AvgWithSumAndCountOverTime =>
GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME
case InternalRangeFunction.SumAndMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME
case InternalRangeFunction.SumAndMinMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_MIN_OVER_TIME
case InternalRangeFunction.LastSampleHistMaxMin => GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN
case InternalRangeFunction.Timestamp => GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP
case InternalRangeFunction.AbsentOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME
Expand Down Expand Up @@ -1016,7 +1016,8 @@ object ProtoConverters {
case GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_OVER_TIME => InternalRangeFunction.LastOverTime
case GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME =>
InternalRangeFunction.AvgWithSumAndCountOverTime
case GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME => InternalRangeFunction.SumAndMaxOverTime
case GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME => ??? // deprecated
case GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_MIN_OVER_TIME => InternalRangeFunction.SumAndMinMaxOverTime
case GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN => InternalRangeFunction.LastSampleHistMaxMin
case GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP => InternalRangeFunction.Timestamp
case GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME => InternalRangeFunction.AbsentOverTime
Expand Down
3 changes: 2 additions & 1 deletion grpc/src/main/protobuf/query_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -635,12 +635,13 @@ enum InternalRangeFunction {
LAST = 20;
LAST_OVER_TIME = 21;
AVG_WITH_SUM_AND_COUNT_OVER_TIME = 22;
SUM_AND_MAX_OVER_TIME = 23;
SUM_AND_MAX_OVER_TIME = 23; // deprecated
LAST_SAMPLE_HIST_MAX_MIN = 24;
TIME_STAMP = 25;
ABSENT_OVER_TIME = 26;
PRESENT_OVER_TIME = 27;
MEDIAN_ABSOLUTE_DEVIATION_OVER_TIME = 28;
SUM_AND_MAX_MIN_OVER_TIME = 29;
}

enum SortFunctionId {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ object InternalRangeFunction {
// Used only for ds-gauge schema
case object AvgWithSumAndCountOverTime extends InternalRangeFunction

// Used only for histogram schemas with max column
case object SumAndMaxOverTime extends InternalRangeFunction
// Used only for histogram schemas with min/max column
case object SumAndMinMaxOverTime extends InternalRangeFunction
case object LastSampleHistMaxMin extends InternalRangeFunction

case object Timestamp extends InternalRangeFunction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ final case class MultiSchemaPartitionsExec(queryContext: QueryContext,
// This code is responsible for putting exact IDs needed by any range functions.
val colIDs1 = getColumnIDs(sch, newColName.toSeq, rangeVectorTransformers)

val colIDs = isMaxMinColumnsEnabled(maxMinTenantFilter) match {
case true => addIDsForHistMaxMin(sch, colIDs1)
case _ => colIDs1
}
val colIDs = if (sch.data.columns.exists(_.name == "min") &&
sch.data.columns.exists(_.name == "max") &&
isMaxMinColumnsEnabled(maxMinTenantFilter)) addIDsForHistMaxMin(sch, colIDs1)
else colIDs1

// Modify transformers as needed for histogram w/ max, downsample, other schemas
val newxformers1 = newXFormersForDownsample(sch, rangeVectorTransformers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,17 +331,20 @@ extends TimeRangeFunction[TransientHistRow] {
* Sums Histograms over time and also computes Max over time of a Max field.
* @param maxColID the data column ID containing the max column
*/
class SumAndMaxOverTimeFuncHD(maxColID: Int) extends ChunkedRangeFunction[TransientHistMaxMinRow] {
class SumAndMinMaxOverTimeFuncHD(maxColID: Int, minColId: Int) extends ChunkedRangeFunction[TransientHistMaxMinRow] {
private val hFunc = new SumOverTimeChunkedFunctionH
private val maxFunc = new MaxOverTimeChunkedFunctionD
private val minFunc = new MinOverTimeChunkedFunctionD

override final def reset(): Unit = {
hFunc.reset()
maxFunc.reset()
minFunc.reset()
}
final def apply(endTimestamp: Long, sampleToEmit: TransientHistMaxMinRow): Unit = {
sampleToEmit.setValues(endTimestamp, hFunc.h)
sampleToEmit.setDouble(2, maxFunc.max)
sampleToEmit.setDouble(3, minFunc.min)
}

import BinaryVector.BinaryVectorPtr
Expand All @@ -362,6 +365,12 @@ class SumAndMaxOverTimeFuncHD(maxColID: Int) extends ChunkedRangeFunction[Transi
val maxVectAcc = info.vectorAccessor(maxColID)
val maxVectPtr = info.vectorAddress(maxColID)
maxFunc.addTimeChunks(maxVectAcc, maxVectPtr, bv.DoubleVector(maxVectAcc, maxVectPtr), startRowNum, endRowNum)

// Get valueVector/reader for max column
val minVectAcc = info.vectorAccessor(minColId)
val minVectPtr = info.vectorAddress(minColId)
minFunc.addTimeChunks(minVectAcc, minVectPtr, bv.DoubleVector(minVectAcc, minVectPtr), startRowNum, endRowNum)

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ object RangeFunction {
def histMaxMinRangeFunction(f: Option[InternalRangeFunction]): Option[InternalRangeFunction] =
f match {
case None => Some(LastSampleHistMaxMin)
case Some(SumOverTime) => Some(SumAndMaxOverTime)
case Some(SumOverTime) => Some(SumAndMinMaxOverTime)
case other => other
}

Expand All @@ -381,8 +381,8 @@ object RangeFunction {
case None => () => new LastSampleChunkedFunctionH
case Some(LastSampleHistMaxMin) => require(schema.columns(2).name == "max" && schema.columns(3).name == "min")
() => new LastSampleChunkedFunctionHMax(schema.colIDs(2), schema.colIDs(3))
case Some(SumAndMaxOverTime) => require(schema.columns(2).name == "max")
() => new SumAndMaxOverTimeFuncHD(schema.colIDs(2))
case Some(SumAndMinMaxOverTime) => require(schema.columns(2).name == "max" && schema.columns(3).name == "min")
() => new SumAndMinMaxOverTimeFuncHD(schema.colIDs(2), schema.colIDs(3))
case Some(Last) => () => new LastSampleChunkedFunctionH
case Some(SumOverTime) => () => new SumOverTimeChunkedFunctionH
case Some(Rate) if schema.columns(1).isCumulative
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
info(s"iteration $x windowSize=$windowSize step=$step")

val row = new TransientHistMaxMinRow()
val chunkedIt = chunkedWindowItHist(data, rv, new SumAndMaxOverTimeFuncHD(5), windowSize, step, row)
val chunkedIt = chunkedWindowItHist(data, rv, new SumAndMinMaxOverTimeFuncHD(5, 4), windowSize, step, row)
chunkedIt.zip(data.sliding(windowSize, step)).foreach { case (aggRow, rawDataWindow) =>
val aggHist = aggRow.getHistogram(1)
val sumRawHist = rawDataWindow.map(_(3).asInstanceOf[bv.LongHistogram])
Expand Down

0 comments on commit a3861f4

Please sign in to comment.