Skip to content

Commit

Permalink
handle offset and add more test cases.
Browse files Browse the repository at this point in the history
  • Loading branch information
Yu Zhang committed Nov 6, 2023
1 parent c5ee842 commit 68298f2
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import filodb.query.exec._
val earliestRawTime = earliestRawTimestampFn
val offsetMillis = LogicalPlanUtils.getOffsetMillis(periodicSeriesPlan)
val (maxOffset, minOffset) = (offsetMillis.max, offsetMillis.min)
require(periodicSeriesPlan.validateAtModifier(earliestRawTime),
require(!periodicSeriesPlan.hasAtModifier || periodicSeriesPlan.getDataSource(earliestRawTime) != 2,
s"$periodicSeriesPlan @modifier and query range should be all greater or less than $earliestRawTime")

val lookbackMs = LogicalPlanUtils.getLookBackMillis(periodicSeriesPlan).max
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1887,7 +1887,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
validatePlan(execPlan, expectedPlan)
}

it("should thrown IllegalArgumentException due to the need of both raw and downsample cluster with @modifierx") {
it("should thrown IllegalArgumentException due to the need of both raw and downsample cluster with @modifier") {
val lp = Parser.queryRangeToLogicalPlan(
"""rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m]) AND
| topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] @end()))""".stripMargin,
Expand All @@ -1898,6 +1898,57 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
.contains("@modifier and query range should be all greater or less than 1634172530000") shouldEqual (true)
}


// Fixed test-name grammar: "should thrown" -> "should throw".
it("should throw IllegalArgumentException because topk needs both raw and downsample cluster with @modifier") {
  // @end() pins the inner rate() at the query end (raw cluster) while the 8-day
  // range starts before raw retention, so the plan would need both clusters.
  val lp = Parser.queryRangeToLogicalPlan(
    """topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] @end()))""".stripMargin,
    TimeStepParams(now / 1000 - 8.days.toSeconds, step, now / 1000), Antlr)
  val thrown = the[IllegalArgumentException] thrownBy
    rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
  thrown.toString
    .contains("@modifier and query range should be all greater or less than 1634172530000") shouldEqual (true)
}

// Fixed test-name grammar: "should thrown" -> "should throw".
it("should throw IllegalArgumentException because @modifier and offset reads from downsample cluster, and the query range reads from raw cluster") {
  // The left side's 1-day range is raw-only; the right side's `offset 8d @end()`
  // resolves to downsample — mixing the two must be rejected.
  val lp = Parser.queryRangeToLogicalPlan(
    """rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m]) AND
      | topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] offset 8d @end()))""".stripMargin,
    TimeStepParams(now / 1000 - 1.days.toSeconds, step, now / 1000), Antlr)
  val thrown = the[IllegalArgumentException] thrownBy
    rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
  thrown.toString
    .contains("@modifier and query range should be all greater or less than 1634172530000") shouldEqual (true)
}

it("both modifier and query range require the data from downsample cluster.") {
  val lp = Parser.queryRangeToLogicalPlan(
    """rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m]) AND
      | topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] offset 8d @end()))""".stripMargin,
    TimeStepParams(now / 1000 - 9.days.toSeconds, step, now / 1000 - 8.days.toSeconds), Antlr)
  // Both the plain rate() and the `offset 8d @end()` selector resolve entirely to
  // the downsample cluster, so materialization must succeed with a single-cluster
  // plan. (Removed a duplicate materialize() call and leftover debug printlns.)
  val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
  val expectedPlan =
    """E~SetOperatorExec(binaryOp=LAND, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-324361366],downsample)
      |-T~PeriodicSamplesMapper(start=1633999730000, step=300000, end=1634086130000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
      |--E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633999670000,1634086130000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-324361366],downsample)
      |-T~PeriodicSamplesMapper(start=1633999730000, step=300000, end=1634086130000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None)
      |--E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633999670000,1634086130000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-324361366],downsample)
      |-T~AggregatePresenter(aggrOp=TopK, aggrParams=List(1.0), rangeParams=RangeParams(1633999730,300,1634086130))
      |--E~LocalPartitionReduceAggregateExec(aggrOp=TopK, aggrParams=List(1.0)) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-324361366],downsample)
      |---T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List())
      |----T~RepeatTransformer(startMs=1633999730000, stepMs=300000, endMs=1634086130000, funcParams=List())
      |-----T~PeriodicSamplesMapper(start=1634086130000, step=0, end=1634086130000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=Some(691200000))
      |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633394810000,1633394930000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-324361366],downsample)
      |---T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List())
      |----T~RepeatTransformer(startMs=1633999730000, stepMs=300000, endMs=1634086130000, funcParams=List())
      |-----T~PeriodicSamplesMapper(start=1634086130000, step=0, end=1634086130000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=Some(691200000))
      |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633394810000,1633394930000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-324361366],downsample)""".stripMargin
  validatePlan(execPlan, expectedPlan)
}

it("Vector plan on a binary join in RR and raw happen in memory") {
val lp = Parser.queryRangeToLogicalPlan(
"""vector(scalar(foo:1m{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" } *
Expand Down
133 changes: 97 additions & 36 deletions query/src/main/scala/filodb/query/LogicalPlan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,22 @@ sealed trait PeriodicSeriesPlan extends LogicalPlan {
def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan

/**
* validate the @modifier timestamp is in the same cluster as the query range.
* Get the data source.
* @param earliestRawTime the earliest timestamp of the raw cluster.
* @return
* @return 0 if all from raw cluster, 1 if all from downsample cluster, 2 if from raw and downsample.
*/
def validateAtModifier(earliestRawTime: Long): Boolean = true
def getDataSource(earliestRawTime: Long): Int = {
  // Guard order rearranged (equivalent since startMs <= endMs): a range entirely
  // before the raw-retention boundary is downsample-only (1), entirely at/after
  // it is raw-only (0), otherwise it needs both clusters (2).
  if (endMs < earliestRawTime) 1
  else if (startMs >= earliestRawTime) 0
  else 2
}

// Whether any node of this plan subtree uses the PromQL @ modifier; leaf/wrapper
// plan types that carry an atMs override this.
def hasAtModifier: Boolean = false

}

sealed trait MetadataQueryPlan extends LogicalPlan {
Expand Down Expand Up @@ -268,11 +279,19 @@ case class PeriodicSeries(rawSeries: RawSeriesLikePlan,
override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(rawSeries =
rawSeries.replaceRawSeriesFilters(filters))

override def validateAtModifier(earliestRawTime: Long): Boolean = {
val earliestRawTimeWithOffset = offsetMs.map(offset => offset + earliestRawTime).getOrElse(earliestRawTime)
override def hasAtModifier: Boolean = {
// the @modifier timestamp and range should be all in raw or downsample cluster.
atMs.forall(at => (at >= earliestRawTimeWithOffset && startMs >= earliestRawTimeWithOffset)
|| (at < earliestRawTimeWithOffset && endMs < earliestRawTimeWithOffset))
atMs.nonEmpty
}

override def getDataSource(earliestRawTime: Long): Int = {
  // Hoist the offset once; avoid the non-idiomatic early `return`.
  val offset = offsetMs.getOrElse(0L)
  if (startMs - offset < earliestRawTime && endMs - offset >= earliestRawTime) {
    2 // offset-adjusted query range straddles the raw/downsample boundary
  } else {
    // With an @ timestamp, the (offset-adjusted) instant alone decides the cluster;
    // otherwise fall back to the range-based default.
    atMs
      .map(_ - offset)
      .map(at => if (at >= earliestRawTime) 0 else 1)
      .getOrElse(super.getDataSource(earliestRawTime))
  }
}
}

Expand Down Expand Up @@ -337,11 +356,18 @@ case class SubqueryWithWindowing(
this.copy(innerPeriodicSeries = updatedInnerPeriodicSeries, functionArgs = updatedFunctionArgs)
}

override def validateAtModifier(earliestRawTime: Long): Boolean = {
val earliestRawTimeWithOffset = offsetMs.map(offset => offset + earliestRawTime).getOrElse(earliestRawTime)
// the @modifier timestamp and range should be all in raw or downsample cluster.
atMs.forall(at => (at >= earliestRawTimeWithOffset && startMs >= earliestRawTimeWithOffset)
|| (at < earliestRawTimeWithOffset && endMs < earliestRawTimeWithOffset))
// True when this subquery pins evaluation with an @ timestamp.
override def hasAtModifier: Boolean = atMs.nonEmpty

override def getDataSource(earliestRawTime: Long): Int = {
  val offset = offsetMs.getOrElse(0L)
  // A range straddling the retention boundary needs both clusters; otherwise an
  // @ timestamp (offset-adjusted) picks the cluster, else defer to the default.
  if (startMs - offset < earliestRawTime && endMs - offset >= earliestRawTime) 2
  else atMs
    .map(_ - offset)
    .map(at => if (at >= earliestRawTime) 0 else 1)
    .getOrElse(super.getDataSource(earliestRawTime))
}
}

Expand Down Expand Up @@ -382,11 +408,14 @@ case class TopLevelSubquery(
this.copy(innerPeriodicSeries = updatedInnerPeriodicSeries)
}

override def validateAtModifier(earliestRawTime: Long): Boolean = {
val earliestRawTimeWithOffset = originalOffsetMs.map(offset => offset + earliestRawTime).getOrElse(earliestRawTime)
// the @modifier timestamp and range should be all in raw or downsample cluster.
atMs.forall(at => (at >= earliestRawTimeWithOffset && startMs >= earliestRawTimeWithOffset)
|| (at < earliestRawTimeWithOffset && endMs < earliestRawTimeWithOffset))
// True when this subquery pins evaluation with an @ timestamp.
override def hasAtModifier: Boolean = atMs.nonEmpty

override def getDataSource(earliestRawTime: Long): Int = {
  // BUG FIX: the @ timestamp must be compared against the raw-retention boundary
  // (earliestRawTime), not against startMs — every sibling plan type compares
  // against earliestRawTime, and startMs makes the result depend on the query
  // window rather than on data retention.
  // NOTE(review): unlike PeriodicSeries/SubqueryWithWindowing, there is no check
  // here that the offset-adjusted range spans both clusters — confirm whether
  // TopLevelSubquery needs one.
  atMs.map(_ - originalOffsetMs.getOrElse(0L))
    .map(at => if (at >= earliestRawTime) 0 else 1)
    .getOrElse(super.getDataSource(earliestRawTime))
}
}

Expand Down Expand Up @@ -418,11 +447,18 @@ case class PeriodicSeriesWithWindowing(series: RawSeriesLikePlan,
series = series.replaceRawSeriesFilters(filters),
functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan]))

override def validateAtModifier(earliestRawTime: Long): Boolean = {
val earliestRawTimeWithOffset = offsetMs.map(offset => offset + earliestRawTime).getOrElse(earliestRawTime)
// the @modifier timestamp and range should be all in raw or downsample cluster.
atMs.forall(at => (at >= earliestRawTimeWithOffset && startMs>= earliestRawTimeWithOffset)
|| (at < earliestRawTimeWithOffset && endMs < earliestRawTimeWithOffset))
// True when this windowed series pins evaluation with an @ timestamp.
override def hasAtModifier: Boolean = atMs.nonEmpty

override def getDataSource(earliestRawTime: Long): Int = {
  val offset = offsetMs.getOrElse(0L)
  val spansBothClusters =
    startMs - offset < earliestRawTime && endMs - offset >= earliestRawTime
  if (spansBothClusters) 2
  else atMs
    .map(_ - offset)
    .map(at => if (at >= earliestRawTime) 0 else 1)
    .getOrElse(super.getDataSource(earliestRawTime))
}
}

Expand Down Expand Up @@ -472,10 +508,13 @@ case class Aggregate(operator: AggregationOperator,
override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors =
vectors.replacePeriodicSeriesFilters(filters))

override def validateAtModifier(earliestRawTime: Long): Boolean = {
vectors.validateAtModifier(earliestRawTime)
// Aggregation adds no time shifting of its own; delegate both queries to the input.
override def hasAtModifier: Boolean = vectors.hasAtModifier

override def getDataSource(earliestRawTime: Long): Int = vectors.getDataSource(earliestRawTime)
}

/**
Expand Down Expand Up @@ -507,8 +546,14 @@ case class BinaryJoin(lhs: PeriodicSeriesPlan,
override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(lhs =
lhs.replacePeriodicSeriesFilters(filters), rhs = rhs.replacePeriodicSeriesFilters(filters))

override def validateAtModifier(earliestRawTime: Long): Boolean = {
lhs.validateAtModifier(earliestRawTime) && rhs.validateAtModifier(earliestRawTime)
// A join uses the @ modifier if either operand does.
override def hasAtModifier: Boolean = lhs.hasAtModifier || rhs.hasAtModifier

override def getDataSource(earliestRawTime: Long): Int = {
  // When both sides resolve to the same cluster, the join stays there;
  // otherwise it needs both raw and downsample (2).
  val leftSource = lhs.getDataSource(earliestRawTime)
  val rightSource = rhs.getDataSource(earliestRawTime)
  if (leftSource == rightSource) leftSource else 2
}
}

Expand Down Expand Up @@ -546,8 +591,8 @@ case class ApplyInstantFunction(vectors: PeriodicSeriesPlan,
vectors = vectors.replacePeriodicSeriesFilters(filters),
functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan]))

override def validateAtModifier(earliestRawTime: Long): Boolean = {
vectors.validateAtModifier(earliestRawTime)
// Propagate the @ modifier flag from the wrapped plan.
override def hasAtModifier: Boolean = vectors.hasAtModifier

// CONSISTENCY FIX: every other Apply* wrapper (Miscellaneous/Sort/Absent/Limit)
// delegates getDataSource to its inner plan so offset/@-modifier information is
// honored; without this override the trait default ignores the inner plan's
// time shifting.
override def getDataSource(earliestRawTime: Long): Int = vectors.getDataSource(earliestRawTime)
}

Expand Down Expand Up @@ -580,8 +625,12 @@ case class ApplyMiscellaneousFunction(vectors: PeriodicSeriesPlan,
override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors =
vectors.replacePeriodicSeriesFilters(filters))

override def validateAtModifier(earliestRawTime: Long): Boolean = {
vectors.validateAtModifier(earliestRawTime)
// Miscellaneous functions do not shift time; defer both queries to the input plan.
override def hasAtModifier: Boolean = vectors.hasAtModifier

override def getDataSource(earliestRawTime: Long): Int = vectors.getDataSource(earliestRawTime)
}

Expand All @@ -597,8 +646,12 @@ case class ApplySortFunction(vectors: PeriodicSeriesPlan,
override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors =
vectors.replacePeriodicSeriesFilters(filters))

override def validateAtModifier(earliestRawTime: Long): Boolean = {
vectors.validateAtModifier(earliestRawTime)
// Sorting is purely a presentation transform; delegate cluster selection to the input.
override def hasAtModifier: Boolean = vectors.hasAtModifier

override def getDataSource(earliestRawTime: Long): Int = vectors.getDataSource(earliestRawTime)
}

Expand Down Expand Up @@ -708,8 +761,12 @@ case class ApplyAbsentFunction(vectors: PeriodicSeriesPlan,
this.copy(columnFilters = LogicalPlan.overrideColumnFilters(columnFilters, filters),
vectors = vectors.replacePeriodicSeriesFilters(filters))

override def validateAtModifier(earliestRawTime: Long): Boolean = {
vectors.validateAtModifier(earliestRawTime)
// absent() wraps the inner plan without changing its time window; delegate.
override def hasAtModifier: Boolean = vectors.hasAtModifier

override def getDataSource(earliestRawTime: Long): Int = vectors.getDataSource(earliestRawTime)
}

Expand All @@ -728,8 +785,12 @@ case class ApplyLimitFunction(vectors: PeriodicSeriesPlan,
this.copy(columnFilters = LogicalPlan.overrideColumnFilters(columnFilters, filters),
vectors = vectors.replacePeriodicSeriesFilters(filters))

override def validateAtModifier(earliestRawTime: Long): Boolean = {
vectors.validateAtModifier(earliestRawTime)
// Limiting series count does not affect which cluster serves the data; delegate.
override def hasAtModifier: Boolean = vectors.hasAtModifier

override def getDataSource(earliestRawTime: Long): Int = vectors.getDataSource(earliestRawTime)
}

Expand Down

0 comments on commit 68298f2

Please sign in to comment.