diff --git a/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala b/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala
index 32355df907..d182e01778 100644
--- a/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala
+++ b/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala
@@ -980,6 +980,7 @@ object ProtoConverters {
       case InternalRangeFunction.AvgWithSumAndCountOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME
       case InternalRangeFunction.SumAndMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME
+      case InternalRangeFunction.RateAndMinMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME
       case InternalRangeFunction.LastSampleHistMaxMin => GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN
       case InternalRangeFunction.Timestamp => GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP
       case InternalRangeFunction.AbsentOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME
@@ -1017,6 +1018,7 @@ object ProtoConverters {
       case GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME => InternalRangeFunction.AvgWithSumAndCountOverTime
       case GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME => InternalRangeFunction.SumAndMaxOverTime
+      case GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME => InternalRangeFunction.RateAndMinMaxOverTime
       case GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN => InternalRangeFunction.LastSampleHistMaxMin
       case GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP => InternalRangeFunction.Timestamp
       case GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME => InternalRangeFunction.AbsentOverTime
diff --git a/grpc/src/main/protobuf/query_service.proto b/grpc/src/main/protobuf/query_service.proto
index ea58539491..499c887f4d 100644
--- a/grpc/src/main/protobuf/query_service.proto
+++ b/grpc/src/main/protobuf/query_service.proto
@@ -641,6 +641,7 @@ enum InternalRangeFunction {
   ABSENT_OVER_TIME = 26;
   PRESENT_OVER_TIME = 27;
   MEDIAN_ABSOLUTE_DEVIATION_OVER_TIME = 28;
+  RATE_AND_MIN_MAX_OVER_TIME = 29;
 }
 
 enum SortFunctionId {
diff --git a/query/src/main/scala/filodb/query/exec/InternalRangeFunction.scala b/query/src/main/scala/filodb/query/exec/InternalRangeFunction.scala
index d622d60f1a..210c88bb30 100644
--- a/query/src/main/scala/filodb/query/exec/InternalRangeFunction.scala
+++ b/query/src/main/scala/filodb/query/exec/InternalRangeFunction.scala
@@ -4,6 +4,7 @@ import enumeratum.EnumEntry
 
 import filodb.query.RangeFunctionId
 
+// scalastyle:off number.of.types
 // Used for internal representations of RangeFunctions
 sealed abstract class InternalRangeFunction(val onCumulCounter: Boolean = false) extends EnumEntry
 
@@ -55,6 +56,7 @@ object InternalRangeFunction {
 
   // Used only for histogram schemas with max column
   case object SumAndMaxOverTime extends InternalRangeFunction
+  case object RateAndMinMaxOverTime extends InternalRangeFunction
   case object LastSampleHistMaxMin extends InternalRangeFunction
 
   case object Timestamp extends InternalRangeFunction
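Note that the new function must be added in three places that travel together: the Scala-to-proto match, the proto-to-Scala match, and the proto enum itself; missing any one of them breaks serialization of remote multi-partition queries. A minimal, self-contained sketch of the round-trip invariant the two matches must preserve (FnCodecSketch and the tag 13 are hypothetical, for illustration only; 29 mirrors RATE_AND_MIN_MAX_OVER_TIME = 29 above):

sealed trait Fn
case object SumAndMaxOverTime extends Fn
case object RateAndMinMaxOverTime extends Fn

object FnCodecSketch extends App {
  // Scala -> proto tag. Exhaustive match over a sealed trait, so the
  // compiler flags a missing case when a new function is added.
  def toProto(f: Fn): Int = f match {
    case SumAndMaxOverTime     => 13 // hypothetical tag
    case RateAndMinMaxOverTime => 29
  }
  // proto tag -> Scala. A gap here only fails at runtime, which is why
  // both directions must be updated in the same change.
  def fromProto(tag: Int): Fn = tag match {
    case 13 => SumAndMaxOverTime
    case 29 => RateAndMinMaxOverTime
  }
  assert(fromProto(toProto(RateAndMinMaxOverTime)) == RateAndMinMaxOverTime)
}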
diff --git a/query/src/main/scala/filodb/query/exec/MultiSchemaPartitionsExec.scala b/query/src/main/scala/filodb/query/exec/MultiSchemaPartitionsExec.scala
index 8e8f5b4412..155c337d55 100644
--- a/query/src/main/scala/filodb/query/exec/MultiSchemaPartitionsExec.scala
+++ b/query/src/main/scala/filodb/query/exec/MultiSchemaPartitionsExec.scala
@@ -128,10 +128,10 @@ final case class MultiSchemaPartitionsExec(queryContext: QueryContext,
 
       // This code is responsible for putting exact IDs needed by any range functions.
       val colIDs1 = getColumnIDs(sch, newColName.toSeq, rangeVectorTransformers)
-      val colIDs = isMaxMinColumnsEnabled(maxMinTenantFilter) match {
-        case true => addIDsForHistMaxMin(sch, colIDs1)
-        case _ => colIDs1
-      }
+      val colIDs = if (sch.data.columns.exists(_.name == "min") &&
+                       sch.data.columns.exists(_.name == "max") &&
+                       isMaxMinColumnsEnabled(maxMinTenantFilter)) addIDsForHistMaxMin(sch, colIDs1)
+                   else colIDs1
 
       // Modify transformers as needed for histogram w/ max, downsample, other schemas
       val newxformers1 = newXFormersForDownsample(sch, rangeVectorTransformers)
diff --git a/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala b/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala
index b434484aa6..1dd0ad02bf 100644
--- a/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala
+++ b/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala
@@ -366,6 +366,50 @@ class SumAndMaxOverTimeFuncHD(maxColID: Int) extends ChunkedRangeFunction[TransientHistMaxMinRow] {
   }
 }
 
+class RateAndMinMaxOverTimeFuncHD(maxColId: Int, minColId: Int) extends ChunkedRangeFunction[TransientHistMaxMinRow] {
+  private val hFunc = new RateOverDeltaChunkedFunctionH
+  private val maxFunc = new MaxOverTimeChunkedFunctionD
+  private val minFunc = new MinOverTimeChunkedFunctionD
+
+  override final def reset(): Unit = {
+    hFunc.reset()
+    maxFunc.reset()
+    minFunc.reset()
+  }
+  final def apply(endTimestamp: Long, sampleToEmit: TransientHistMaxMinRow): Unit = {
+    sampleToEmit.setValues(endTimestamp, hFunc.h)
+    sampleToEmit.setDouble(2, maxFunc.max)
+    sampleToEmit.setDouble(3, minFunc.min)
+  }
+
+  import BinaryVector.BinaryVectorPtr
+
+  // scalastyle:off parameter.number
+  def addChunks(tsVectorAcc: MemoryReader, tsVector: BinaryVectorPtr, tsReader: bv.LongVectorDataReader,
+                valueVectorAcc: MemoryReader, valueVector: BinaryVectorPtr, valueReader: VectorDataReader,
+                startTime: Long, endTime: Long, info: ChunkSetInfoReader, queryConfig: QueryConfig): Unit = {
+    // Do BinarySearch for start/end pos only once for both columns == WIN!
+    val startRowNum = tsReader.binarySearch(tsVectorAcc, tsVector, startTime) & 0x7fffffff
+    val endRowNum = Math.min(tsReader.ceilingIndex(tsVectorAcc, tsVector, endTime), info.numRows - 1)
+
+    // At least one sample is present
+    if (startRowNum <= endRowNum) {
+      hFunc.addTimeChunks(valueVectorAcc, valueVector, valueReader, startRowNum, endRowNum)
+
+      // Get valueVector/reader for max column
+      val maxVectAcc = info.vectorAccessor(maxColId)
+      val maxVectPtr = info.vectorAddress(maxColId)
+      maxFunc.addTimeChunks(maxVectAcc, maxVectPtr, bv.DoubleVector(maxVectAcc, maxVectPtr), startRowNum, endRowNum)
+
+      // Get valueVector/reader for min column
+      val minVectAcc = info.vectorAccessor(minColId)
+      val minVectPtr = info.vectorAddress(minColId)
+      minFunc.addTimeChunks(minVectAcc, minVectPtr, bv.DoubleVector(minVectAcc, minVectPtr), startRowNum, endRowNum)
+    }
+  }
+}
+
+
 /**
 * Computes Average Over Time using sum and count columns.
 * Used in when calculating avg_over_time using downsampled data
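For reference, RateOverDeltaChunkedFunctionH accumulates delta-temporality histogram increments and divides by the window length, while the min/max columns are plain extrema over the same row range. A rough scalar model of what one window emits (RateMinMaxSketch and the flat Row type are hypothetical; the real function walks off-heap chunk vectors and emits a histogram, not a scalar):

// Hypothetical flat row; the real code reads three separate chunk vectors.
final case class Row(ts: Long, delta: Double, min: Double, max: Double)

object RateMinMaxSketch extends App {
  // Window bounds in millis, mirroring startTime/endTime in addChunks.
  def rateAndMinMax(rows: Seq[Row], startMs: Long, endMs: Long): (Double, Double, Double) = {
    val in = rows.filter(r => r.ts >= startMs && r.ts <= endMs)
    // Rate of a delta series: summed increments over the window, per second.
    val ratePerSec = in.map(_.delta).sum / ((endMs - startMs) / 1000.0)
    (ratePerSec, in.map(_.min).min, in.map(_.max).max)
  }

  println(rateAndMinMax(Seq(Row(1000, 4.0, 0.1, 2.5), Row(2000, 6.0, 0.2, 3.0)), 0, 10000))
  // => (1.0, 0.1, 3.0): (4+6)/10s, min of mins, max of maxes
}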
diff --git a/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala b/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala
index aecf277106..e63ceb1210 100644
--- a/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala
+++ b/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala
@@ -372,6 +372,7 @@ object RangeFunction {
     f match {
       case None              => Some(LastSampleHistMaxMin)
       case Some(SumOverTime) => Some(SumAndMaxOverTime)
+      case Some(Rate)        => Some(RateAndMinMaxOverTime)
       case other             => other
     }
@@ -383,6 +384,8 @@ object RangeFunction {
       case Some(LastSampleHistMaxMin) =>
         () => new LastSampleChunkedFunctionHMax(schema.colIDs(2), schema.colIDs(3))
       case Some(SumAndMaxOverTime) => require(schema.columns(2).name == "max")
         () => new SumAndMaxOverTimeFuncHD(schema.colIDs(2))
+      case Some(RateAndMinMaxOverTime) => require(schema.columns(2).name == "max" && schema.columns(3).name == "min")
+        () => new RateAndMinMaxOverTimeFuncHD(schema.colIDs(2), schema.colIDs(3))
       case Some(Last) => () => new LastSampleChunkedFunctionH
       case Some(SumOverTime) => () => new SumOverTimeChunkedFunctionH
       case Some(Rate) if schema.columns(1).isCumulative
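The require() in the factory encodes the positional contract set up by addIDsForHistMaxMin in MultiSchemaPartitionsExec: the max column ends up at index 2 and the min column at index 3 of the column list, so the factory can pass the IDs positionally into the constructor. A sketch of that guard in isolation (SchemaSketch, FactoryGuardSketch, and maxMinColIds are hypothetical names, for illustration only):

// Hypothetical stand-in for the result schema the factory receives.
final case class SchemaSketch(columns: IndexedSeq[String], colIDs: IndexedSeq[Int])

object FactoryGuardSketch extends App {
  // Mirrors the positional contract the require() enforces:
  // columns(2) must be "max" and columns(3) must be "min".
  def maxMinColIds(schema: SchemaSketch): (Int, Int) = {
    require(schema.columns(2) == "max" && schema.columns(3) == "min",
            s"unexpected column layout: ${schema.columns}")
    (schema.colIDs(2), schema.colIDs(3))
  }

  // The returned IDs become the maxColId/minColId constructor arguments
  // of RateAndMinMaxOverTimeFuncHD.
  println(maxMinColIds(SchemaSketch(Vector("timestamp", "h", "max", "min"), Vector(0, 1, 4, 5))))
  // => (4, 5)
}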