diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala index 68a8444d0e..86e43d22c8 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala @@ -53,7 +53,7 @@ import filodb.query.exec._ val earliestRawTime = earliestRawTimestampFn val offsetMillis = LogicalPlanUtils.getOffsetMillis(periodicSeriesPlan) val (maxOffset, minOffset) = (offsetMillis.max, offsetMillis.min) - require(periodicSeriesPlan.validateAtModifier(earliestRawTime), + require(!periodicSeriesPlan.hasAtModifier || periodicSeriesPlan.getDataSource(earliestRawTime) != 2, s"$periodicSeriesPlan @modifier and query range should be all greater or less than $earliestRawTime") val lookbackMs = LogicalPlanUtils.getLookBackMillis(periodicSeriesPlan).max diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala index 5e7151e672..45686d3609 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala @@ -1887,7 +1887,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS validatePlan(execPlan, expectedPlan) } - it("should thrown IllegalArgumentException due to the need of both raw and downsample cluster with @modifierx") { + it("should thrown IllegalArgumentException due to the need of both raw and downsample cluster with @modifier") { val lp = Parser.queryRangeToLogicalPlan( """rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m]) AND | topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] @end()))""".stripMargin, @@ -1898,6 
+1898,53 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS .contains("@modifier and query range should be all greater or less than 1634172530000") shouldEqual (true) } + + it("should throw IllegalArgumentException because topk needs both raw and downsample cluster with @modifier") { + val lp = Parser.queryRangeToLogicalPlan( + """topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] @end()))""".stripMargin, + TimeStepParams(now / 1000 - 8.days.toSeconds, step, now / 1000), Antlr) + val thrown = the[IllegalArgumentException] thrownBy + rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) + thrown.toString + .contains("@modifier and query range should be all greater or less than 1634172530000") shouldEqual (true) + } + + it("should throw IllegalArgumentException because @modifier and offset read from downsample cluster, and the query range reads from raw cluster") { + val lp = Parser.queryRangeToLogicalPlan( + """rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m]) AND + | topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] offset 8d @end()))""".stripMargin, + TimeStepParams(now / 1000 - 1.days.toSeconds, step, now / 1000), Antlr) + val thrown = the[IllegalArgumentException] thrownBy + rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) + thrown.toString + .contains("@modifier and query range should be all greater or less than 1634172530000") shouldEqual (true) + } + + it("both modifier and query range require the data from downsample cluster.") { + val lp = Parser.queryRangeToLogicalPlan( + """rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m]) AND + | topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] offset 8d @end()))""".stripMargin, + TimeStepParams(now / 1000 - 9.days.toSeconds, step, now / 1000 - 8.days.toSeconds), Antlr) + val execPlan = rootPlanner.materialize(lp, 
QueryContext(origQueryParams = queryParams)) + val expectedPlan = + """E~SetOperatorExec(binaryOp=LAND, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-324361366],downsample) + |-T~PeriodicSamplesMapper(start=1633999730000, step=300000, end=1634086130000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |--E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633999670000,1634086130000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-324361366],downsample) + |-T~PeriodicSamplesMapper(start=1633999730000, step=300000, end=1634086130000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |--E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633999670000,1634086130000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-324361366],downsample) + |-T~AggregatePresenter(aggrOp=TopK, aggrParams=List(1.0), rangeParams=RangeParams(1633999730,300,1634086130)) + |--E~LocalPartitionReduceAggregateExec(aggrOp=TopK, aggrParams=List(1.0)) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-324361366],downsample) + |---T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List()) + |----T~RepeatTransformer(startMs=1633999730000, stepMs=300000, endMs=1634086130000, funcParams=List()) + |-----T~PeriodicSamplesMapper(start=1634086130000, step=0, end=1634086130000, 
window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=Some(691200000)) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633394810000,1633394930000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-324361366],downsample) + |---T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List()) + |----T~RepeatTransformer(startMs=1633999730000, stepMs=300000, endMs=1634086130000, funcParams=List()) + |-----T~PeriodicSamplesMapper(start=1634086130000, step=0, end=1634086130000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=Some(691200000)) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633394810000,1633394930000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-324361366],downsample)""".stripMargin + validatePlan(execPlan, expectedPlan) + } + it("Vector plan on a binary join in RR and raw happen in memory") { val lp = Parser.queryRangeToLogicalPlan( """vector(scalar(foo:1m{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" } * diff --git a/query/src/main/scala/filodb/query/LogicalPlan.scala b/query/src/main/scala/filodb/query/LogicalPlan.scala index 6ea4b55984..53db13bc80 100644 --- a/query/src/main/scala/filodb/query/LogicalPlan.scala +++ b/query/src/main/scala/filodb/query/LogicalPlan.scala @@ -72,11 +72,22 @@ sealed trait PeriodicSeriesPlan extends LogicalPlan { def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan /** - * validate the @modifier timestamp is in the same cluster as the query 
range. + * Get the data source. * @param earliestRawTime the earliest timestamp of the raw cluster. - * @return + * @return 0 if all from raw cluster, 1 if all from downsample cluster, 2 if from raw and downsample. */ - def validateAtModifier(earliestRawTime: Long): Boolean = true + def getDataSource(earliestRawTime: Long): Int = { + if (startMs >= earliestRawTime) { + 0 + } else if (endMs < earliestRawTime) { + 1 + } else { + 2 + } + } + + def hasAtModifier: Boolean = false + } sealed trait MetadataQueryPlan extends LogicalPlan { @@ -268,11 +279,19 @@ case class PeriodicSeries(rawSeries: RawSeriesLikePlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(rawSeries = rawSeries.replaceRawSeriesFilters(filters)) - override def validateAtModifier(earliestRawTime: Long): Boolean = { - val earliestRawTimeWithOffset = offsetMs.map(offset => offset + earliestRawTime).getOrElse(earliestRawTime) + override def hasAtModifier: Boolean = { // the @modifier timestamp and range should be all in raw or downsample cluster. - atMs.forall(at => (at >= earliestRawTimeWithOffset && startMs >= earliestRawTimeWithOffset) - || (at < earliestRawTimeWithOffset && endMs < earliestRawTimeWithOffset)) + atMs.nonEmpty + } + + override def getDataSource(earliestRawTime: Long): Int = { + if (startMs - offsetMs.getOrElse(0L) < earliestRawTime && endMs - offsetMs.getOrElse(0L) >= earliestRawTime) { + // query range spans both raw and downsample cluster. 
+ return 2 + } + atMs.map(_ - offsetMs.getOrElse(0L)) + .map(at => if (at >= earliestRawTime) 0 else 1) + .getOrElse(super.getDataSource(earliestRawTime)) } } @@ -337,11 +356,18 @@ case class SubqueryWithWindowing( this.copy(innerPeriodicSeries = updatedInnerPeriodicSeries, functionArgs = updatedFunctionArgs) } - override def validateAtModifier(earliestRawTime: Long): Boolean = { - val earliestRawTimeWithOffset = offsetMs.map(offset => offset + earliestRawTime).getOrElse(earliestRawTime) - // the @modifier timestamp and range should be all in raw or downsample cluster. - atMs.forall(at => (at >= earliestRawTimeWithOffset && startMs >= earliestRawTimeWithOffset) - || (at < earliestRawTimeWithOffset && endMs < earliestRawTimeWithOffset)) + override def hasAtModifier: Boolean = { + atMs.nonEmpty + } + + override def getDataSource(earliestRawTime: Long): Int = { + if (startMs - offsetMs.getOrElse(0L) < earliestRawTime && endMs - offsetMs.getOrElse(0L) >= earliestRawTime) { + // query range spans both raw and downsample cluster. + return 2 + } + atMs.map(_ - offsetMs.getOrElse(0L)) + .map(at => if (at >= earliestRawTime) 0 else 1) + .getOrElse(super.getDataSource(earliestRawTime)) } } @@ -382,11 +408,14 @@ case class TopLevelSubquery( this.copy(innerPeriodicSeries = updatedInnerPeriodicSeries) } - override def validateAtModifier(earliestRawTime: Long): Boolean = { - val earliestRawTimeWithOffset = originalOffsetMs.map(offset => offset + earliestRawTime).getOrElse(earliestRawTime) - // the @modifier timestamp and range should be all in raw or downsample cluster. 
- atMs.forall(at => (at >= earliestRawTimeWithOffset && startMs >= earliestRawTimeWithOffset) - || (at < earliestRawTimeWithOffset && endMs < earliestRawTimeWithOffset)) + override def hasAtModifier: Boolean = { + atMs.nonEmpty + } + + override def getDataSource(earliestRawTime: Long): Int = { + atMs.map(_ - originalOffsetMs.getOrElse(0L)) + .map(at => if (at >= earliestRawTime) 0 else 1) + .getOrElse(super.getDataSource(earliestRawTime)) + } } @@ -418,11 +447,18 @@ case class PeriodicSeriesWithWindowing(series: RawSeriesLikePlan, series = series.replaceRawSeriesFilters(filters), functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan])) - override def validateAtModifier(earliestRawTime: Long): Boolean = { - val earliestRawTimeWithOffset = offsetMs.map(offset => offset + earliestRawTime).getOrElse(earliestRawTime) - // the @modifier timestamp and range should be all in raw or downsample cluster. - atMs.forall(at => (at >= earliestRawTimeWithOffset && startMs>= earliestRawTimeWithOffset) - || (at < earliestRawTimeWithOffset && endMs < earliestRawTimeWithOffset)) + override def hasAtModifier: Boolean = { + atMs.nonEmpty + } + + override def getDataSource(earliestRawTime: Long): Int = { + if (startMs - offsetMs.getOrElse(0L) < earliestRawTime && endMs - offsetMs.getOrElse(0L) >= earliestRawTime) { + // query range spans both raw and downsample cluster. 
+ return 2 + } + atMs.map(_ - offsetMs.getOrElse(0L)) + .map(at => if (at >= earliestRawTime) 0 else 1) + .getOrElse(super.getDataSource(earliestRawTime)) } } @@ -472,10 +508,13 @@ case class Aggregate(operator: AggregationOperator, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors = vectors.replacePeriodicSeriesFilters(filters)) - override def validateAtModifier(earliestRawTime: Long): Boolean = { - vectors.validateAtModifier(earliestRawTime) + override def hasAtModifier: Boolean = { + vectors.hasAtModifier } + override def getDataSource(earliestRawTime: Long): Int = { + vectors.getDataSource(earliestRawTime) + } } /** @@ -507,8 +546,14 @@ case class BinaryJoin(lhs: PeriodicSeriesPlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(lhs = lhs.replacePeriodicSeriesFilters(filters), rhs = rhs.replacePeriodicSeriesFilters(filters)) - override def validateAtModifier(earliestRawTime: Long): Boolean = { - lhs.validateAtModifier(earliestRawTime) && rhs.validateAtModifier(earliestRawTime) + override def hasAtModifier: Boolean = { + lhs.hasAtModifier || rhs.hasAtModifier + } + + override def getDataSource(earliestRawTime: Long): Int = { + val left = lhs.getDataSource(earliestRawTime) + val right = rhs.getDataSource(earliestRawTime) + if (left == right) left else 2 } } @@ -546,8 +591,8 @@ case class ApplyInstantFunction(vectors: PeriodicSeriesPlan, vectors = vectors.replacePeriodicSeriesFilters(filters), functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan])) - override def validateAtModifier(earliestRawTime: Long): Boolean = { - vectors.validateAtModifier(earliestRawTime) + override def hasAtModifier: Boolean = { + vectors.hasAtModifier } } @@ -580,8 +625,12 @@ case class ApplyMiscellaneousFunction(vectors: PeriodicSeriesPlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): 
PeriodicSeriesPlan = this.copy(vectors = vectors.replacePeriodicSeriesFilters(filters)) - override def validateAtModifier(earliestRawTime: Long): Boolean = { - vectors.validateAtModifier(earliestRawTime) + override def hasAtModifier: Boolean = { + vectors.hasAtModifier + } + + override def getDataSource(earliestRawTime: Long): Int = { + vectors.getDataSource(earliestRawTime) } } @@ -597,8 +646,12 @@ case class ApplySortFunction(vectors: PeriodicSeriesPlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors = vectors.replacePeriodicSeriesFilters(filters)) - override def validateAtModifier(earliestRawTime: Long): Boolean = { - vectors.validateAtModifier(earliestRawTime) + override def hasAtModifier: Boolean = { + vectors.hasAtModifier + } + + override def getDataSource(earliestRawTime: Long): Int = { + vectors.getDataSource(earliestRawTime) } } @@ -708,8 +761,12 @@ case class ApplyAbsentFunction(vectors: PeriodicSeriesPlan, this.copy(columnFilters = LogicalPlan.overrideColumnFilters(columnFilters, filters), vectors = vectors.replacePeriodicSeriesFilters(filters)) - override def validateAtModifier(earliestRawTime: Long): Boolean = { - vectors.validateAtModifier(earliestRawTime) + override def hasAtModifier: Boolean = { + vectors.hasAtModifier + } + + override def getDataSource(earliestRawTime: Long): Int = { + vectors.getDataSource(earliestRawTime) } } @@ -728,8 +785,12 @@ case class ApplyLimitFunction(vectors: PeriodicSeriesPlan, this.copy(columnFilters = LogicalPlan.overrideColumnFilters(columnFilters, filters), vectors = vectors.replacePeriodicSeriesFilters(filters)) - override def validateAtModifier(earliestRawTime: Long): Boolean = { - vectors.validateAtModifier(earliestRawTime) + override def hasAtModifier: Boolean = { + vectors.hasAtModifier + } + + override def getDataSource(earliestRawTime: Long): Int = { + vectors.getDataSource(earliestRawTime) } }