Skip to content

Commit

Permalink
fix sanity check.
Browse files Browse the repository at this point in the history
  • Loading branch information
Yu Zhang committed Nov 7, 2023
1 parent 86d0e56 commit ff2624c
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,22 @@ import filodb.query.exec._
val (maxOffset, minOffset) = (offsetMillis.max, offsetMillis.min)

val lookbackMs = LogicalPlanUtils.getLookBackMillis(periodicSeriesPlan).max
require(periodicSeriesPlan.validateAtModifier(earliestRawTime, lookbackMs),
s"Because $periodicSeriesPlan has @modifier." +
s" The timestamps query data should be all before or after $earliestRawTime")

val startWithOffsetMs = periodicSeriesPlan.startMs - maxOffset
// For scalar binary operation queries like sum(rate(foo{job = "app"}[5m] offset 8d)) * 0.5
val endWithOffsetMs = periodicSeriesPlan.endMs - minOffset
val atModifierTimestamps = periodicSeriesPlan.atModifierTimestamps

if (startWithOffsetMs - lookbackMs >= earliestRawTime) {
require(atModifierTimestamps.forall(at => at - lookbackMs >= earliestRawTime),
s"all @modifier $atModifierTimestamps should be no less than $earliestRawTime")
} else if (endWithOffsetMs - lookbackMs < earliestRawTime) {
require(atModifierTimestamps.forall(at => at - lookbackMs < earliestRawTime),
s"all @modifier $atModifierTimestamps should be less than $earliestRawTime")
} else {
require(atModifierTimestamps.isEmpty, s"@modifier $atModifierTimestamps should be empty")
}

if (maxOffset != minOffset
&& startWithOffsetMs - lookbackMs < earliestRawTime
&& endWithOffsetMs >= earliestRawTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1895,7 +1895,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
val thrown = the[IllegalArgumentException] thrownBy
rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
thrown.toString
.contains("The timestamps query data should be all before or after 1634172530000") shouldEqual (true)
.contains("should be empty") shouldEqual true
}

it("should thrown IllegalArgumentException because topk needs both raw and downsample cluster with @modifier") {
Expand All @@ -1905,7 +1905,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
val thrown = the[IllegalArgumentException] thrownBy
rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
thrown.toString
.contains("The timestamps query data should be all before or after 1634172530000") shouldEqual (true)
.contains("should be empty") shouldEqual true
}

it("should thrown IllegalArgumentException because @modifier and offset reads from downsample cluster, and the query range reads from raw cluster") {
Expand All @@ -1916,7 +1916,27 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
val thrown = the[IllegalArgumentException] thrownBy
rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
thrown.toString
.contains("The timestamps query data should be all before or after 1634172530000") shouldEqual (true)
.contains("should be empty") shouldEqual true
}

it("should thrown IllegalArgumentException because while @modifier needs data from downsample cluster, the plan is dispatched to raw cluster") {
val lp = Parser.queryRangeToLogicalPlan(
s"""topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] @${now / 1000 - 8.days.toSeconds}))""".stripMargin,
TimeStepParams(now / 1000 - 1.days.toSeconds, step, now / 1000), Antlr)
val thrown = the[IllegalArgumentException] thrownBy
rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
thrown.toString
.contains("should be no less than") shouldEqual true
}

it("should thrown IllegalArgumentException because while @modifier needs data from raw cluster, the plan is dispatched to downsample cluster") {
val lp = Parser.queryRangeToLogicalPlan(
s"""topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] @${now / 1000}))""".stripMargin,
TimeStepParams(now / 1000 - 9.days.toSeconds, step, now / 1000 - 8.days.toSeconds), Antlr)
val thrown = the[IllegalArgumentException] thrownBy
rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
thrown.toString
.contains("should be less than") shouldEqual true
}

