Skip to content

Commit

Permalink
adjust @modifier with offset.
Browse files Browse the repository at this point in the history
friendly messages.
  • Loading branch information
Yu Zhang committed Nov 7, 2023
1 parent ff2624c commit 7574b0e
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,19 @@ import filodb.query.exec._
val startWithOffsetMs = periodicSeriesPlan.startMs - maxOffset
// For scalar binary operation queries like sum(rate(foo{job = "app"}[5m] offset 8d)) * 0.5
val endWithOffsetMs = periodicSeriesPlan.endMs - minOffset
val atModifierTimestamps = periodicSeriesPlan.atModifierTimestamps
val atModifierTimestampsWithOffset = periodicSeriesPlan.atModifierTimestampsWithOffset

if (startWithOffsetMs - lookbackMs >= earliestRawTime) {
require(atModifierTimestamps.forall(at => at - lookbackMs >= earliestRawTime),
s"all @modifier $atModifierTimestamps should be no less than $earliestRawTime")
val isAtModifierValid = if (startWithOffsetMs - lookbackMs >= earliestRawTime) {
// should be in raw cluster.
atModifierTimestampsWithOffset.forall(at => at - lookbackMs >= earliestRawTime)
} else if (endWithOffsetMs - lookbackMs < earliestRawTime) {
require(atModifierTimestamps.forall(at => at - lookbackMs < earliestRawTime),
s"all @modifier $atModifierTimestamps should be less than $earliestRawTime")
// should be in down sample cluster.
atModifierTimestampsWithOffset.forall(at => at - lookbackMs < earliestRawTime)
} else {
require(atModifierTimestamps.isEmpty, s"@modifier $atModifierTimestamps should be empty")
atModifierTimestampsWithOffset.isEmpty
}
require(isAtModifierValid, s"@modifier $atModifierTimestampsWithOffset is not supported because it queries data" +
s" from both downsample and raw cluster. Please adjust the start and end time if you want to use @modifier.")

if (maxOffset != minOffset
&& startWithOffsetMs - lookbackMs < earliestRawTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1895,7 +1895,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
val thrown = the[IllegalArgumentException] thrownBy
rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
thrown.toString
.contains("should be empty") shouldEqual true
.contains("both downsample and raw cluster. Please adjust the start and end time if you want to use @modifier") shouldEqual true
}

it("should thrown IllegalArgumentException because topk needs both raw and downsample cluster with @modifier") {
Expand All @@ -1905,7 +1905,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
val thrown = the[IllegalArgumentException] thrownBy
rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
thrown.toString
.contains("should be empty") shouldEqual true
.contains("both downsample and raw cluster. Please adjust the start and end time if you want to use @modifier") shouldEqual true
}

it("should thrown IllegalArgumentException because @modifier and offset reads from downsample cluster, and the query range reads from raw cluster") {
Expand All @@ -1916,7 +1916,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
val thrown = the[IllegalArgumentException] thrownBy
rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
thrown.toString
.contains("should be empty") shouldEqual true
.contains("both downsample and raw cluster. Please adjust the start and end time if you want to use @modifier") shouldEqual true
}

it("should thrown IllegalArgumentException because while @modifier needs data from downsample cluster, the plan is dispatched to raw cluster") {
Expand All @@ -1926,7 +1926,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
val thrown = the[IllegalArgumentException] thrownBy
rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
thrown.toString
.contains("should be no less than") shouldEqual true
.contains("both downsample and raw cluster. Please adjust the start and end time if you want to use @modifier") shouldEqual true
}

it("should thrown IllegalArgumentException because while @modifier needs data from raw cluster, the plan is dispatched to downsample cluster") {
Expand All @@ -1936,7 +1936,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
val thrown = the[IllegalArgumentException] thrownBy
rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
thrown.toString
.contains("should be less than") shouldEqual true
.contains("both downsample and raw cluster. Please adjust the start and end time if you want to use @modifier") shouldEqual true
}

it("both modifier and query range require the data from downsample cluster.") {
Expand Down
44 changes: 22 additions & 22 deletions query/src/main/scala/filodb/query/LogicalPlan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ sealed trait PeriodicSeriesPlan extends LogicalPlan {
def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan

// The array of all at modifier
def atModifierTimestamps: Seq[Long] = Seq()
def atModifierTimestampsWithOffset: Seq[Long] = Seq()
}

sealed trait MetadataQueryPlan extends LogicalPlan {
Expand Down Expand Up @@ -264,8 +264,8 @@ case class PeriodicSeries(rawSeries: RawSeriesLikePlan,
override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(rawSeries =
rawSeries.replaceRawSeriesFilters(filters))

override def atModifierTimestamps: Seq[Long] = {
atMs.toSeq
override def atModifierTimestampsWithOffset: Seq[Long] = {
atMs.map(at => at - offsetMs.getOrElse(0L)).toSeq
}
}

Expand Down Expand Up @@ -330,7 +330,7 @@ case class SubqueryWithWindowing(
this.copy(innerPeriodicSeries = updatedInnerPeriodicSeries, functionArgs = updatedFunctionArgs)
}

override def atModifierTimestamps: Seq[Long] = {
override def atModifierTimestampsWithOffset: Seq[Long] = {
atMs.toSeq
}
}
Expand Down Expand Up @@ -372,8 +372,8 @@ case class TopLevelSubquery(
this.copy(innerPeriodicSeries = updatedInnerPeriodicSeries)
}

override def atModifierTimestamps: Seq[Long] = {
atMs.toSeq
override def atModifierTimestampsWithOffset: Seq[Long] = {
atMs.map(at => at - originalOffsetMs.getOrElse(0L)).toSeq
}
}

Expand Down Expand Up @@ -405,8 +405,8 @@ case class PeriodicSeriesWithWindowing(series: RawSeriesLikePlan,
series = series.replaceRawSeriesFilters(filters),
functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan]))

override def atModifierTimestamps: Seq[Long] = {
atMs.toSeq
override def atModifierTimestampsWithOffset: Seq[Long] = {
atMs.map(at => at - offsetMs.getOrElse(0L)).toSeq
}
}

Expand Down Expand Up @@ -456,8 +456,8 @@ case class Aggregate(operator: AggregationOperator,
override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors =
vectors.replacePeriodicSeriesFilters(filters))

override def atModifierTimestamps: Seq[Long] = {
vectors.atModifierTimestamps
override def atModifierTimestampsWithOffset: Seq[Long] = {
vectors.atModifierTimestampsWithOffset
}

}
Expand Down Expand Up @@ -490,8 +490,8 @@ case class BinaryJoin(lhs: PeriodicSeriesPlan,
override def isRoutable: Boolean = lhs.isRoutable || rhs.isRoutable
override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(lhs =
lhs.replacePeriodicSeriesFilters(filters), rhs = rhs.replacePeriodicSeriesFilters(filters))
override def atModifierTimestamps: Seq[Long] = {
lhs.atModifierTimestamps ++ rhs.atModifierTimestamps
override def atModifierTimestampsWithOffset: Seq[Long] = {
lhs.atModifierTimestampsWithOffset ++ rhs.atModifierTimestampsWithOffset
}
}

Expand Down Expand Up @@ -529,8 +529,8 @@ case class ApplyInstantFunction(vectors: PeriodicSeriesPlan,
vectors = vectors.replacePeriodicSeriesFilters(filters),
functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan]))

override def atModifierTimestamps: Seq[Long] = {
vectors.atModifierTimestamps
override def atModifierTimestampsWithOffset: Seq[Long] = {
vectors.atModifierTimestampsWithOffset
}
}

Expand Down Expand Up @@ -563,8 +563,8 @@ case class ApplyMiscellaneousFunction(vectors: PeriodicSeriesPlan,
override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors =
vectors.replacePeriodicSeriesFilters(filters))

override def atModifierTimestamps: Seq[Long] = {
vectors.atModifierTimestamps
override def atModifierTimestampsWithOffset: Seq[Long] = {
vectors.atModifierTimestampsWithOffset
}
}

Expand All @@ -580,8 +580,8 @@ case class ApplySortFunction(vectors: PeriodicSeriesPlan,
override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors =
vectors.replacePeriodicSeriesFilters(filters))

override def atModifierTimestamps: Seq[Long] = {
vectors.atModifierTimestamps
override def atModifierTimestampsWithOffset: Seq[Long] = {
vectors.atModifierTimestampsWithOffset
}
}

Expand Down Expand Up @@ -691,8 +691,8 @@ case class ApplyAbsentFunction(vectors: PeriodicSeriesPlan,
this.copy(columnFilters = LogicalPlan.overrideColumnFilters(columnFilters, filters),
vectors = vectors.replacePeriodicSeriesFilters(filters))

override def atModifierTimestamps: Seq[Long] = {
vectors.atModifierTimestamps
override def atModifierTimestampsWithOffset: Seq[Long] = {
vectors.atModifierTimestampsWithOffset
}
}

Expand All @@ -711,8 +711,8 @@ case class ApplyLimitFunction(vectors: PeriodicSeriesPlan,
this.copy(columnFilters = LogicalPlan.overrideColumnFilters(columnFilters, filters),
vectors = vectors.replacePeriodicSeriesFilters(filters))

override def atModifierTimestamps: Seq[Long] = {
vectors.atModifierTimestamps
override def atModifierTimestampsWithOffset: Seq[Long] = {
vectors.atModifierTimestampsWithOffset
}
}

Expand Down

0 comments on commit 7574b0e

Please sign in to comment.