diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala index 94a56d8269..e57b112663 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala @@ -567,30 +567,6 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv }} } - /** - * Throws a BadQueryException if any of the following conditions hold: - * (1) the plan spans more than one non-metric shard key prefix. - * (2) the plan contains at least one BinaryJoin, and any of its BinaryJoins contain an offset. - * @param splitLeafPlan must contain leaf plans that individually span multiple partitions. - */ - private def validateSplitLeafPlan(splitLeafPlan: LogicalPlan): Unit = { - val baseErrorMessage = "This query contains selectors that individually read data from multiple partitions. " + - "This is likely because a selector's data was migrated between partitions. " - if (hasBinaryJoin(splitLeafPlan) && getOffsetMillis(splitLeafPlan).exists(_ > 0)) { - throw new BadQueryException( baseErrorMessage + - "These \"split\" queries cannot contain binary joins with offsets." - ) - } - lazy val hasMoreThanOneNonMetricShardKey = - LogicalPlanUtils.resolvePipeConcatenatedShardKeyFilters(splitLeafPlan, dataset.options.nonMetricShardColumns) - .filter(_.nonEmpty).distinct.size > 1 - if (hasMoreThanOneNonMetricShardKey) { - throw new BadQueryException( baseErrorMessage + - "These \"split\" queries are not supported if they contain multiple non-metric shard keys." - ) - } - } - /** * Materializes a LogicalPlan with leaves that individually span multiple partitions. 
* All "split-leaf" plans will fail to materialize (throw a BadQueryException) if they @@ -601,9 +577,6 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv //scalastyle:off method.length private def materializeSplitLeafPlan(logicalPlan: LogicalPlan, qContext: QueryContext): PlanResult = { - // TODO: Reassess this validate, we should also support binary joins in split leaf as long as they are within - // the limits of max range of data exported - validateSplitLeafPlan(logicalPlan) val qParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams] // get a mapping of assignments to time-ranges to query val lookbackMs = getLookBackMillis(logicalPlan).max diff --git a/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala b/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala index 32355df907..d182e01778 100644 --- a/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala +++ b/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala @@ -980,6 +980,7 @@ object ProtoConverters { case InternalRangeFunction.AvgWithSumAndCountOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME case InternalRangeFunction.SumAndMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME + case InternalRangeFunction.RateAndMinMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME case InternalRangeFunction.LastSampleHistMaxMin => GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN case InternalRangeFunction.Timestamp => GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP case InternalRangeFunction.AbsentOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME @@ -1017,6 +1018,7 @@ object ProtoConverters { case GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME => InternalRangeFunction.AvgWithSumAndCountOverTime case GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME => InternalRangeFunction.SumAndMaxOverTime + case GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME => InternalRangeFunction.RateAndMinMaxOverTime case GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN => InternalRangeFunction.LastSampleHistMaxMin case GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP => InternalRangeFunction.Timestamp case GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME => InternalRangeFunction.AbsentOverTime diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala index 57897657c0..f40712935b 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala @@ -15,7 +15,7 @@ import filodb.prometheus.ast.TimeStepParams import filodb.prometheus.parse.Parser import filodb.query.BinaryOperator.{ADD, LAND} import filodb.query.InstantFunctionId.Ln -import filodb.query.{BadQueryException, LabelCardinality, LogicalPlan, PlanValidationSpec, SeriesKeysByFilters, TsCardinalities} +import filodb.query.{LabelCardinality, LogicalPlan, PlanValidationSpec, SeriesKeysByFilters, TsCardinalities} import filodb.query.exec._ class MultiPartitionPlannerSpec extends AnyFunSpec 
with Matchers with PlanValidationSpec{ @@ -1712,18 +1712,6 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida validatePlan(execPlan2, expectedPlanWithRemoteExec1) - val query4 = "sum(rate(test{job = \"app\"}[10m])) + sum(rate(bar{job = \"app\"}[5m] offset 5m))" - val lp4 = Parser.queryRangeToLogicalPlan(query4, TimeStepParams(2000, stepSecs, 10000)) - - val promQlQueryParams4 = PromQlQueryParams(query4, 1000, 100, 10000) - intercept[BadQueryException] { - // Expecting to see Exception when we use BinaryJoin with offsets, technically this too should not be a big deal - // as we need to identify the right window, however this was not supported even before the change and it is ok to - // leave it unaddressed in the first phase as its just Binary joins with offsets - engine.materialize(lp4, QueryContext(origQueryParams = promQlQueryParams4, plannerParams = - PlannerParams(processMultiPartition = true))) - } - // Planner with period of uncertainty should still generate steps that are aligned with start and step, // that is should be snapped correctly diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala index 13c2089511..f3ddc5a6e1 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala @@ -15,9 +15,11 @@ import filodb.core.query.Filter.{Equals, EqualsRegex} import filodb.prometheus.ast.{TimeStepParams, WindowConstants} import filodb.prometheus.parse.Parser import filodb.prometheus.parse.Parser.Antlr -import filodb.query.{BadQueryException, IntervalSelector, LabelCardinality, PlanValidationSpec, RawSeries} +import filodb.query.{IntervalSelector, LabelCardinality, PlanValidationSpec, RawSeries} import filodb.query.exec._ +import java.util.concurrent.TimeUnit + // scalastyle:off line.size.limit // scalastyle:off number.of.methods class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationSpec { @@ -2766,48 +2768,64 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS } } - it ("should fail to materialize unsupported split-partition queries with binary joins") { - val startSec = 123 - val stepSec = 456 - val endSec = 789 - val queries = Seq( - // aggregate - """sum(foo{job="app"} + bar{job="app2"})""", - """sum(foo{job="app"} + (bar{job="app"} + baz{job="app2"}))""", - """sum(foo{job="app"} offset 1h + bar{job="app"})""", - """sum(foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h))""", - // instant - """sgn(foo{job="app"} + bar{job="app2"})""", - """exp(foo{job="app"} + (bar{job="app"} + baz{job="app2"}))""", - """ln(foo{job="app"} offset 1h + bar{job="app"})""", - """log2(foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h))""", - // binary join - """foo{job="app"} + bar{job="app2"}""", - """foo{job="app"} + (bar{job="app"} + baz{job="app2"})""", - """foo{job="app"} offset 1h + bar{job="app"}""", - """foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h)""", - // scalar vector join - """123 + (foo{job="app"} + bar{job="app2"})""", - """123 + (foo{job="app"} + (bar{job="app"} + baz{job="app2"}))""", - """123 + (foo{job="app"} offset 1h + bar{job="app"})""", - """123 + (foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h))""", - // scalar - """scalar(foo{job="app"} + bar{job="app2"})""", - """scalar(foo{job="app"} + 
(bar{job="app"} + baz{job="app2"}))""", - """scalar(foo{job="app"} offset 1h + bar{job="app"})""", - """scalar(foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h))""", - // absent - """absent(foo{job="app"} + bar{job="app2"})""", - """absent(foo{job="app"} + (bar{job="app"} + baz{job="app2"}))""", - """absent(foo{job="app"} offset 1h + bar{job="app"})""", - """absent(foo{job="app"} + (bar{job="app"} + baz{job="app"} offset 1h))""", + + it ("should materialize split-partition queries with lookback and offset binary joins correctly"){ + val startSec = 0 + val stepSec = 3 + val endSec = 9999 + val splitSec = 5000 + + case class Test(query: String, expected: String = "") + val tests = Seq( + Test(query = """sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m]))""", + expected = """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m])),0,3,5000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(5001,3,6803)) + |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |----T~PeriodicSamplesMapper(start=5001000, step=3000, end=6803000, window=Some(1800000), functionId=Some(Rate), rawSource=false, offsetMs=None) + |-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],6803,1,6803,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 
seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],6803,1,6803,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m])),6804,3,9999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))""".stripMargin ), + Test(query = """sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[10m] offset 30m))""", + expected = """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[10m] offset 30m)),0,3,6800,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(6801,3,7403)) + |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, 
filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |----T~PeriodicSamplesMapper(start=6801000, step=3000, end=7403000, window=Some(600000), functionId=Some(Rate), rawSource=false, offsetMs=Some(1800000)) + |-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3003s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3003s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[10m] offset 30m)),7404,3,9999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))""".stripMargin ), + Test(query = """sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m] offset 20m)) + sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m]))""", + expected = """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, 
filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m] offset 20m)) + sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m])),0,3,6200,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(6201,3,8003)) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=6201000, step=3000, end=8003000, window=Some(1800000), functionId=Some(Rate), rawSource=false, offsetMs=Some(1200000)) + |------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[4803s],8003,1,8003,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[4803s],8003,1,8003,None,false), 
PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(6201,3,8003)) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=6201000, step=3000, end=8003000, window=Some(1800000), functionId=Some(Rate), rawSource=false, offsetMs=None) + |------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],8003,1,8003,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],8003,1,8003,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m] offset 20m)) + sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m])),8004,3,9999,None,false), 
PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))""".stripMargin ) ) + val partitionLocationProvider = new PartitionLocationProvider { override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = { - val midTime = (timeRange.startMs + timeRange.endMs) / 2 - List(PartitionAssignment("remote0", "remote0-url", TimeRange(timeRange.startMs, midTime)), - PartitionAssignment("remote1", "remote1-url", TimeRange(midTime, timeRange.endMs))) + val splitMs = 1000 * splitSec + List(PartitionAssignment("remote0", "remote0-url", TimeRange(timeRange.startMs, splitMs)), + PartitionAssignment("remote1", "remote1-url", TimeRange(splitMs + 1, timeRange.endMs))) } override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], @@ -2816,15 +2834,19 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS } val engine = new MultiPartitionPlanner( partitionLocationProvider, singlePartitionPlanner, "local", - MetricsTestData.timeseriesDataset, queryConfig - ) - for (query <- queries) { - val lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(startSec, stepSec, endSec)) - val promQlQueryParams = PromQlQueryParams(query, startSec, stepSec, endSec) - assertThrows[BadQueryException] { - engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, - plannerParams = PlannerParams(processMultiPartition = true))) - } + MetricsTestData.timeseriesDataset, queryConfig.copy(routingConfig = queryConfig.routingConfig.copy( + supportRemoteRawExport = true, + maxRemoteRawExportTimeRange = Duration(3, TimeUnit.DAYS), + periodOfUncertaintyMs = 3000))) + + for (test <- tests) { + val lp = Parser.queryRangeToLogicalPlan(test.query, TimeStepParams(startSec, stepSec, endSec)) + val promQlQueryParams = PromQlQueryParams(test.query, startSec, stepSec, endSec) + val execPlan = engine.materialize(lp, + QueryContext(origQueryParams = promQlQueryParams, + plannerParams = PlannerParams(processMultiPartition = true)) + ) + validatePlan(execPlan, test.expected) } } diff --git a/grpc/src/main/protobuf/query_service.proto b/grpc/src/main/protobuf/query_service.proto index ea58539491..499c887f4d 100644 --- a/grpc/src/main/protobuf/query_service.proto +++ b/grpc/src/main/protobuf/query_service.proto @@ -641,6 +641,7 @@ enum InternalRangeFunction { ABSENT_OVER_TIME = 26; PRESENT_OVER_TIME = 27; MEDIAN_ABSOLUTE_DEVIATION_OVER_TIME = 28; + RATE_AND_MIN_MAX_OVER_TIME = 29; } enum SortFunctionId { diff --git a/jmh/src/main/scala/filodb.jmh/Base2ExponentialHistogramQueryBenchmark.scala b/jmh/src/main/scala/filodb.jmh/Base2ExponentialHistogramQueryBenchmark.scala index b00038cf07..03a35c745f 100644 --- a/jmh/src/main/scala/filodb.jmh/Base2ExponentialHistogramQueryBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/Base2ExponentialHistogramQueryBenchmark.scala @@ -34,7 +34,7 @@ import 
filodb.timeseries.TestTimeseriesProducer */ @State(Scope.Thread) class Base2ExponentialHistogramQueryBenchmark extends StrictLogging { - org.slf4j.LoggerFactory.getLogger("filodb").asInstanceOf[Logger].setLevel(Level.DEBUG) + org.slf4j.LoggerFactory.getLogger("filodb").asInstanceOf[Logger].setLevel(Level.WARN) import filodb.coordinator._ import client.Client.{actorAsk, asyncAsk} @@ -46,7 +46,7 @@ class Base2ExponentialHistogramQueryBenchmark extends StrictLogging { // TODO: move setup and ingestion to another trait val system = ActorSystem("test", ConfigFactory.load("filodb-defaults.conf") - .withValue("filodb.memstore.ingestion-buffer-mem-size", ConfigValueFactory.fromAnyRef("30MB"))) + .withValue("filodb.memstore.ingestion-buffer-mem-size", ConfigValueFactory.fromAnyRef("300MB"))) private val cluster = FilodbCluster(system) cluster.join() @@ -90,11 +90,12 @@ class Base2ExponentialHistogramQueryBenchmark extends StrictLogging { val (producingFut, containerStream) = TestTimeseriesProducer.metricsToContainerStream(startTime, numShards, numSeries, numMetricNames = 1, numSamplesPerTs * numSeries, dataset, shardMapper, spread, publishIntervalSec = 10, numBuckets = numBuckets, expHist = true) + val endTime = startTime + (numSamplesPerTs * 10000) val ingestTask = containerStream.groupBy(_._1) // Asynchronously subcribe and ingest each shard .mapParallelUnordered(numShards) { groupedStream => val shard = groupedStream.key - println(s"Starting ingest exp histograms on shard $shard...") + println(s"Starting ingest exp histograms on shard $shard from timestamp $startTime to $endTime") val shardStream = groupedStream.zipWithIndex.flatMap { case ((_, bytes), idx) => val data = bytes.map { array => SomeData(RecordContainer(array), idx) } Observable.fromIterable(data) @@ -117,13 +118,15 @@ class Base2ExponentialHistogramQueryBenchmark extends StrictLogging { val histQuantileQuery = """histogram_quantile(0.7, sum(rate(http_request_latency_delta{_ws_="demo", _ns_="App-0"}[5m])))""" val queries = Seq(histQuantileQuery) - val queryTime = startTime + (7 * 60 * 1000) // 5 minutes from start until 60 minutes from start + val queryStartTime = startTime/1000 + (5 * 60) // 5 minutes from start until 60 minutes from start val queryStep = 120 // # of seconds between each query sample "step" - val qParams = TimeStepParams(queryTime/1000, queryStep, (queryTime/1000) + queryIntervalMin*60) + val queryEndTime = queryStartTime + queryIntervalMin*60 + val qParams = TimeStepParams(queryStartTime, queryStep, queryEndTime) val logicalPlans = queries.map { q => Parser.queryRangeToLogicalPlan(q, qParams) } val queryCommands = logicalPlans.map { plan => LogicalPlan2Query(dataset.ref, plan, QueryContext(Some(new StaticSpreadProvider(SpreadChange(0, spread))), 20000)) } + println(s"Querying data from $queryStartTime to $queryEndTime") var queriesSucceeded = 0 var queriesFailed = 0 diff --git a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala index 12c74d2e72..d1ba20463e 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala @@ -62,28 +62,36 @@ trait Histogram extends Ordered[Histogram] { /** * Calculates histogram quantile based on bucket values using Prometheus scheme (increasing/LE) */ - def quantile(q: Double): Double = { + def quantile(q: Double, + min: Double = 0, // negative observations not supported yet + max: Double = 
Double.PositiveInfinity): Double = { val result = if (q < 0) Double.NegativeInfinity else if (q > 1) Double.PositiveInfinity - else if (numBuckets < 2) Double.NaN + else if (numBuckets < 2 || topBucketValue <= 0) Double.NaN else { // find rank for the quantile using total number of occurrences (which is the last bucket value) var rank = q * topBucketValue // using rank, find the le bucket which would have the identified rank - val b = firstBucketGTE(rank) + val bucket = firstBucketGTE(rank) + + // current bucket lower and upper bound; negative observations not supported yet - to be done later + var bucketStart = if (bucket == 0) 0 else bucketTop(bucket-1) + var bucketEnd = bucketTop(bucket) + // if min and max are in this bucket, adjust the bucket start and end + if (min > bucketStart && min <= bucketEnd) bucketStart = min + if (max > bucketStart && max <= bucketEnd) bucketEnd = max // now calculate quantile. If bucket is last one and last bucket is +Inf then return second-to-last bucket top // as we cannot interpolate to +Inf. - if (b == numBuckets-1 && bucketTop(numBuckets - 1).isPosInfinity) return bucketTop(numBuckets-2) - else if (b == 0 && bucketTop(0) <= 0) return bucketTop(0) - else { - // interpolate quantile within le bucket - var (bucketStart, bucketEnd, count) = (0d, bucketTop(b), bucketValue(b)) - if (b > 0) { - bucketStart = bucketTop(b-1) - count -= bucketValue(b-1) - rank -= bucketValue(b-1) - } + if (bucket == numBuckets-1 && bucketEnd.isPosInfinity) { + return bucketTop(numBuckets-2) + } else if (bucket == 0 && bucketTop(0) <= 0) { + return bucketTop(0) // zero or negative bucket + } else { + + // interpolate quantile within boundaries of "bucket" + val count = if (bucket == 0) bucketValue(bucket) else bucketValue(bucket) - bucketValue(bucket-1) + rank -= (if (bucket == 0) 0 else bucketValue(bucket-1)) val fraction = rank/count if (!hasExponentialBuckets || bucketStart == 0) { bucketStart + (bucketEnd-bucketStart) * fraction @@ -133,9 +141,9 @@ trait Histogram extends Ordered[Histogram] { val b = it.next() val zeroBucket = (b == 0) val bucketUpper = bucketTop(b) - val bucketLower = if (b == 0) 0.0 else bucketTop(b - 1) + val bucketLower = if (zeroBucket) 0.0 else bucketTop(b - 1) val bucketVal = bucketValue(b) - val prevBucketVal = if (b == 0) 0.0 else bucketValue(b - 1) + val prevBucketVal = if (zeroBucket) 0.0 else bucketValue(b - 1) // Define interpolation functions def interpolateLinearly(v: Double): Double = { @@ -264,7 +272,6 @@ final case class LongHistogram(var buckets: HistogramBuckets, var values: Array[ } // TODO if otel histogram, the need to add values in a different way // see if we can refactor since MutableHistogram also has this logic - assert(other.buckets == buckets) cforRange { 0 until numBuckets } { b => values(b) += other.values(b) } @@ -433,54 +440,6 @@ object MutableHistogram { } } -/** - * MaxMinHistogram improves quantile calculation accuracy with a known max value recorded from the client. - * Whereas normally Prom histograms have +Inf as the highest bucket, and we cannot interpolate above the last - * non-Inf bucket, having a max allows us to interpolate from the rank up to the max. - * When the max value is lower, we interpolate between the bottom of the bucket and the max value. - * Both changes mean that the 0.90+ quantiles return much closer to the max value, instead of interpolating or clipping. - * The quantile result can never be above max, regardless of the bucket scheme. 
- * - * UPDATE: Adding support for min value as well to make sure the quantile is never below the minimum specified value. - * By default, the minimum value is set to 0.0 - */ -final case class MaxMinHistogram(innerHist: HistogramWithBuckets, max: Double, min: Double = 0.0) - extends HistogramWithBuckets { - final def buckets: HistogramBuckets = innerHist.buckets - final def bucketValue(no: Int): Double = innerHist.bucketValue(no) - - def serialize(intoBuf: Option[MutableDirectBuffer] = None): MutableDirectBuffer = ??? - - override def quantile(q: Double): Double = { - val result = if (q < 0) Double.NegativeInfinity - else if (q > 1) Double.PositiveInfinity - else if (numBuckets < 2) Double.NaN - else { - // find rank for the quantile using total number of occurrences (which is the last bucket value) - var rank = q * topBucketValue - // using rank, find the le bucket which would have the identified rank - val bucketNum = firstBucketGTE(rank) - - // now calculate quantile. No need to special case top bucket since we will always cap top at max - if (bucketNum == 0 && bucketTop(0) <= 0) return bucketTop(0) - else { - // interpolate quantile within le bucket - var (bucketStart, bucketEnd, count) = (Math.max(0d, min), - Math.min(bucketTop(bucketNum), max), bucketValue(bucketNum)) - if (bucketNum > 0) { - bucketStart = bucketTop(bucketNum-1) - count -= bucketValue(bucketNum-1) - rank -= bucketValue(bucketNum-1) - } - bucketStart + (bucketEnd-bucketStart)*(rank/count) - } - } - result - } - - -} - /** * A scheme for buckets in a histogram. Since these are Prometheus-style histograms, * each bucket definition consists of occurrences of numbers which are less than or equal to the bucketTop @@ -635,7 +594,8 @@ final case class GeometricBuckets(firstBucket: Double, object Base2ExpHistogramBuckets { // TODO: make maxBuckets default configurable; not straightforward to get handle to global config from here - val maxBuckets = 200 + // See the PR for the benchmark test results based on which maxBuckets was fixed. Don't increase without analysis. + val maxBuckets = 180 val maxAbsScale = 100 val maxAbsBucketIndex = 500 } @@ -746,11 +706,12 @@ final case class Base2ExpHistogramBuckets(scale: Int, val maxBucketTopNeeded = Math.max(endBucketTop, o.endBucketTop) var newScale = Math.min(scale, o.scale) var newBase = Math.max(base, o.base) - var newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1 - var newBucketIndexStart = Math.floor(Math.log(minBucketTopNeeded) / Math.log(newBase)).toInt - 1 + // minus one below since there is "+1" in `bucket(index) = base ^ (index + 1)` + var newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1 // exclusive + var newBucketIndexStart = Math.floor(Math.log(minBucketTopNeeded) / Math.log(newBase)).toInt - 1 // inclusive // Even if the two schemes are of same scale, they can have non-overlapping bucket ranges. // The new bucket scheme should have at most maxBuckets, so keep reducing scale until within limits. - while (newBucketIndexEnd - newBucketIndexStart > maxBuckets) { + while (newBucketIndexEnd - newBucketIndexStart + 1 > maxBuckets) { newScale -= 1 newBase = Math.pow(2, Math.pow(2, -newScale)) newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1 @@ -761,7 +722,7 @@ final case class Base2ExpHistogramBuckets(scale: Int, } /** - * Converts an OTel exponential index to array index. + * Converts an OTel exponential index to array index (aka bucket no). 
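// A minimal sketch of the conversion described here (illustrative only; `otelToArrayIndex` is not a
// method in this file, and array slot 0 is reserved for the zero bucket). It matches the example below:
//   def otelToArrayIndex(otelIndex: Int): Int = otelIndex - startIndexPositiveBuckets + 1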
* For example if startIndexPositiveBuckets = -5 and numPositiveBuckets = 10, then * -5 will return 1, -4 will return 2, 0 will return 6, 4 will return 10. * Know that 0 array index is reserved for zero bucket. diff --git a/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala b/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala index d781e22b02..13af94706b 100644 --- a/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala +++ b/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala @@ -11,6 +11,14 @@ object HistogramTest { Array[Double](11, 16, 26, 27, 33, 42, 46, 55), Array[Double](4, 4, 5, 33, 35, 67, 91, 121) ) + + val rawHistBuckets2 = Seq( + Array[Double](0, 15, 17, 20, 25, 34, 76, 82), + Array[Double](0, 16, 26, 26, 36, 38, 56, 59), + Array[Double](0, 16, 26, 27, 33, 42, 46, 55), + Array[Double](0, 4, 5, 33, 35, 67, 91, 121) + ) + val rawLongBuckets = rawHistBuckets.map(_.map(_.toLong)) val mutableHistograms = rawHistBuckets.map { buckets => // Create a new copy here, so that mutations don't affect original array @@ -26,7 +34,7 @@ object HistogramTest { } val otelExpBuckets = Base2ExpHistogramBuckets(3, -3, 7) - val otelExpHistograms = rawHistBuckets.map { buckets => + val otelExpHistograms = rawHistBuckets2.map { buckets => LongHistogram(otelExpBuckets, buckets.take(otelExpBuckets.numBuckets).map(_.toLong)) } @@ -103,7 +111,6 @@ class HistogramTest extends NativeVectorTest { customHistograms(0).quantile(0.95) shouldEqual 10 } - it("should calculate quantile correctly for exponential bucket histograms") { val bucketScheme = Base2ExpHistogramBuckets(3, -5, 11) // 0.707 to 1.68 val hist = MutableHistogram(bucketScheme, Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)) @@ -117,6 +124,51 @@ class HistogramTest extends NativeVectorTest { hist.quantile(0.085) shouldEqual 0.014142135623730961 +- 0.00001 } + it("quantile for exponential histogram with real data should match expected") { + val bucketScheme = Base2ExpHistogramBuckets(3, -78, 126) + val str = "0.0=0.0, 0.0012664448775888738=0.0, 0.0013810679320049727=0.0, 0.001506065259187439=0.0, " + + "0.0016423758110424079=0.0, 0.0017910235218841198=0.0, 0.001953124999999996=0.0, 0.002129897915361827=0.0, " + + "0.002322670146489685=0.0, 0.0025328897551777484=0.0, 0.002762135864009946=0.0, 0.0030121305183748786=0.0, " + + "0.0032847516220848166=0.0, 0.0035820470437682404=0.0, 0.003906249999999993=0.0, 0.004259795830723655=0.0, " + + "0.004645340292979371=0.0, 0.005065779510355498=0.0, 0.005524271728019893=0.0, 0.006024261036749759=0.0, " + + "0.006569503244169634=0.0, 0.007164094087536483=0.0, 0.007812499999999988=0.0, 0.008519591661447312=0.0, " + + "0.009290680585958744=0.0, 0.010131559020710997=0.0, 0.011048543456039788=0.0, 0.012048522073499521=0.0, " + + "0.013139006488339272=0.0, 0.014328188175072969=0.0, 0.01562499999999998=0.0, 0.017039183322894627=0.0, " + + "0.018581361171917492=0.0, 0.020263118041422=0.0, 0.022097086912079584=0.0, 0.024097044146999046=0.0, " + + "0.026278012976678547=0.0, 0.028656376350145944=0.0, 0.031249999999999965=0.0, 0.03407836664578927=0.0, " + + "0.037162722343835=0.0, 0.04052623608284401=0.0, 0.044194173824159175=0.0, 0.0481940882939981=0.0, " + + "0.05255602595335711=0.0, 0.0573127527002919=0.0, 0.062499999999999944=0.0, 0.06815673329157855=0.0, " + + "0.07432544468767001=0.0, 0.08105247216568803=0.0, 0.08838834764831838=0.0, 0.09638817658799623=0.0, " + + "0.10511205190671424=0.0, 0.11462550540058382=0.0, 0.12499999999999992=0.0, 
0.13631346658315713=1.0, " + + "0.14865088937534005=1.0, 0.16210494433137612=1.0, 0.17677669529663678=1.0, 0.1927763531759925=1.0, " + + "0.21022410381342854=1.0, 0.2292510108011677=1.0, 0.2499999999999999=2.0, 0.2726269331663143=2.0, " + + "0.29730177875068015=3.0, 0.3242098886627523=3.0, 0.3535533905932736=3.0, 0.3855527063519851=3.0, " + + "0.42044820762685714=4.0, 0.4585020216023355=5.0, 0.4999999999999999=5.0, 0.5452538663326287=5.0, " + + "0.5946035575013604=6.0, 0.6484197773255047=6.0, 0.7071067811865475=8.0, 0.7711054127039704=8.0, " + + "0.8408964152537145=9.0, 0.9170040432046712=9.0, 1.0=11.0, 1.0905077326652577=12.0, 1.189207115002721=14.0, " + + "1.2968395546510099=15.0, 1.4142135623730951=17.0, 1.542210825407941=19.0, 1.6817928305074294=20.0, " + + "1.8340080864093429=22.0, 2.0000000000000004=23.0, 2.181015465330516=26.0, 2.378414230005443=28.0, " + + "2.59367910930202=31.0, 2.8284271247461907=34.0, 3.084421650815883=37.0, 3.3635856610148593=41.0, " + + "3.6680161728186866=45.0, 4.000000000000002=48.0, 4.3620309306610325=53.0, 4.756828460010887=58.0, " + + "5.187358218604041=64.0, 5.656854249492383=70.0, 6.168843301631767=76.0, 6.7271713220297205=84.0, " + + "7.336032345637374=90.0, 8.000000000000005=99.0, 8.724061861322067=108.0, 9.513656920021775=118.0, " + + "10.374716437208086=129.0, 11.31370849898477=140.0, 12.337686603263537=152.0, 13.454342644059444=167.0, " + + "14.672064691274752=182.0, 16.000000000000014=199.0, 17.448123722644137=217.0, 19.027313840043554=237.0, " + + "20.749432874416176=258.0, 22.627416997969544=282.0, 24.675373206527077=308.0, 26.908685288118896=336.0, " + + "29.34412938254951=367.0, 32.000000000000036=400.0, 34.89624744528828=435.0, 38.05462768008712=474.0, " + + "41.49886574883236=517.0, 45.254833995939094=565.0, 49.35074641305417=617.0, 53.8173705762378=672.0, " + + "58.688258765099036=732.0, 64.00000000000009=749.0" + val counts = str.split(", ").map { s => + val kv = s.split("=") + kv(1).toDouble + } + val hist = MutableHistogram(bucketScheme, counts) + hist.quantile(0.5) shouldEqual 29.927691427444305 +- 0.00001 + hist.quantile(0.99) shouldEqual 61.602904581469566 +- 0.00001 + hist.quantile(0.01) shouldEqual 0.6916552392692796 +- 0.00001 + hist.quantile(0.99, min=0.03, max=59.87) shouldEqual 59.34643429268522 +- 0.00001 + } + it("should calculate histogram_fraction correctly for exponential histograms using exponential interpolation") { val bucketScheme = Base2ExpHistogramBuckets(3, -5, 11) // 0.707 to 1.68 val hist = MutableHistogram(bucketScheme, Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)) @@ -169,34 +221,64 @@ class HistogramTest extends NativeVectorTest { } it("should calculate more accurate quantile with MaxMinHistogram using max column") { - val h = MaxMinHistogram(mutableHistograms(0), 90) - h.quantile(0.95) shouldEqual 72.2 +- 0.1 // more accurate due to max! + val h = mutableHistograms(0) + h.quantile(0.95, 0 ,90) shouldEqual 72.2 +- 0.1 // more accurate due to max! // Not just last bucket, but should always be clipped at max regardless of bucket scheme val values = Array[Double](10, 15, 17, 20, 25, 25, 25, 25) - val h2 = MaxMinHistogram(MutableHistogram(bucketScheme, values), 10) - h2.quantile(0.95) shouldEqual 9.5 +- 0.1 // more accurate due to max! + val h2 = MutableHistogram(bucketScheme, values) + h2.quantile(0.95, 0, 10) shouldEqual 9.5 +- 0.1 // more accurate due to max! 
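// Illustrative sketch of why supplying min/max tightens the estimate; this simply mirrors the new
// Histogram.quantile logic above, with `b` the bucket containing the rank:
//   var bucketStart = if (b == 0) 0 else bucketTop(b - 1)
//   var bucketEnd   = bucketTop(b)
//   if (min > bucketStart && min <= bucketEnd) bucketStart = min  // lower bound clamped to min
//   if (max > bucketStart && max <= bucketEnd) bucketEnd = max    // upper bound clamped to max
// so interpolation happens between min and max when they fall inside the chosen bucket, and the
// result can never exceed the supplied max.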
val values3 = Array[Double](1, 1, 1, 1, 1, 4, 7, 7, 9, 9) ++ Array.fill(54)(12.0) - val h3 = MaxMinHistogram(MutableHistogram(HistogramBuckets.binaryBuckets64, values3), 1617.0) - h3.quantile(0.99) shouldEqual 1593.2 +- 0.1 - h3.quantile(0.90) shouldEqual 1379.4 +- 0.1 + val h3 = MutableHistogram(HistogramBuckets.binaryBuckets64, values3) + h3.quantile(0.99, 0, 1617.0) shouldEqual 1593.2 +- 0.1 + h3.quantile(0.90, 0, 1617.0) shouldEqual 1379.4 +- 0.1 } it("should calculate more accurate quantile with MaxMinHistogram using max and min column") { - val h = MaxMinHistogram(mutableHistograms(0), 100, 10) - h.quantile(0.95) shouldEqual 75.39 +- 0.1 // more accurate due to max, min! + val h = mutableHistograms(0) + h.quantile(0.95, 10, 100) shouldEqual 75.39 +- 0.1 // more accurate due to max, min! // Not just last bucket, but should always be clipped at max regardless of bucket scheme val values = Array[Double](10, 15, 17, 20, 25, 25, 25, 25) - val h2 = MaxMinHistogram(MutableHistogram(bucketScheme, values), 10, 2) - h2.quantile(0.95) shouldEqual 9.5 +- 0.1 // more accurate due to max, min! + val h2 = MutableHistogram(bucketScheme, values) + h2.quantile(0.95, 2, 10) shouldEqual 9.5 +- 0.1 // more accurate due to max, min! val values3 = Array[Double](1, 1, 1, 1, 1, 4, 7, 7, 9, 9) ++ Array.fill(54)(12.0) - val h3 = MaxMinHistogram(MutableHistogram(HistogramBuckets.binaryBuckets64, values3), 1617.0, 1.0) - h3.quantile(0.99) shouldEqual 1593.2 +- 0.1 - h3.quantile(0.90) shouldEqual 1379.4 +- 0.1 - h3.quantile(0.01) shouldEqual 1.0 +- 0.1 // must use the starting reference from min + val h3 = MutableHistogram(HistogramBuckets.binaryBuckets64, values3) + h3.quantile(0.99) shouldEqual 2006.0399999999995 +- 0.0001 // without min/max + h3.quantile(0.99, 0, 0) shouldEqual 2006.0399999999995 +- 0.0001 // with potentially wrong min max + h3.quantile(0.99, 1.0, 1617.0) shouldEqual 1593.2 +- 0.1 + h3.quantile(0.90, 1.0, 1617.0) shouldEqual 1379.4 +- 0.1 + h3.quantile(0.01, 1.0, 1617.0) shouldEqual 1.0 +- 0.1 // must use the starting reference from min + } + + it("should calculate more accurate quantile for otel exponential histogram using max and min column") { + // bucket boundaries for scale 3, range -3 to 3, 7 buckets + // zeroBucket: 0.0 -3: 0.840896, -2: 0.917004, -1: 1.000000, 0; 1.090508, 1: 1.189207, 2: 1.296840, 3: 1.414214 + val expected95thWithoutMinMax = Seq(1.3329136609345256, 1.2987136172035367, 1.3772644080792311, 1.3897175222720994) + otelExpHistograms.map { h => + h.quantile(0.95) + }.zip(expected95thWithoutMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 } + + // notice how the quantiles are calculated within the max now, unlike before + val expected95thWithMinMax = Seq(1.2978395301558558, 1.296892165727062, 1.2990334920692943, 1.2993620241156902) + otelExpHistograms.map { h => + h.quantile(0.95, 0.78, 1.3) // notice 1.3 max is less than last bucket + }.zip(expected95thWithMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 } + + val expected5thWithoutMinMax = Seq(0.22984502016934866, 0.15504027656240363, 0.14452907137173218, 0.9199883517494387) + otelExpHistograms.map { h => + h.quantile(0.05) + }.zip(expected5thWithoutMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 } + + // notice how the quantiles are calculated within the min now, unlike before + val expected5thWithMinMax = Seq(0.7961930120386448, 0.7908863115573832, 0.7901434789531481, 0.9199883517494387) + otelExpHistograms.map { h => + // notice 0.78 min is less than first non-zero bucketTop, + // but 
bigger than previous otel bucketTop if it existed + h.quantile(0.05, 0.78, 1.3) + }.zip(expected5thWithMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 } } it("should serialize to and from BinaryHistograms and compare correctly") { diff --git a/memory/src/test/scala/filodb.memory/format/vectors/HistogramVectorTest.scala b/memory/src/test/scala/filodb.memory/format/vectors/HistogramVectorTest.scala index 58ab343f25..e194a025aa 100644 --- a/memory/src/test/scala/filodb.memory/format/vectors/HistogramVectorTest.scala +++ b/memory/src/test/scala/filodb.memory/format/vectors/HistogramVectorTest.scala @@ -87,6 +87,63 @@ class HistogramVectorTest extends NativeVectorTest { } } + // This test confirms that we can store 1-minutely exponential histogram samples of 160 buckets + // easily in a histogram appender / write buffer, and we can still hold samples for 1-hr flush interval + it("should be able to store sufficient hist samples in one 15k byte vector, the histogram appender default size") { + val str = "0.0=0.0, 0.0012664448775888738=0.0, 0.0013810679320049727=0.0, 0.001506065259187439=0.0, " + + "0.0016423758110424079=0.0, 0.0017910235218841198=0.0, 0.001953124999999996=0.0, 0.002129897915361827=0.0, " + + "0.002322670146489685=0.0, 0.0025328897551777484=0.0, 0.002762135864009946=0.0, 0.0030121305183748786=0.0, " + + "0.0032847516220848166=0.0, 0.0035820470437682404=0.0, 0.003906249999999993=0.0, 0.004259795830723655=0.0, " + + "0.004645340292979371=0.0, 0.005065779510355498=0.0, 0.005524271728019893=0.0, 0.006024261036749759=0.0, " + + "0.006569503244169634=0.0, 0.007164094087536483=0.0, 0.007812499999999988=0.0, 0.008519591661447312=0.0, " + + "0.009290680585958744=0.0, 0.010131559020710997=0.0, 0.011048543456039788=0.0, 0.012048522073499521=0.0, " + + "0.013139006488339272=0.0, 0.014328188175072969=0.0, 0.01562499999999998=0.0, 0.017039183322894627=0.0, " + + "0.018581361171917492=0.0, 0.020263118041422=0.0, 0.022097086912079584=0.0, 0.024097044146999046=0.0, " + + "0.026278012976678547=0.0, 0.028656376350145944=0.0, 0.031249999999999965=0.0, 0.03407836664578927=0.0, " + + "0.037162722343835=0.0, 0.04052623608284401=0.0, 0.044194173824159175=0.0, 0.0481940882939981=0.0, " + + "0.05255602595335711=0.0, 0.0573127527002919=0.0, 0.062499999999999944=0.0, 0.06815673329157855=0.0, " + + "0.07432544468767001=0.0, 0.08105247216568803=0.0, 0.08838834764831838=0.0, 0.09638817658799623=0.0, " + + "0.10511205190671424=0.0, 0.11462550540058382=0.0, 0.12499999999999992=0.0, 0.13631346658315713=1.0, " + + "0.14865088937534005=1.0, 0.16210494433137612=1.0, 0.17677669529663678=1.0, 0.1927763531759925=1.0, " + + "0.21022410381342854=1.0, 0.2292510108011677=1.0, 0.2499999999999999=2.0, 0.2726269331663143=2.0, " + + "0.29730177875068015=3.0, 0.3242098886627523=3.0, 0.3535533905932736=3.0, 0.3855527063519851=3.0, " + + "0.42044820762685714=4.0, 0.4585020216023355=5.0, 0.4999999999999999=5.0, 0.5452538663326287=5.0, " + + "0.5946035575013604=6.0, 0.6484197773255047=6.0, 0.7071067811865475=8.0, 0.7711054127039704=8.0, " + + "0.8408964152537145=9.0, 0.9170040432046712=9.0, 1.0=11.0, 1.0905077326652577=12.0, 1.189207115002721=14.0, " + + "1.2968395546510099=15.0, 1.4142135623730951=17.0, 1.542210825407941=19.0, 1.6817928305074294=20.0, " + + "1.8340080864093429=22.0, 2.0000000000000004=23.0, 2.181015465330516=26.0, 2.378414230005443=28.0, " + + "2.59367910930202=31.0, 2.8284271247461907=34.0, 3.084421650815883=37.0, 3.3635856610148593=41.0, " + + "3.6680161728186866=45.0, 4.000000000000002=48.0, 
4.3620309306610325=53.0, 4.756828460010887=58.0, " + + "5.187358218604041=64.0, 5.656854249492383=70.0, 6.168843301631767=76.0, 6.7271713220297205=84.0, " + + "7.336032345637374=90.0, 8.000000000000005=99.0, 8.724061861322067=108.0, 9.513656920021775=118.0, " + + "10.374716437208086=129.0, 11.31370849898477=140.0, 12.337686603263537=152.0, 13.454342644059444=167.0, " + + "14.672064691274752=182.0, 16.000000000000014=199.0, 17.448123722644137=217.0, 19.027313840043554=237.0, " + + "20.749432874416176=258.0, 22.627416997969544=282.0, 24.675373206527077=308.0, 26.908685288118896=336.0, " + + "29.34412938254951=367.0, 32.000000000000036=400.0, 34.89624744528828=435.0, 38.05462768008712=474.0, " + + "41.49886574883236=517.0, 45.254833995939094=565.0, 49.35074641305417=617.0, 53.8173705762378=672.0, " + + "58.688258765099036=732.0, 64.00000000000009=749.0" + + val appender = HistogramVector.appending(memFactory, 15000) // 15k bytes is default blob size + val bucketScheme = Base2ExpHistogramBuckets(3, -78, 126) + var counts = str.split(", ").map { s => + val kv = s.split("=") + kv(1).toDouble.toLong + } + + (0 until 206).foreach { buckets => + val hist = LongHistogram(bucketScheme, counts) + hist.serialize(Some(buffer)) + if (buckets < 205) appender.addData(buffer) shouldEqual Ack + // should fail for 206th histogram because it crosses the size of write buffer + if (buckets >= 205) appender.addData(buffer) shouldEqual VectorTooSmall(73,46) + counts = counts.map(_ + 10) + } + + val reader = appender.reader.asInstanceOf[RowHistogramReader] + reader.length shouldEqual 205 + + } + it("should accept MutableHistograms with otel exp scheme and query them back") { val appender = HistogramVector.appending(memFactory, 1024) val mutHistograms = otelExpHistograms.map(h => MutableHistogram(h.buckets, h.valueArray)) diff --git a/query/src/main/scala/filodb/query/exec/InternalRangeFunction.scala b/query/src/main/scala/filodb/query/exec/InternalRangeFunction.scala index d622d60f1a..210c88bb30 100644 --- a/query/src/main/scala/filodb/query/exec/InternalRangeFunction.scala +++ b/query/src/main/scala/filodb/query/exec/InternalRangeFunction.scala @@ -4,6 +4,7 @@ import enumeratum.EnumEntry import filodb.query.RangeFunctionId +// scalastyle:off number.of.types // Used for internal representations of RangeFunctions sealed abstract class InternalRangeFunction(val onCumulCounter: Boolean = false) extends EnumEntry @@ -55,6 +56,7 @@ object InternalRangeFunction { // Used only for histogram schemas with max column case object SumAndMaxOverTime extends InternalRangeFunction + case object RateAndMinMaxOverTime extends InternalRangeFunction case object LastSampleHistMaxMin extends InternalRangeFunction case object Timestamp extends InternalRangeFunction diff --git a/query/src/main/scala/filodb/query/exec/MultiSchemaPartitionsExec.scala b/query/src/main/scala/filodb/query/exec/MultiSchemaPartitionsExec.scala index 8e8f5b4412..155c337d55 100644 --- a/query/src/main/scala/filodb/query/exec/MultiSchemaPartitionsExec.scala +++ b/query/src/main/scala/filodb/query/exec/MultiSchemaPartitionsExec.scala @@ -128,10 +128,10 @@ final case class MultiSchemaPartitionsExec(queryContext: QueryContext, // This code is responsible for putting exact IDs needed by any range functions. 
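// For histogram schemas that also carry "min" and "max" columns, range functions such as the new
// RateAndMinMaxOverTime read those columns, so their IDs must be included as well. A rough sketch of
// the guard applied below (illustrative, assuming a local helper name):
//   val hasMinMaxCols = Seq("min", "max").forall(c => sch.data.columns.exists(_.name == c))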
val colIDs1 = getColumnIDs(sch, newColName.toSeq, rangeVectorTransformers) - val colIDs = isMaxMinColumnsEnabled(maxMinTenantFilter) match { - case true => addIDsForHistMaxMin(sch, colIDs1) - case _ => colIDs1 - } + val colIDs = if (sch.data.columns.exists(_.name == "min") && + sch.data.columns.exists(_.name == "max") && + isMaxMinColumnsEnabled(maxMinTenantFilter)) addIDsForHistMaxMin(sch, colIDs1) + else colIDs1 // Modify transformers as needed for histogram w/ max, downsample, other schemas val newxformers1 = newXFormersForDownsample(sch, rangeVectorTransformers) diff --git a/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala b/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala index b434484aa6..ff39cbff21 100644 --- a/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala +++ b/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala @@ -366,6 +366,52 @@ class SumAndMaxOverTimeFuncHD(maxColID: Int) extends ChunkedRangeFunction[Transi } } +class RateAndMinMaxOverTimeFuncHD(maxColId: Int, minColId: Int) extends ChunkedRangeFunction[TransientHistMaxMinRow] { + private val hFunc = new RateOverDeltaChunkedFunctionH + private val maxFunc = new MaxOverTimeChunkedFunctionD + private val minFunc = new MinOverTimeChunkedFunctionD + + override final def reset(): Unit = { + hFunc.reset() + maxFunc.reset() + minFunc.reset() + } + + override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientHistMaxMinRow): Unit = { + hFunc.apply(windowStart, windowEnd, sampleToEmit) + sampleToEmit.setDouble(2, maxFunc.max) + sampleToEmit.setDouble(3, minFunc.min) + } + final def apply(endTimestamp: Long, sampleToEmit: TransientHistMaxMinRow): Unit = ??? // should not be invoked + + import BinaryVector.BinaryVectorPtr + + // scalastyle:off parameter.number + def addChunks(tsVectorAcc: MemoryReader, tsVector: BinaryVectorPtr, tsReader: bv.LongVectorDataReader, + valueVectorAcc: MemoryReader, valueVector: BinaryVectorPtr, valueReader: VectorDataReader, + startTime: Long, endTime: Long, info: ChunkSetInfoReader, queryConfig: QueryConfig): Unit = { + // Do BinarySearch for start/end pos only once for all columns == WIN! + val startRowNum = tsReader.binarySearch(tsVectorAcc, tsVector, startTime) & 0x7fffffff + val endRowNum = Math.min(tsReader.ceilingIndex(tsVectorAcc, tsVector, endTime), info.numRows - 1) + + // At least one sample is present + if (startRowNum <= endRowNum) { + hFunc.addTimeChunks(valueVectorAcc, valueVector, valueReader, startRowNum, endRowNum) + + // Get valueVector/reader for max column + val maxVectAcc = info.vectorAccessor(maxColId) + val maxVectPtr = info.vectorAddress(maxColId) + maxFunc.addTimeChunks(maxVectAcc, maxVectPtr, bv.DoubleVector(maxVectAcc, maxVectPtr), startRowNum, endRowNum) + + // Get valueVector/reader for min column + val minVectAcc = info.vectorAccessor(minColId) + val minVectPtr = info.vectorAddress(minColId) + minFunc.addTimeChunks(minVectAcc, minVectPtr, bv.DoubleVector(minVectAcc, minVectPtr), startRowNum, endRowNum) + } + } +} + + /** * Computes Average Over Time using sum and count columns. 
* Used in when calculating avg_over_time using downsampled data diff --git a/query/src/main/scala/filodb/query/exec/rangefn/InstantFunction.scala b/query/src/main/scala/filodb/query/exec/rangefn/InstantFunction.scala index 76dfaeb0ad..5e9b852c08 100644 --- a/query/src/main/scala/filodb/query/exec/rangefn/InstantFunction.scala +++ b/query/src/main/scala/filodb/query/exec/rangefn/InstantFunction.scala @@ -5,7 +5,7 @@ import java.time.{Instant, LocalDateTime, YearMonth, ZoneId, ZoneOffset} import spire.syntax.cfor._ import filodb.core.query.ResultSchema -import filodb.memory.format.vectors.{Histogram, MaxMinHistogram, MutableHistogram} +import filodb.memory.format.vectors.Histogram import filodb.query.InstantFunctionId import filodb.query.InstantFunctionId._ @@ -114,7 +114,8 @@ object InstantFunction { if (sourceSchema.isHistMaxMin) HistogramQuantileWithMaxMinImpl() else HistogramQuantileImpl() case HistogramMaxQuantile => HistogramMaxQuantileImpl() case HistogramBucket => HistogramBucketImpl() - case HistogramFraction => HistogramFractionImpl() + case HistogramFraction => + if (sourceSchema.isHistMaxMin) HistogramFractionWithMaxMinImpl() else HistogramFractionImpl() case _ => throw new UnsupportedOperationException(s"$function not supported.") } } @@ -374,6 +375,19 @@ final case class HistogramFractionImpl() extends HistToDoubleIFunction { } } +/** + * Histogram fraction function for Histogram columns, where all buckets are together. This takes the min and max + * columns into consideration. + */ +final case class HistogramFractionWithMaxMinImpl() extends HMaxMinToDoubleIFunction { + final def apply(value: Histogram, max: Double, min: Double, scalarParams: Seq[Double]): Double = { + require(scalarParams.length == 2, "Need two params for histogram fraction function") + val lower = scalarParams(0) + val upper = scalarParams(1) + value.histogramFraction(lower, upper, min, max) + } +} + /** * Histogram quantile function for Histogram columns, where all buckets are together.
This will take in consideration * of min and max columns @@ -381,11 +395,7 @@ final case class HistogramFractionImpl() extends HistToDoubleIFunction { final case class HistogramQuantileWithMaxMinImpl() extends HMaxMinToDoubleIFunction { final def apply(value: Histogram, max: Double, min: Double, scalarParams: Seq[Double]): Double = { require(scalarParams.length == 1, "Quantile (between 0 and 1) required for histogram quantile") - val maxMinHist = value match { - case h: MutableHistogram => MaxMinHistogram(h, max, min) - case other: Histogram => MaxMinHistogram(MutableHistogram(other), max, min) - } - maxMinHist.quantile(scalarParams(0)) + value.quantile(scalarParams(0), min, max) } } @@ -398,11 +408,7 @@ final case class HistogramMaxQuantileImpl() extends HMaxMinToDoubleIFunction { */ final def apply(hist: Histogram, max: Double, min: Double = Double.NaN, scalarParams: Seq[Double]): Double = { require(scalarParams.length == 1, "Quantile (between 0 and 1) required for histogram quantile") - val maxHist = hist match { - case h: MutableHistogram => MaxMinHistogram(h, max, 0d) - case other: Histogram => MaxMinHistogram(MutableHistogram(other), max, 0d) - } - maxHist.quantile(scalarParams(0)) + hist.quantile(scalarParams(0), 0, max) } } diff --git a/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala b/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala index aecf277106..e63ceb1210 100644 --- a/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala +++ b/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala @@ -372,6 +372,7 @@ object RangeFunction { f match { case None => Some(LastSampleHistMaxMin) case Some(SumOverTime) => Some(SumAndMaxOverTime) + case Some(Rate) => Some(RateAndMinMaxOverTime) case other => other } @@ -383,6 +384,8 @@ object RangeFunction { () => new LastSampleChunkedFunctionHMax(schema.colIDs(2), schema.colIDs(3)) case Some(SumAndMaxOverTime) => require(schema.columns(2).name == "max") () => new SumAndMaxOverTimeFuncHD(schema.colIDs(2)) + case Some(RateAndMinMaxOverTime) => require(schema.columns(2).name == "max" && schema.columns(3).name == "min") + () => new RateAndMinMaxOverTimeFuncHD(schema.colIDs(2), schema.colIDs(3)) case Some(Last) => () => new LastSampleChunkedFunctionH case Some(SumOverTime) => () => new SumOverTimeChunkedFunctionH case Some(Rate) if schema.columns(1).isCumulative diff --git a/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala b/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala index 0a71367863..d5479e09d9 100644 --- a/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala +++ b/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala @@ -17,6 +17,7 @@ import filodb.memory._ import filodb.memory.data.ChunkMap import filodb.memory.format.{TupleRowReader, vectors => bv} import filodb.memory.BinaryRegion.NativePointer +import filodb.memory.format.vectors.MutableHistogram import filodb.query.exec._ /** @@ -313,6 +314,39 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec { } } + // The goal of this test is to verify histogram rate functionality for different time windows.
+ // See PeriodicSampleMapperSpec for verifying integration of histograms with min and max + it("should aggregate both max/min and hist for rate when min/max is in schema") { + val (data, rv) = MMD.histMaxMinRV(defaultStartTS, pubFreq, 150, 8) + (0 until numIterations).foreach { x => + val windowSize = rand.nextInt(50) + 10 + val step = rand.nextInt(50) + 5 + info(s"iteration $x windowSize=$windowSize step=$step") + + val row = new TransientHistMaxMinRow() + val chunkedIt = chunkedWindowItHist(data, rv, new RateAndMinMaxOverTimeFuncHD(5, 4), windowSize, step, row) + chunkedIt.zip(data.sliding(windowSize, step)).foreach { case (aggRow, rawDataWindow) => + val aggHist = aggRow.getHistogram(1) + val sumRawHist = rawDataWindow.map(_(3).asInstanceOf[bv.LongHistogram]) + .foldLeft(emptyAggHist) { case (agg, h) => agg.add(h); agg } + aggHist.asInstanceOf[MutableHistogram].values // what we got + .zip(sumRawHist.values.map(_/(windowSize-1)/10)) // expected + .foreach { case (a, b) => + a shouldEqual b +- 0.0000001 + } + + val maxMax = rawDataWindow.map(_(5).asInstanceOf[Double]) + .foldLeft(0.0) { case (agg, m) => Math.max(agg, m) } + aggRow.getDouble(2) shouldEqual maxMax + + val minMin = rawDataWindow.map(_(4).asInstanceOf[Double]) + .foldLeft(Double.MaxValue) { case (agg, m) => Math.min(agg, m) } + aggRow.getDouble(3) shouldEqual minMin + + } + } + } + it("should correctly aggregate min_over_time / max_over_time using both chunked and sliding iterators") { val data = (1 to 240).map(_.toDouble) val chunkSize = 40 diff --git a/run_benchmarks.sh b/run_benchmarks.sh index 458898ca25..2432976a5a 100755 --- a/run_benchmarks.sh +++ b/run_benchmarks.sh @@ -1,6 +1,11 @@ #!/bin/bash -sbt -Drust.optimize=true "jmh/jmh:run -rf json -i 5 -wi 3 -f 1 -jvmArgsAppend -XX:MaxInlineLevel=20 \ - -jvmArgsAppend -Xmx4g -jvmArgsAppend -XX:MaxInlineSize=99 -jvmArgsAppend -Dkamon.enabled=false \ + +sbt -Drust.optimize=true "jmh/jmh:run -rf json -i 2 -wi 2 -f 1 \ + -jvmArgsAppend -Dlogback.configurationFile=../conf/logback-perf.xml \ + -jvmArgsAppend -XX:MaxInlineLevel=20 \ + -jvmArgsAppend -Xmx4g \ + -jvmArgsAppend -XX:MaxInlineSize=99 \ + -jvmArgsAppend -Dkamon.enabled=false \ filodb.jmh.Base2ExponentialHistogramQueryBenchmark \ filodb.jmh.QueryHiCardInMemoryBenchmark \ filodb.jmh.QueryInMemoryBenchmark \ @@ -11,4 +16,5 @@ sbt -Drust.optimize=true "jmh/jmh:run -rf json -i 5 -wi 3 -f 1 -jvmArgsAppend -X filodb.jmh.PartKeyLuceneIndexBenchmark \ filodb.jmh.PartKeyTantivyIndexBenchmark" -# -prof 'async:libPath=/path/to/async-profiler-3.0-macos/lib/libasyncProfiler.dylib;event=cpu;output=flamegraphdir=./profile-results' \ \ No newline at end of file +# Add the argument below to enable profiling +# -prof \"async:libPath=/path/to/async-profiler-3.0-macos/lib/libasyncProfiler.dylib;event=cpu;output=flamegraph;dir=./profile-results\" \
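
Illustration (not part of the diff): a minimal sketch of how the min/max-aware Histogram overloads that HistogramQuantileWithMaxMinImpl and HistogramFractionWithMaxMinImpl now delegate to are expected to be called. The object name, bucket layout, counts and min/max values are invented for the example; only quantile(q, min, max) and histogramFraction(lower, upper, min, max) come from this change.

import filodb.memory.format.vectors.{CustomBuckets, MutableHistogram}

object MinMaxHistSketch extends App {
  // Hypothetical 4-bucket le-histogram: bucket tops and cumulative counts are made up
  val buckets = CustomBuckets(Array(0.5, 1.0, 2.5, Double.PositiveInfinity))
  val hist    = MutableHistogram(buckets, Array(2.0, 5.0, 9.0, 10.0))

  // min/max would normally come from the dedicated min/max columns of a hist-max-min schema
  val observedMin = 0.1
  val observedMax = 2.2

  // Quantile interpolation is clamped to the observed [min, max] range
  println(hist.quantile(0.9, observedMin, observedMax))

  // Estimated fraction of observations falling in [0.5, 2.0], also min/max aware
  println(hist.histogramFraction(0.5, 2.0, observedMin, observedMax))
}

This mirrors how the instant functions above now pass min and max straight into the Histogram rather than wrapping values in MaxMinHistogram as before.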