it("both modifier and query range require the data from downsample cluster.") {
Expand Down
66 changes: 24 additions & 42 deletions query/src/main/scala/filodb/query/LogicalPlan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,8 @@ sealed trait PeriodicSeriesPlan extends LogicalPlan {

def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan

/**
* validate the @modifier timestamp is in the same cluster as the query range.
* @param earliestRawTime the earliest timestamp of the raw cluster.
* @param lookbackMs the look back time.
* @return
*/
def validateAtModifier(earliestRawTime: Long, lookbackMs: Long): Boolean = true
// The array of all at modifier
def atModifierTimestamps: Seq[Long] = Seq()
}

sealed trait MetadataQueryPlan extends LogicalPlan {
Expand Down Expand Up @@ -269,11 +264,8 @@ case class PeriodicSeries(rawSeries: RawSeriesLikePlan,
override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(rawSeries =
rawSeries.replaceRawSeriesFilters(filters))

override def validateAtModifier(earliestRawTime: Long, lookbackMs: Long): Boolean = {
// the @modifier timestamp and range should be all in raw or downsample cluster.
atMs.map(at => at - lookbackMs - offsetMs.getOrElse(0L))
.forall(at => (at >= earliestRawTime && startMs - lookbackMs - offsetMs.getOrElse(0L) >= earliestRawTime)
|| (at < earliestRawTime && endMs < earliestRawTime))
override def atModifierTimestamps: Seq[Long] = {
atMs.toSeq
}
}

Expand Down Expand Up @@ -338,11 +330,8 @@ case class SubqueryWithWindowing(
this.copy(innerPeriodicSeries = updatedInnerPeriodicSeries, functionArgs = updatedFunctionArgs)
}

override def validateAtModifier(earliestRawTime: Long, lookbackMs: Long): Boolean = {
// the @modifier timestamp and range should be all in raw or downsample cluster.
atMs.map(at => at - lookbackMs - offsetMs.getOrElse(0L))
.forall(at => (at >= earliestRawTime && startMs - lookbackMs - offsetMs.getOrElse(0L) >= earliestRawTime)
|| (at < earliestRawTime && endMs < earliestRawTime))
override def atModifierTimestamps: Seq[Long] = {
atMs.toSeq
}
}

Expand Down Expand Up @@ -383,11 +372,8 @@ case class TopLevelSubquery(
this.copy(innerPeriodicSeries = updatedInnerPeriodicSeries)
}

override def validateAtModifier(earliestRawTime: Long, lookbackMs: Long): Boolean = {
// the @modifier timestamp and range should be all in raw or downsample cluster.
atMs.map(at => at - lookbackMs - originalOffsetMs.getOrElse(0L))
.forall(at => (at >= earliestRawTime && startMs - lookbackMs - originalOffsetMs.getOrElse(0L) >= earliestRawTime)
|| (at < earliestRawTime && endMs < earliestRawTime))
override def atModifierTimestamps: Seq[Long] = {
atMs.toSeq
}
}

Expand Down Expand Up @@ -419,11 +405,8 @@ case class PeriodicSeriesWithWindowing(series: RawSeriesLikePlan,
series = series.replaceRawSeriesFilters(filters),
functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan]))

override def validateAtModifier(earliestRawTime: Long, lookbackMs: Long): Boolean = {
// the @modifier timestamp and range should be all in raw or downsample cluster.
atMs.map(at => at - lookbackMs - offsetMs.getOrElse(0L))
.forall(at => (at >= earliestRawTime && startMs - lookbackMs - offsetMs.getOrElse(0L) >= earliestRawTime)
|| (at < earliestRawTime && endMs < earliestRawTime))
override def atModifierTimestamps: Seq[Long] = {
atMs.toSeq
}
}

Expand Down Expand Up @@ -473,8 +456,8 @@ case class Aggregate(operator: AggregationOperator,
override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors =
vectors.replacePeriodicSeriesFilters(filters))

override def validateAtModifier(earliestRawTime: Long, lookbackMs: Long): Boolean = {
vectors.validateAtModifier(earliestRawTime, lookbackMs)
override def atModifierTimestamps: Seq[Long] = {
vectors.atModifierTimestamps
}

}
Expand Down Expand Up @@ -507,9 +490,8 @@ case class BinaryJoin(lhs: PeriodicSeriesPlan,
override def isRoutable: Boolean = lhs.isRoutable || rhs.isRoutable
override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(lhs =
lhs.replacePeriodicSeriesFilters(filters), rhs = rhs.replacePeriodicSeriesFilters(filters))

override def validateAtModifier(earliestRawTime: Long, lookbackMs: Long): Boolean = {
lhs.validateAtModifier(earliestRawTime, lookbackMs) && rhs.validateAtModifier(earliestRawTime, lookbackMs)
override def atModifierTimestamps: Seq[Long] = {
lhs.atModifierTimestamps ++ rhs.atModifierTimestamps
}
}

Expand Down Expand Up @@ -547,8 +529,8 @@ case class ApplyInstantFunction(vectors: PeriodicSeriesPlan,
vectors = vectors.replacePeriodicSeriesFilters(filters),
functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan]))

override def validateAtModifier(earliestRawTime: Long, lookbackMs: Long): Boolean = {
vectors.validateAtModifier(earliestRawTime, lookbackMs)
override def atModifierTimestamps: Seq[Long] = {
vectors.atModifierTimestamps
}
}

Expand Down Expand Up @@ -581,8 +563,8 @@ case class ApplyMiscellaneousFunction(vectors: PeriodicSeriesPlan,
override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors =
vectors.replacePeriodicSeriesFilters(filters))

override def validateAtModifier(earliestRawTime: Long, lookbackMs: Long): Boolean = {
vectors.validateAtModifier(earliestRawTime, lookbackMs)
override def atModifierTimestamps: Seq[Long] = {
vectors.atModifierTimestamps
}
}

Expand All @@ -598,8 +580,8 @@ case class ApplySortFunction(vectors: PeriodicSeriesPlan,
override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors =
vectors.replacePeriodicSeriesFilters(filters))

override def validateAtModifier(earliestRawTime: Long, lookbackMs: Long): Boolean = {
vectors.validateAtModifier(earliestRawTime, lookbackMs)
override def atModifierTimestamps: Seq[Long] = {
vectors.atModifierTimestamps
}
}

Expand Down Expand Up @@ -709,8 +691,8 @@ case class ApplyAbsentFunction(vectors: PeriodicSeriesPlan,
this.copy(columnFilters = LogicalPlan.overrideColumnFilters(columnFilters, filters),
vectors = vectors.replacePeriodicSeriesFilters(filters))

override def validateAtModifier(earliestRawTime: Long, lookbackMs: Long): Boolean = {
vectors.validateAtModifier(earliestRawTime, lookbackMs)
override def atModifierTimestamps: Seq[Long] = {
vectors.atModifierTimestamps
}
}

Expand All @@ -729,8 +711,8 @@ case class ApplyLimitFunction(vectors: PeriodicSeriesPlan,
this.copy(columnFilters = LogicalPlan.overrideColumnFilters(columnFilters, filters),
vectors = vectors.replacePeriodicSeriesFilters(filters))

override def validateAtModifier(earliestRawTime: Long, lookbackMs: Long): Boolean = {
vectors.validateAtModifier(earliestRawTime, lookbackMs)
override def atModifierTimestamps: Seq[Long] = {
vectors.atModifierTimestamps
}
}

Expand Down

0 comments on commit ff2624c

Please sign in to comment.