diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala index f2ef44aa52..4f7084e6f9 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala @@ -59,17 +59,19 @@ import filodb.query.exec._ val startWithOffsetMs = periodicSeriesPlan.startMs - maxOffset // For scalar binary operation queries like sum(rate(foo{job = "app"}[5m] offset 8d)) * 0.5 val endWithOffsetMs = periodicSeriesPlan.endMs - minOffset - val atModifierTimestamps = periodicSeriesPlan.atModifierTimestamps + val atModifierTimestampsWithOffset = periodicSeriesPlan.atModifierTimestampsWithOffset - if (startWithOffsetMs - lookbackMs >= earliestRawTime) { - require(atModifierTimestamps.forall(at => at - lookbackMs >= earliestRawTime), - s"all @modifier $atModifierTimestamps should be no less than $earliestRawTime") + val isAtModifierValid = if (startWithOffsetMs - lookbackMs >= earliestRawTime) { + // should be in raw cluster. + atModifierTimestampsWithOffset.forall(at => at - lookbackMs >= earliestRawTime) } else if (endWithOffsetMs - lookbackMs < earliestRawTime) { - require(atModifierTimestamps.forall(at => at - lookbackMs < earliestRawTime), - s"all @modifier $atModifierTimestamps should be less than $earliestRawTime") + // should be in down sample cluster. + atModifierTimestampsWithOffset.forall(at => at - lookbackMs < earliestRawTime) } else { - require(atModifierTimestamps.isEmpty, s"@modifier $atModifierTimestamps should be empty") + atModifierTimestampsWithOffset.isEmpty } + require(isAtModifierValid, s"@modifier $atModifierTimestampsWithOffset is not supported because it queries data" + + s" from both downsample and raw cluster. Please adjust the start and end time if you want to use @modifier.") if (maxOffset != minOffset && startWithOffsetMs - lookbackMs < earliestRawTime diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala index a7d4431db1..3c38005a58 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala @@ -1895,7 +1895,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS val thrown = the[IllegalArgumentException] thrownBy rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) thrown.toString - .contains("should be empty") shouldEqual true + .contains("both downsample and raw cluster. Please adjust the start and end time if you want to use @modifier") shouldEqual true } it("should thrown IllegalArgumentException because topk needs both raw and downsample cluster with @modifier") { @@ -1905,7 +1905,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS val thrown = the[IllegalArgumentException] thrownBy rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) thrown.toString - .contains("should be empty") shouldEqual true + .contains("both downsample and raw cluster. Please adjust the start and end time if you want to use @modifier") shouldEqual true } it("should thrown IllegalArgumentException because @modifier and offset reads from downsample cluster, and the query range reads from raw cluster") { @@ -1916,7 +1916,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS val thrown = the[IllegalArgumentException] thrownBy rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) thrown.toString - .contains("should be empty") shouldEqual true + .contains("both downsample and raw cluster. Please adjust the start and end time if you want to use @modifier") shouldEqual true } it("should thrown IllegalArgumentException because while @modifier needs data from downsample cluster, the plan is dispatched to raw cluster") { @@ -1926,7 +1926,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS val thrown = the[IllegalArgumentException] thrownBy rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) thrown.toString - .contains("should be no less than") shouldEqual true + .contains("both downsample and raw cluster. Please adjust the start and end time if you want to use @modifier") shouldEqual true } it("should thrown IllegalArgumentException because while @modifier needs data from raw cluster, the plan is dispatched to downsample cluster") { @@ -1936,7 +1936,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS val thrown = the[IllegalArgumentException] thrownBy rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) thrown.toString - .contains("should be less than") shouldEqual true + .contains("both downsample and raw cluster. Please adjust the start and end time if you want to use @modifier") shouldEqual true } it("both modifier and query range require the data from downsample cluster.") { diff --git a/query/src/main/scala/filodb/query/LogicalPlan.scala b/query/src/main/scala/filodb/query/LogicalPlan.scala index cb5a39a0fc..46ae415808 100644 --- a/query/src/main/scala/filodb/query/LogicalPlan.scala +++ b/query/src/main/scala/filodb/query/LogicalPlan.scala @@ -72,7 +72,7 @@ sealed trait PeriodicSeriesPlan extends LogicalPlan { def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan // The array of all at modifier - def atModifierTimestamps: Seq[Long] = Seq() + def atModifierTimestampsWithOffset: Seq[Long] = Seq() } sealed trait MetadataQueryPlan extends LogicalPlan { @@ -264,8 +264,8 @@ case class PeriodicSeries(rawSeries: RawSeriesLikePlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(rawSeries = rawSeries.replaceRawSeriesFilters(filters)) - override def atModifierTimestamps: Seq[Long] = { - atMs.toSeq + override def atModifierTimestampsWithOffset: Seq[Long] = { + atMs.map(at => at - offsetMs.getOrElse(0L)).toSeq } } @@ -330,7 +330,7 @@ case class SubqueryWithWindowing( this.copy(innerPeriodicSeries = updatedInnerPeriodicSeries, functionArgs = updatedFunctionArgs) } - override def atModifierTimestamps: Seq[Long] = { + override def atModifierTimestampsWithOffset: Seq[Long] = { atMs.toSeq } } @@ -372,8 +372,8 @@ case class TopLevelSubquery( this.copy(innerPeriodicSeries = updatedInnerPeriodicSeries) } - override def atModifierTimestamps: Seq[Long] = { - atMs.toSeq + override def atModifierTimestampsWithOffset: Seq[Long] = { + atMs.map(at => at - originalOffsetMs.getOrElse(0L)).toSeq } } @@ -405,8 +405,8 @@ case class PeriodicSeriesWithWindowing(series: RawSeriesLikePlan, series = series.replaceRawSeriesFilters(filters), functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan])) - override def atModifierTimestamps: Seq[Long] = { - atMs.toSeq + override def atModifierTimestampsWithOffset: Seq[Long] = { + atMs.map(at => at - offsetMs.getOrElse(0L)).toSeq } } @@ -456,8 +456,8 @@ case class Aggregate(operator: AggregationOperator, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors = vectors.replacePeriodicSeriesFilters(filters)) - override def atModifierTimestamps: Seq[Long] = { - vectors.atModifierTimestamps + override def atModifierTimestampsWithOffset: Seq[Long] = { + vectors.atModifierTimestampsWithOffset } } @@ -490,8 +490,8 @@ case class BinaryJoin(lhs: PeriodicSeriesPlan, override def isRoutable: Boolean = lhs.isRoutable || rhs.isRoutable override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(lhs = lhs.replacePeriodicSeriesFilters(filters), rhs = rhs.replacePeriodicSeriesFilters(filters)) - override def atModifierTimestamps: Seq[Long] = { - lhs.atModifierTimestamps ++ rhs.atModifierTimestamps + override def atModifierTimestampsWithOffset: Seq[Long] = { + lhs.atModifierTimestampsWithOffset ++ rhs.atModifierTimestampsWithOffset } } @@ -529,8 +529,8 @@ case class ApplyInstantFunction(vectors: PeriodicSeriesPlan, vectors = vectors.replacePeriodicSeriesFilters(filters), functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan])) - override def atModifierTimestamps: Seq[Long] = { - vectors.atModifierTimestamps + override def atModifierTimestampsWithOffset: Seq[Long] = { + vectors.atModifierTimestampsWithOffset } } @@ -563,8 +563,8 @@ case class ApplyMiscellaneousFunction(vectors: PeriodicSeriesPlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors = vectors.replacePeriodicSeriesFilters(filters)) - override def atModifierTimestamps: Seq[Long] = { - vectors.atModifierTimestamps + override def atModifierTimestampsWithOffset: Seq[Long] = { + vectors.atModifierTimestampsWithOffset } } @@ -580,8 +580,8 @@ case class ApplySortFunction(vectors: PeriodicSeriesPlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors = vectors.replacePeriodicSeriesFilters(filters)) - override def atModifierTimestamps: Seq[Long] = { - vectors.atModifierTimestamps + override def atModifierTimestampsWithOffset: Seq[Long] = { + vectors.atModifierTimestampsWithOffset } } @@ -691,8 +691,8 @@ case class ApplyAbsentFunction(vectors: PeriodicSeriesPlan, this.copy(columnFilters = LogicalPlan.overrideColumnFilters(columnFilters, filters), vectors = vectors.replacePeriodicSeriesFilters(filters)) - override def atModifierTimestamps: Seq[Long] = { - vectors.atModifierTimestamps + override def atModifierTimestampsWithOffset: Seq[Long] = { + vectors.atModifierTimestampsWithOffset } } @@ -711,8 +711,8 @@ case class ApplyLimitFunction(vectors: PeriodicSeriesPlan, this.copy(columnFilters = LogicalPlan.overrideColumnFilters(columnFilters, filters), vectors = vectors.replacePeriodicSeriesFilters(filters)) - override def atModifierTimestamps: Seq[Long] = { - vectors.atModifierTimestamps + override def atModifierTimestampsWithOffset: Seq[Long] = { + vectors.atModifierTimestampsWithOffset } }