diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala index b9be8d5906..28c74665f4 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala @@ -133,13 +133,17 @@ trait DefaultPlanner { val execRangeFn = if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) Last else InternalRangeFunction.lpToInternalFunc(logicalPlanWithoutBucket.function) + val realScanStartMs = lp.atMs.getOrElse(logicalPlanWithoutBucket.startMs) + val realScanEndMs = lp.atMs.getOrElse(logicalPlanWithoutBucket.endMs) + val realScanStepMs: Long = lp.atMs.map(_ => 0L).getOrElse(lp.stepMs) + val paramsExec = materializeFunctionArgs(logicalPlanWithoutBucket.functionArgs, qContext) val window = if (execRangeFn == InternalRangeFunction.Timestamp) None else Some(logicalPlanWithoutBucket.window) - series.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(logicalPlanWithoutBucket.startMs, - logicalPlanWithoutBucket.stepMs, logicalPlanWithoutBucket.endMs, window, Some(execRangeFn), qContext, + series.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realScanStartMs, + realScanStepMs, realScanEndMs, window, Some(execRangeFn), qContext, logicalPlanWithoutBucket.stepMultipleNotationUsed, paramsExec, logicalPlanWithoutBucket.offsetMs, rawSource = rawSource))) - if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) { + val result = if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) { val aggregate = Aggregate(AggregationOperator.Sum, logicalPlanWithoutBucket, Nil, AggregateClause.byOpt(Seq("job"))) // Add sum to aggregate all child responses @@ -147,9 +151,14 @@ trait DefaultPlanner { val aggregatePlanResult = PlanResult(Seq(addAggregator(aggregate, qContext.copy(plannerParams = qContext.plannerParams.copy(skipAggregatePresent = true)), series))) addAbsentFunctionMapper(aggregatePlanResult, logicalPlanWithoutBucket.columnFilters, - RangeParams(logicalPlanWithoutBucket.startMs / 1000, logicalPlanWithoutBucket.stepMs / 1000, - logicalPlanWithoutBucket.endMs / 1000), qContext) + RangeParams(realScanStartMs / 1000, realScanStepMs / 1000, realScanEndMs / 1000), qContext) } else series + // repeat the same value for each step if '@' is specified + if (lp.atMs.nonEmpty) { + result.plans.foreach(p => p.addRangeVectorTransformer(RepeatTransformer(lp.startMs, lp.stepMs, lp.endMs, + p.queryWithPlanName(qContext)))) + } + result } private[queryplanner] def removeBucket(lp: Either[PeriodicSeries, PeriodicSeriesWithWindowing]): @@ -202,25 +211,53 @@ trait DefaultPlanner { } else (None, None, lp) + val realScanStartMs = lp.atMs.getOrElse(lp.startMs) + val realScanEndMs = lp.atMs.getOrElse(lp.endMs) + val realScanStepMs: Long = lp.atMs.map(_ => 0L).getOrElse(lp.stepMs) + val rawSeries = walkLogicalPlanTree(lpWithoutBucket.rawSeries, qContext, forceInProcess) val rawSource = lpWithoutBucket.rawSeries.isRaw && (lpWithoutBucket.rawSeries match { case r: RawSeries => !r.supportsRemoteDataCall case _ => true }) - rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(lp.startMs, lp.stepMs, lp.endMs, - window = None, functionId = None, qContext, stepMultipleNotationUsed = false, funcParams = Nil, + rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realScanStartMs, realScanStepMs, + realScanEndMs, window = None, 
functionId = None, qContext, stepMultipleNotationUsed = false, funcParams = Nil, lp.offsetMs, rawSource = rawSource))) if (nameFilter.isDefined && nameFilter.head.endsWith("_bucket") && leFilter.isDefined) { - val paramsExec = StaticFuncArgs(leFilter.head.toDouble, RangeParams(lp.startMs / 1000, lp.stepMs / 1000, - lp.endMs / 1000)) + val paramsExec = StaticFuncArgs(leFilter.head.toDouble, RangeParams(realScanStartMs / 1000, realScanStepMs / 1000, + realScanEndMs / 1000)) rawSeries.plans.foreach(_.addRangeVectorTransformer(InstantVectorFunctionMapper(HistogramBucket, Seq(paramsExec)))) } + // repeat the same value for each step if '@' is specified + if (lp.atMs.nonEmpty) { + rawSeries.plans.foreach(p => p.addRangeVectorTransformer(RepeatTransformer(lp.startMs, lp.stepMs, lp.endMs, + p.queryWithPlanName(qContext)))) + } rawSeries } - def materializeApplyInstantFunction(qContext: QueryContext, + protected def getAtModifierTimestampsWithOffset(periodicSeriesPlan: PeriodicSeriesPlan): Seq[Long] = { + periodicSeriesPlan match { + case ps: PeriodicSeries => ps.atMs.map(at => at - ps.offsetMs.getOrElse(0L)).toSeq + case sww: SubqueryWithWindowing => sww.atMs.map(at => at - sww.offsetMs.getOrElse(0L)).toSeq + case psw: PeriodicSeriesWithWindowing => psw.atMs.map(at => at - psw.offsetMs.getOrElse(0L)).toSeq + case ts: TopLevelSubquery => ts.atMs.map(at => at - ts.originalOffsetMs.getOrElse(0L)).toSeq + case bj: BinaryJoin => getAtModifierTimestampsWithOffset(bj.lhs) ++ getAtModifierTimestampsWithOffset(bj.rhs) + case agg: Aggregate => getAtModifierTimestampsWithOffset(agg.vectors) + case aif: ApplyInstantFunction => getAtModifierTimestampsWithOffset(aif.vectors) + case amf: ApplyMiscellaneousFunction => getAtModifierTimestampsWithOffset(amf.vectors) + case asf: ApplySortFunction => getAtModifierTimestampsWithOffset(asf.vectors) + case aaf: ApplyAbsentFunction => getAtModifierTimestampsWithOffset(aaf.vectors) + case alf: ApplyLimitFunction => getAtModifierTimestampsWithOffset(alf.vectors) + case _: RawChunkMeta | _: ScalarBinaryOperation | _: ScalarFixedDoublePlan | _: ScalarTimeBasedPlan | + _: ScalarVaryingDoublePlan | _: ScalarVectorBinaryOperation | _: VectorPlan => Seq() + } + } + + + def materializeApplyInstantFunction(qContext: QueryContext, lp: ApplyInstantFunction, forceInProcess: Boolean = false): PlanResult = { val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LogicalPlanUtils.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LogicalPlanUtils.scala index d5ed505bf8..79b9f39bd5 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LogicalPlanUtils.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LogicalPlanUtils.scala @@ -253,6 +253,7 @@ object LogicalPlanUtils extends StrictLogging { // is an interval that we want to process on one machine (primarily to compute range functions). // SubqueryWithWindowing has such a lookback while TopLevelSubquery does not. 
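The DefaultPlanner changes above handle the '@' modifier by collapsing the scan range to that single instant (start = end = atMs, step = 0) and then appending a RepeatTransformer so the one evaluated value is re-emitted at every step of the original start/step/end range. A minimal, self-contained sketch of that arithmetic (illustration only, not the FiloDB API; both helper names are hypothetical):

    // Collapse the evaluation range to the '@' instant when present; otherwise keep the query range.
    def scanParams(atMs: Option[Long], startMs: Long, stepMs: Long, endMs: Long): (Long, Long, Long) =
      atMs.map(at => (at, 0L, at)).getOrElse((startMs, stepMs, endMs))

    // Fan the single evaluated sample back out over every step of the original (non-zero step) range,
    // mimicking what the added RepeatTransformer does to the PeriodicSamplesMapper output.
    def repeatAcrossSteps(value: Double, startMs: Long, stepMs: Long, endMs: Long): Seq[(Long, Double)] =
      (startMs to endMs by stepMs).map(ts => ts -> value)

For example, a range query from 1000 to 2000 with step 100 for foo @ 3000 is evaluated once at t=3000 and that sample is repeated at 1000, 1100, ..., 2000, matching Prometheus semantics for '@'.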
logicalPlan match { + case psw: PeriodicSeriesWithWindowing => Seq(psw.window) case sww: SubqueryWithWindowing => getLookBackMillis(sww.innerPeriodicSeries).map(lb => lb + sww.subqueryWindowMs) case nl: NonLeafLogicalPlan => nl.children.flatMap(getLookBackMillis) case rs: RawSeries => Seq(rs.lookbackMs.getOrElse(WindowConstants.staleDataLookbackMillis)) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala index e09f21d032..0397c4ddd5 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala @@ -47,25 +47,23 @@ import filodb.query.exec._ PlanResult(Seq(execPlan)) } - private def getAtModifierTimestampsWithOffset(periodicSeriesPlan: PeriodicSeriesPlan): Seq[Long] = { - periodicSeriesPlan match { - case ps: PeriodicSeries => ps.atMs.map(at => at - ps.offsetMs.getOrElse(0L)).toSeq - case sww: SubqueryWithWindowing => sww.atMs.map(at => at - sww.offsetMs.getOrElse(0L)).toSeq - case psw: PeriodicSeriesWithWindowing => psw.atMs.map(at => at - psw.offsetMs.getOrElse(0L)).toSeq - case ts: TopLevelSubquery => ts.atMs.map(at => at - ts.originalOffsetMs.getOrElse(0L)).toSeq - case bj: BinaryJoin => getAtModifierTimestampsWithOffset(bj.lhs) ++ getAtModifierTimestampsWithOffset(bj.rhs) - case agg: Aggregate => getAtModifierTimestampsWithOffset(agg.vectors) - case aif: ApplyInstantFunction => getAtModifierTimestampsWithOffset(aif.vectors) - case amf: ApplyMiscellaneousFunction => getAtModifierTimestampsWithOffset(amf.vectors) - case asf: ApplySortFunction => getAtModifierTimestampsWithOffset(asf.vectors) - case aaf: ApplyAbsentFunction => getAtModifierTimestampsWithOffset(aaf.vectors) - case alf: ApplyLimitFunction => getAtModifierTimestampsWithOffset(alf.vectors) - case _: RawChunkMeta | _: ScalarBinaryOperation | _: ScalarFixedDoublePlan | _: ScalarTimeBasedPlan| - _: ScalarVaryingDoublePlan | _: ScalarVectorBinaryOperation | _: VectorPlan => Seq() + private def isPlanLongTimeAtModifier(plan: LogicalPlan): Boolean = { + plan match { + case periodicSeriesPlan: PeriodicSeriesPlan if periodicSeriesPlan.hasAtModifier => + val earliestRawTime = earliestRawTimestampFn + val offsetMillis = LogicalPlanUtils.getOffsetMillis(periodicSeriesPlan) + val (maxOffset, minOffset) = (offsetMillis.max, offsetMillis.min) + val lookbackMs = LogicalPlanUtils.getLookBackMillis(periodicSeriesPlan).max + val latestDownsampleTimestamp = latestDownsampleTimestampFn + val atMsList = getAtModifierTimestampsWithOffset(periodicSeriesPlan) + val realStartMs = Math.min(periodicSeriesPlan.startMs - maxOffset, atMsList.min - lookbackMs) + val realEndMs = Math.max(periodicSeriesPlan.endMs - minOffset, atMsList.max) + + realStartMs <= latestDownsampleTimestamp && realEndMs >= earliestRawTime + case _ => false } } - // scalastyle:off method.length private def materializeRoutablePlan(qContext: QueryContext, periodicSeriesPlan: PeriodicSeriesPlan): ExecPlan = { import LogicalPlan._ @@ -74,23 +72,25 @@ import filodb.query.exec._ val (maxOffset, minOffset) = (offsetMillis.max, offsetMillis.min) val lookbackMs = LogicalPlanUtils.getLookBackMillis(periodicSeriesPlan).max - - val startWithOffsetMs = periodicSeriesPlan.startMs - maxOffset - // For scalar binary operation queries like sum(rate(foo{job = "app"}[5m] offset 8d)) * 0.5 - val endWithOffsetMs = periodicSeriesPlan.endMs - 
minOffset val atModifierTimestampsWithOffset = getAtModifierTimestampsWithOffset(periodicSeriesPlan) + val startWithOffsetMs = if (periodicSeriesPlan.hasAtModifier) atModifierTimestampsWithOffset.min + else periodicSeriesPlan.startMs - maxOffset + // For scalar binary operation queries like sum(rate(foo{job = "app"}[5m] offset 8d)) * 0.5 + val endWithOffsetMs = + if (periodicSeriesPlan.hasAtModifier) atModifierTimestampsWithOffset.max else periodicSeriesPlan.endMs - minOffset - val isAtModifierValid = if (startWithOffsetMs - lookbackMs >= earliestRawTime) { - // should be in raw cluster. - atModifierTimestampsWithOffset.forall(at => at - lookbackMs >= earliestRawTime) - } else if (endWithOffsetMs - lookbackMs < earliestRawTime) { - // should be in down sample cluster. - atModifierTimestampsWithOffset.forall(at => at - lookbackMs < earliestRawTime) - } else { - atModifierTimestampsWithOffset.isEmpty - } - require(isAtModifierValid, s"@modifier $atModifierTimestampsWithOffset is not supported. Because it queries" + - s"both down sampled and raw data. Please adjust the query parameters if you want to use @modifier.") if (maxOffset != minOffset && startWithOffsetMs - lookbackMs < earliestRawTime @@ -119,7 +119,8 @@ import filodb.query.exec._ // should check for ANY interval overalapping earliestRawTime. We // can happen with ANY lookback interval, not just the last one. 
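The isPlanLongTimeAtModifier check introduced above decides whether an '@' query touches both the raw and the downsample cluster; when it does, walkLogicalPlanTree now materializes the surrounding Aggregate or BinaryJoin with forceInProcess = true instead of failing the old require(...). A sketch of the boundary test, under the assumption that the list of '@' timestamps is non-empty (the planner only evaluates it when hasAtModifier is true); the two threshold parameters stand in for earliestRawTimestampFn and latestDownsampleTimestampFn:

    // True when the plan's effective range reaches into the downsample window
    // (at or before latestDownsampleTimeMs) and into the raw window (at or after earliestRawTimeMs).
    def spansBothClusters(startMs: Long, endMs: Long, maxOffsetMs: Long, minOffsetMs: Long,
                          lookbackMs: Long, atTimesMs: Seq[Long],
                          earliestRawTimeMs: Long, latestDownsampleTimeMs: Long): Boolean = {
      val realStartMs = math.min(startMs - maxOffsetMs, atTimesMs.min - lookbackMs)
      val realEndMs = math.max(endMs - minOffsetMs, atTimesMs.max)
      realStartMs <= latestDownsampleTimeMs && realEndMs >= earliestRawTimeMs
    }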
} else if ( - endWithOffsetMs - lookbackMs < earliestRawTime || //TODO lookbacks can overlap in the middle intervals too + (endWithOffsetMs - lookbackMs < earliestRawTime && !periodicSeriesPlan.hasAtModifier) + || //TODO lookbacks can overlap in the middle intervals too LogicalPlan.hasSubqueryWithWindowing(periodicSeriesPlan) ) { // For subqueries and long lookback queries, we keep things simple by routing to @@ -245,8 +246,8 @@ import filodb.query.exec._ override def walkLogicalPlanTree(logicalPlan: LogicalPlan, qContext: QueryContext, forceInProcess: Boolean = false): PlanResult = { - - if (!LogicalPlanUtils.hasBinaryJoin(logicalPlan)) { + val isLongTimeAtModifier = isPlanLongTimeAtModifier(logicalPlan) + if (!LogicalPlanUtils.hasBinaryJoin(logicalPlan) && !isLongTimeAtModifier) { logicalPlan match { case p: PeriodicSeriesPlan => materializePeriodicSeriesPlan(qContext, p) case lc: LabelCardinality => materializeLabelCardinalityPlan(lc, qContext) @@ -266,8 +267,10 @@ import filodb.query.exec._ case lp: PeriodicSeriesWithWindowing => materializePeriodicSeriesPlan(qContext, lp) case lp: ApplyInstantFunction => super.materializeApplyInstantFunction(qContext, lp) case lp: ApplyInstantFunctionRaw => super.materializeApplyInstantFunctionRaw(qContext, lp) - case lp: Aggregate => materializePeriodicSeriesPlan(qContext, lp) - case lp: BinaryJoin => materializePeriodicSeriesPlan(qContext, lp) + case lp: Aggregate if isLongTimeAtModifier => super.materializeAggregate(qContext, lp, forceInProcess = true) + case lp: Aggregate if !isLongTimeAtModifier => materializePeriodicSeriesPlan(qContext, lp) + case lp: BinaryJoin if isLongTimeAtModifier => super.materializeBinaryJoin(qContext, lp, forceInProcess = true) + case lp: BinaryJoin if !isLongTimeAtModifier => materializePeriodicSeriesPlan(qContext, lp) case lp: ScalarVectorBinaryOperation => super.materializeScalarVectorBinOp(qContext, lp) case lp: LabelValues => rawClusterMaterialize(qContext, lp) case lp: TsCardinalities => materializeTSCardinalityPlan(qContext, lp) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala index 94a56d8269..a1f14be5f9 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala @@ -158,6 +158,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv } ) ) + case _: RawSeries => materializePeriodicAndRawSeries(logicalPlan, qContext) case _ : LogicalPlan => super.defaultWalkLogicalPlanTree(logicalPlan, qContext, forceInProcess) } } else { @@ -352,7 +353,16 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv val stepMs = queryParams.stepSecs * 1000 val isInstantQuery: Boolean = if (queryParams.startSecs == queryParams.endSecs) true else false var prevPartitionStart = queryParams.startSecs * 1000 - val execPlans = partitions.zipWithIndex.map { case (p, i) => + val execPlans = partitions + .filter(p => { + if (!logicalPlan.hasAtModifier || !logicalPlan.isInstanceOf[PeriodicSeriesPlan]) + true + else { + val time = getAtModifierTimestampsWithOffset(logicalPlan.asInstanceOf[PeriodicSeriesPlan]).head + p.timeRange.startMs <= time && p.timeRange.endMs > time + } + }) + .zipWithIndex.map { case (p, i) => // First partition should start from query start time, no need to calculate time according to step for 
instant // queries val startMs = @@ -400,12 +410,14 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv timeRangeOverride: Option[TimeRange] = None): ExecPlan = { val qContextWithOverride = timeRangeOverride.map{ r => val oldParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams] - val newParams = oldParams.copy(startSecs = r.startMs / 1000, endSecs = r.endMs / 1000) + val newParams = oldParams.copy(startSecs = r.startMs / 1000, endSecs = r.endMs / 1000, + promQl = if (logicalPlan.hasAtModifier) LogicalPlanParser.convertToQuery(logicalPlan) else oldParams.promQl) queryContext.copy(origQueryParams = newParams) }.getOrElse(queryContext) val queryParams = qContextWithOverride.origQueryParams.asInstanceOf[PromQlQueryParams] val timeRange = timeRangeOverride.getOrElse(TimeRange(1000 * queryParams.startSecs, 1000 * queryParams.endSecs)) val (partitionName, grpcEndpoint) = (partition.partitionName, partition.grpcEndPoint) + if (partitionName.equals(localPartitionName)) { // FIXME: the below check is needed because subquery tests fail when their // time-ranges are updated even with the original query params. @@ -549,22 +561,29 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv .filter(_.isInstanceOf[RawSeries]) .map(getPartitions(_, qParams)) .exists(_.size > 1) - if (hasMultiPartitionLeaves) { + if (hasMultiPartitionLeaves && !logicalPlan.hasAtModifier) { materializeSplitLeafPlan(logicalPlan, qContext) - } else { logicalPlan match { - case agg: Aggregate => materializeAggregate(agg, qContext) - case psw: PeriodicSeriesWithWindowing => materializePeriodicAndRawSeries(psw, qContext) - case sqw: SubqueryWithWindowing => super.materializeSubqueryWithWindowing(qContext, sqw) - case bj: BinaryJoin => materializeMultiPartitionBinaryJoinNoSplitLeaf(bj, qContext) - case sv: ScalarVectorBinaryOperation => super.materializeScalarVectorBinOp(qContext, sv) - case aif: ApplyInstantFunction => super.materializeApplyInstantFunction(qContext, aif) - case svdp: ScalarVaryingDoublePlan => super.materializeScalarPlan(qContext, svdp) - case aaf: ApplyAbsentFunction => super.materializeAbsentFunction(qContext, aaf) - case ps: PeriodicSeries => materializePeriodicAndRawSeries(ps, qContext) - case rcm: RawChunkMeta => materializePeriodicAndRawSeries(rcm, qContext) - case rs: RawSeries => materializePeriodicAndRawSeries(rs, qContext) - case x => throw new IllegalArgumentException(s"unhandled type: ${x.getClass}") - }} + } else { + // either have @modifier or only one partition. 
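In the MultiPartitionPlanner changes above, a periodic-series leaf with an '@' modifier is no longer fanned out to every partition overlapping the query range: the partition list is filtered to the assignment(s) whose time range contains the '@' evaluation instant (offset already subtracted), and the forwarded PromQL is regenerated via LogicalPlanParser.convertToQuery so the '@' clause travels with the remote query. A sketch of that filter predicate; TimeRangeLike is a hypothetical stand-in for the partition assignment's time range type:

    // Keep a partition only if its time range covers the '@' instant; the start bound is
    // inclusive and the end bound exclusive, matching the filter added above.
    final case class TimeRangeLike(startMs: Long, endMs: Long)

    def servesAtInstant(partitionRange: TimeRangeLike, atMinusOffsetMs: Long): Boolean =
      partitionRange.startMs <= atMinusOffsetMs && partitionRange.endMs > atMinusOffsetMs

With a single '@' timestamp and non-overlapping assignments this selects at most one partition per leaf.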
+ val canSplit = hasMultiPartitionLeaves && logicalPlan.hasAtModifier + logicalPlan match { + case agg: Aggregate => materializeAggregate(agg, qContext) + case psw: PeriodicSeriesWithWindowing if canSplit => + super.materializePeriodicSeriesWithWindowing(qContext, psw, forceInProcess = true) + case psw: PeriodicSeriesWithWindowing if !canSplit => materializePeriodicAndRawSeries(psw, qContext) + case sqw: SubqueryWithWindowing => super.materializeSubqueryWithWindowing(qContext, sqw) + case bj: BinaryJoin => materializeMultiPartitionBinaryJoinNoSplitLeaf(bj, qContext) + case sv: ScalarVectorBinaryOperation => super.materializeScalarVectorBinOp(qContext, sv) + case aif: ApplyInstantFunction => super.materializeApplyInstantFunction(qContext, aif) + case svdp: ScalarVaryingDoublePlan => super.materializeScalarPlan(qContext, svdp) + case aaf: ApplyAbsentFunction => super.materializeAbsentFunction(qContext, aaf) + case ps: PeriodicSeries if canSplit => super.materializePeriodicSeries(qContext, ps, forceInProcess = true) + case ps: PeriodicSeries if !canSplit => materializePeriodicAndRawSeries(ps, qContext) + case rcm: RawChunkMeta => materializePeriodicAndRawSeries(rcm, qContext) + case rs: RawSeries => materializePeriodicAndRawSeries(rs, qContext) + case x => throw new IllegalArgumentException(s"unhandled type: ${x.getClass}") + } + } } /** diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala index 75cd5e851e..3b9376ad13 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala @@ -231,14 +231,15 @@ class SingleClusterPlanner(val dataset: Dataset, //For binary join LHS & RHS should have same times val boundParams = periodicSeriesPlan.get.head match { case p: PeriodicSeries => (p.startMs, p.stepMs, WindowConstants.staleDataLookbackMillis, - p.offsetMs.getOrElse(0L), p.atMs.getOrElse(0), p.endMs) + p.offsetMs.getOrElse(0L), p.atMs.getOrElse(-1L), p.endMs) - case w: PeriodicSeriesWithWindowing => (w.startMs, w.stepMs, w.window, w.offsetMs.getOrElse(0L), w.atMs.getOrElse(0L), w.endMs) + case w: PeriodicSeriesWithWindowing => (w.startMs, w.stepMs, w.window, w.offsetMs.getOrElse(0L), w.atMs.getOrElse(-1L), w.endMs) case _ => throw new UnsupportedOperationException(s"Invalid plan: ${periodicSeriesPlan.get.head}") } - val newStartMs = boundToStartTimeToEarliestRetained(boundParams._1, boundParams._2, boundParams._3, - boundParams._4) + val newStartMs = if (boundParams._5 == -1L) { + boundToStartTimeToEarliestRetained(boundParams._1, boundParams._2, boundParams._3, boundParams._4) + } else boundParams._1 if (newStartMs <= boundParams._6) { // if there is an overlap between query and retention ranges if (newStartMs != boundParams._1) Some(LogicalPlanUtils.copyLogicalPlanWithUpdatedSeconds( diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala index 57897657c0..bd63945759 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala @@ -112,6 +112,87 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida validatePlan(execPlan, expectedPlanTree) } + it("at modifier should work for multi-partition") { + def twoPartitions(timeRange: TimeRange): List[PartitionAssignment] = 
List( + PartitionAssignment("remote", "remote-url", TimeRange(startSeconds * 1000 - lookbackMs, + localPartitionStart * 1000 - 1)), PartitionAssignment("remote2", "remote-url2", + TimeRange(localPartitionStart * 1000, endSeconds * 1000))) + + val partitionLocationProvider = new PartitionLocationProvider { + override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = { + if (routingKey.equals(Map("job" -> "app"))) twoPartitions(timeRange) + else Nil + } + + override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = twoPartitions(timeRange) + + } + val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig) + val query = "last_over_time(test{job = \"app\"}[10m]) unless on(job) test{job = \"app\"} @end()" + + val lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(startSeconds, step, endSeconds)) + + val promQlQueryParams = PromQlQueryParams(query, startSeconds, step, endSeconds) + + val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams = + PlannerParams(processMultiPartition = true))) + val expectedPlanTree = + """E~SetOperatorExec(binaryOp=LUnless, on=Some(List(job)), ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |--E~PromQlRemoteExec(PromQlQueryParams(last_over_time(test{job="app"}[600s]),1000,100,2999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |--T~PeriodicSamplesMapper(start=3000000, step=100000, end=3899000, window=Some(600000), functionId=Some(Last), rawSource=false, offsetMs=None) + |---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |----E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[1500s],3899,1,3899,None,false), 
PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |----E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[1500s],3899,1,3899,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |--E~PromQlRemoteExec(PromQlQueryParams(last_over_time(test{job="app"}[600s]),3900,100,10000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-T~RepeatTransformer(startMs=1000000, stepMs=100000, endMs=10000000, funcParams=List()) + |--T~PeriodicSamplesMapper(start=10000000, step=0, end=10000000, window=None, functionId=None, rawSource=true, offsetMs=None) + |---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |----E~PromQlRemoteExec(PromQlQueryParams(test{job="app"} @10000,1000,100,2999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |----E~PromQlRemoteExec(PromQlQueryParams(test{job="app"} 
@10000,3400,100,10000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000)))""".stripMargin + validatePlan(execPlan, expectedPlanTree) + } + + it("StitchRvsExec should be the top level for multi-partition query when it has no @modifier") { + def twoPartitions(timeRange: TimeRange): List[PartitionAssignment] = List( + PartitionAssignment("remote", "remote-url", TimeRange(startSeconds * 1000 - lookbackMs, + localPartitionStart * 1000 - 1)), PartitionAssignment("remote2", "remote-url2", + TimeRange(localPartitionStart * 1000, endSeconds * 1000))) + + val partitionLocationProvider = new PartitionLocationProvider { + override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = { + if (routingKey.equals(Map("job" -> "app"))) twoPartitions(timeRange) + else Nil + } + + override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = twoPartitions(timeRange) + + } + val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig) + val query = "last_over_time(test{job = \"app\"}[10m]) unless on(job) test{job = \"app\"}" + + val lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(startSeconds, step, endSeconds)) + + val promQlQueryParams = PromQlQueryParams(query, startSeconds, step, endSeconds) + + val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams = + PlannerParams(processMultiPartition = true))) + val expectedPlanTree = + """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-E~PromQlRemoteExec(PromQlQueryParams(last_over_time(test{job = "app"}[10m]) unless on(job) test{job = "app"},1000,100,2999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-E~SetOperatorExec(binaryOp=LUnless, on=Some(List(job)), ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 
seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |--T~PeriodicSamplesMapper(start=3000000, step=100000, end=3899000, window=Some(600000), functionId=Some(Last), rawSource=false, offsetMs=None) + |---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |----E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[1500s],3899,1,3899,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |----E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[1500s],3899,1,3899,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |--T~PeriodicSamplesMapper(start=3000000, step=100000, end=3899000, window=None, functionId=None, rawSource=false, offsetMs=None) + |---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |----E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[1200s],3899,1,3899,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |----E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[1200s],3899,1,3899,None,false), 
PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-E~PromQlRemoteExec(PromQlQueryParams(last_over_time(test{job = "app"}[10m]) unless on(job) test{job = "app"},3900,100,10000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000)))""".stripMargin + validatePlan(execPlan, expectedPlanTree) + } + it ("should generate simple plan for one local partition for TopLevelSubquery") { val p1StartSecs = 1000 val p1EndSecs = 12000 diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala index a3abefffe7..0c8c564e59 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala @@ -2060,21 +2060,22 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS TimeStepParams(now / 1000 - 5.days.toSeconds, step, endSeconds), Antlr) val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) val expectedPlan = - """E~SetOperatorExec(binaryOp=LAND, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw) - |-T~PeriodicSamplesMapper(start=1634345330000, step=300000, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |--E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634345270000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw) - |-T~PeriodicSamplesMapper(start=1634345330000, step=300000, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |--E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634345270000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw) - 
|-T~AggregatePresenter(aggrOp=TopK, aggrParams=List(1.0), rangeParams=RangeParams(1634345330,300,1634777330)) - |--E~LocalPartitionReduceAggregateExec(aggrOp=TopK, aggrParams=List(1.0)) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw) - |---T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List()) - |----T~RepeatTransformer(startMs=1634345330000, stepMs=300000, endMs=1634777330000, funcParams=List()) - |-----T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634777270000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw) - |---T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List()) - |----T~RepeatTransformer(startMs=1634345330000, stepMs=300000, endMs=1634777330000, funcParams=List()) - |-----T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634777270000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],raw)""".stripMargin + """E~SetOperatorExec(binaryOp=LAND, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1630780616],raw) + |-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1630780616],raw) + |--T~PeriodicSamplesMapper(start=1634345330000, step=300000, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634345270000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1630780616],raw) + |--T~PeriodicSamplesMapper(start=1634345330000, step=300000, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634345270000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1630780616],raw) + |-T~AggregatePresenter(aggrOp=TopK, aggrParams=List(1.0), rangeParams=RangeParams(1634345330,300,1634777330)) + |--E~LocalPartitionReduceAggregateExec(aggrOp=TopK, aggrParams=List(1.0)) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1630780616],raw) + |---T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List()) + |----E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1630780616],raw) + 
|-----T~RepeatTransformer(startMs=1634345330000, stepMs=300000, endMs=1634777330000, funcParams=List()) + |------T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634777270000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1630780616],raw) + |-----T~RepeatTransformer(startMs=1634345330000, stepMs=300000, endMs=1634777330000, funcParams=List()) + |------T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634777270000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1630780616],raw)""".stripMargin validatePlan(execPlan, expectedPlan) } @@ -2085,74 +2086,138 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS TimeStepParams(startSeconds, step, now / 1000 - 9.days.toSeconds), Antlr) val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) val expectedPlan = - """E~SetOperatorExec(binaryOp=LAND, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample) + """E~SetOperatorExec(binaryOp=LAND, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-190763356],downsample) |-T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1633999730000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |--E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913270000,1633999730000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample) + |--E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913270000,1633999730000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-190763356],downsample) |-T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1633999730000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |--E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913270000,1633999730000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample) + |--E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913270000,1633999730000), 
filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-190763356],downsample) |-T~AggregatePresenter(aggrOp=TopK, aggrParams=List(1.0), rangeParams=RangeParams(1633913330,300,1633999730)) - |--E~LocalPartitionReduceAggregateExec(aggrOp=TopK, aggrParams=List(1.0)) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample) + |--E~LocalPartitionReduceAggregateExec(aggrOp=TopK, aggrParams=List(1.0)) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-190763356],downsample) |---T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List()) |----T~RepeatTransformer(startMs=1633913330000, stepMs=300000, endMs=1633999730000, funcParams=List()) |-----T~PeriodicSamplesMapper(start=1633999730000, step=0, end=1633999730000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633999670000,1633999730000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633999670000,1633999730000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-190763356],downsample) |---T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List()) |----T~RepeatTransformer(startMs=1633913330000, stepMs=300000, endMs=1633999730000, funcParams=List()) |-----T~PeriodicSamplesMapper(start=1633999730000, step=0, end=1633999730000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633999670000,1633999730000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testActor],downsample)""".stripMargin + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633999670000,1633999730000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-190763356],downsample)""".stripMargin validatePlan(execPlan, expectedPlan) } - it("should thrown IllegalArgumentException due to the need of both raw and downsample cluster with @modifier") { + it("should work for both raw and downsample cluster with @modifier") { val lp = Parser.queryRangeToLogicalPlan( """rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m]) AND | topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] @end()))""".stripMargin, TimeStepParams(startSeconds, step, endSeconds), Antlr) - val thrown = the[IllegalArgumentException] thrownBy - 
rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) - thrown.toString - .contains("both down sampled and raw data. Please adjust the query parameters if you want to use @modifier") shouldEqual true + + val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) + val expectedPlan = + """E~SetOperatorExec(binaryOp=LAND, on=None, ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0))) + |-E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000))) + |--E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#982803728],raw) + |---T~PeriodicSamplesMapper(start=1634172830000, step=300000, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172770000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#982803728],raw) + |---T~PeriodicSamplesMapper(start=1634172830000, step=300000, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172770000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#982803728],raw) + |--E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#982803728],downsample) + |---T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172530000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913270000,1634172530000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#982803728],downsample) + |---T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172530000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913270000,1634172530000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#982803728],downsample) + |-T~AggregatePresenter(aggrOp=TopK, aggrParams=List(1.0), rangeParams=RangeParams(1633913330,300,1634777330)) + 
|--E~LocalPartitionReduceAggregateExec(aggrOp=TopK, aggrParams=List(1.0)) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#982803728],raw) + |---T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List()) + |----E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#982803728],raw) + |-----T~RepeatTransformer(startMs=1633913330000, stepMs=300000, endMs=1634777330000, funcParams=List()) + |------T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634777270000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#982803728],raw) + |-----T~RepeatTransformer(startMs=1633913330000, stepMs=300000, endMs=1634777330000, funcParams=List()) + |------T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634777270000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#982803728],raw)""".stripMargin + validatePlan(execPlan, expectedPlan) } - it("should thrown IllegalArgumentException because topk needs both raw and downsample cluster with @modifier") { + it("should work for longtime planner when topk needs both raw and downsample cluster with @modifier") { val lp = Parser.queryRangeToLogicalPlan( """topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] @end()))""".stripMargin, TimeStepParams(now / 1000 - 8.days.toSeconds, step, now / 1000), Antlr) - val thrown = the[IllegalArgumentException] thrownBy - rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) - thrown.toString - .contains("both down sampled and raw data. 
Please adjust the query parameters if you want to use @modifier") shouldEqual true + val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) + val expectedPlan = + """T~AggregatePresenter(aggrOp=TopK, aggrParams=List(1.0), rangeParams=RangeParams(1634086130,300,1634777330)) + |-E~LocalPartitionReduceAggregateExec(aggrOp=TopK, aggrParams=List(1.0)) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-596901980],raw) + |--T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List()) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-596901980],raw) + |----T~RepeatTransformer(startMs=1634086130000, stepMs=300000, endMs=1634777330000, funcParams=List()) + |-----T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634777270000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-596901980],raw) + |----T~RepeatTransformer(startMs=1634086130000, stepMs=300000, endMs=1634777330000, funcParams=List()) + |-----T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634777270000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-596901980],raw)""".stripMargin + validatePlan(execPlan, expectedPlan) } - it("should thrown IllegalArgumentException because @modifier and offset reads from downsample cluster, and the query range reads from raw cluster") { + it("should work for longtime planner when @modifier and offset reads from downsample cluster, and the query range reads from raw cluster") { val lp = Parser.queryRangeToLogicalPlan( """rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m]) AND | topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] offset 8d @end()))""".stripMargin, TimeStepParams(now / 1000 - 1.days.toSeconds, step, now / 1000), Antlr) - val thrown = the[IllegalArgumentException] thrownBy - rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) - thrown.toString - .contains("both down sampled and raw data. 
Please adjust the query parameters if you want to use @modifier") shouldEqual true + val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) + val expectedPlan = + """E~SetOperatorExec(binaryOp=LAND, on=None, ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0))) + |-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-894961790],raw) + |--T~PeriodicSamplesMapper(start=1634690930000, step=300000, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634690870000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-894961790],raw) + |--T~PeriodicSamplesMapper(start=1634690930000, step=300000, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634690870000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-894961790],raw) + |-T~AggregatePresenter(aggrOp=TopK, aggrParams=List(1.0), rangeParams=RangeParams(1634690930,300,1634777330)) + |--E~LocalPartitionReduceAggregateExec(aggrOp=TopK, aggrParams=List(1.0)) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-894961790],downsample) + |---T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List()) + |----T~RepeatTransformer(startMs=1634690930000, stepMs=300000, endMs=1634777330000, funcParams=List()) + |-----T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=Some(691200000)) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634086070000,1634086130000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-894961790],downsample) + |---T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List()) + |----T~RepeatTransformer(startMs=1634690930000, stepMs=300000, endMs=1634777330000, funcParams=List()) + |-----T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=Some(691200000)) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634086070000,1634086130000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on 
ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-894961790],downsample)""".stripMargin + validatePlan(execPlan, expectedPlan) } - it("should thrown IllegalArgumentException because while @modifier needs data from downsample cluster, the plan is dispatched to raw cluster") { + it("should work for longtime planner when @modifier needs data from downsample cluster, the plan is dispatched to downsample cluster") { val lp = Parser.queryRangeToLogicalPlan( s"""topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] @${now / 1000 - 8.days.toSeconds}))""".stripMargin, TimeStepParams(now / 1000 - 1.days.toSeconds, step, now / 1000), Antlr) - val thrown = the[IllegalArgumentException] thrownBy - rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) - thrown.toString - .contains("both down sampled and raw data. Please adjust the query parameters if you want to use @modifier") shouldEqual true + val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) + val expectedPlan = + """T~AggregatePresenter(aggrOp=TopK, aggrParams=List(1.0), rangeParams=RangeParams(1634690930,300,1634777330)) + |-E~LocalPartitionReduceAggregateExec(aggrOp=TopK, aggrParams=List(1.0)) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#577219851],downsample) + |--T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List()) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#577219851],downsample) + |----T~RepeatTransformer(startMs=1634690930000, stepMs=300000, endMs=1634777330000, funcParams=List()) + |-----T~PeriodicSamplesMapper(start=1634086130000, step=0, end=1634086130000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634086070000,1634086130000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#577219851],downsample) + |----T~RepeatTransformer(startMs=1634690930000, stepMs=300000, endMs=1634777330000, funcParams=List()) + |-----T~PeriodicSamplesMapper(start=1634086130000, step=0, end=1634086130000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634086070000,1634086130000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#577219851],downsample)""".stripMargin + validatePlan(execPlan, expectedPlan) } - it("should thrown IllegalArgumentException because while @modifier needs data from raw cluster, the plan is dispatched to downsample cluster") { + it("should work for longtime planner when @modifier needs data from raw cluster, the plan is dispatched to raw cluster") { val lp = Parser.queryRangeToLogicalPlan( s"""topk(1, rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[1m] @${now / 1000}))""".stripMargin, TimeStepParams(now / 1000 - 9.days.toSeconds, step, now / 1000 - 8.days.toSeconds), Antlr) - val thrown = the[IllegalArgumentException] thrownBy - rootPlanner.materialize(lp, QueryContext(origQueryParams = 
queryParams)) - thrown.toString - .contains("both down sampled and raw data. Please adjust the query parameters if you want to use @modifier") shouldEqual true + val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) + val expectedPlan = + """T~AggregatePresenter(aggrOp=TopK, aggrParams=List(1.0), rangeParams=RangeParams(1633999730,300,1634086130)) + |-E~LocalPartitionReduceAggregateExec(aggrOp=TopK, aggrParams=List(1.0)) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1598010777],raw) + |--T~AggregateMapReduce(aggrOp=TopK, aggrParams=List(1.0), without=List(), by=List()) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1598010777],raw) + |----T~RepeatTransformer(startMs=1633999730000, stepMs=300000, endMs=1634086130000, funcParams=List()) + |-----T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634777270000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1598010777],raw) + |----T~RepeatTransformer(startMs=1633999730000, stepMs=300000, endMs=1634086130000, funcParams=List()) + |-----T~PeriodicSamplesMapper(start=1634777330000, step=0, end=1634777330000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634777270000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1598010777],raw)""".stripMargin + validatePlan(execPlan, expectedPlan) } it("both modifier and query range require the data from downsample cluster.") { diff --git a/query/src/main/scala/filodb/query/LogicalPlan.scala b/query/src/main/scala/filodb/query/LogicalPlan.scala index 1962fdb2d8..5b2e22923a 100644 --- a/query/src/main/scala/filodb/query/LogicalPlan.scala +++ b/query/src/main/scala/filodb/query/LogicalPlan.scala @@ -20,6 +20,8 @@ sealed trait LogicalPlan { */ def isTimeSplittable: Boolean = true + def hasAtModifier: Boolean = false + /** * Replace filters present in logical plan */ @@ -48,6 +50,10 @@ sealed trait RawSeriesLikePlan extends LogicalPlan { sealed trait NonLeafLogicalPlan extends LogicalPlan { def children: Seq[LogicalPlan] + + override def hasAtModifier: Boolean = { + children.exists(_.hasAtModifier) + } } /** @@ -261,6 +267,10 @@ case class PeriodicSeries(rawSeries: RawSeriesLikePlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(rawSeries = rawSeries.replaceRawSeriesFilters(filters)) + + override def hasAtModifier: Boolean = { + atMs.nonEmpty + } } /** @@ -323,6 +333,9 @@ case class SubqueryWithWindowing( val updatedFunctionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan]) this.copy(innerPeriodicSeries = updatedInnerPeriodicSeries, functionArgs = updatedFunctionArgs) } + override def hasAtModifier: Boolean = { + atMs.nonEmpty + } } /** @@ -361,6 +374,10 @@ case class 
TopLevelSubquery( val updatedInnerPeriodicSeries = innerPeriodicSeries.replacePeriodicSeriesFilters(filters) this.copy(innerPeriodicSeries = updatedInnerPeriodicSeries) } + + override def hasAtModifier: Boolean = { + atMs.nonEmpty || children.exists(_.hasAtModifier) + } } /** @@ -390,6 +407,10 @@ case class PeriodicSeriesWithWindowing(series: RawSeriesLikePlan, this.copy(columnFilters = LogicalPlan.overrideColumnFilters(columnFilters, filters), series = series.replaceRawSeriesFilters(filters), functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan])) + + override def hasAtModifier: Boolean = { + atMs.nonEmpty || children.exists(_.hasAtModifier) + } } /** @@ -438,6 +459,9 @@ case class Aggregate(operator: AggregationOperator, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors = vectors.replacePeriodicSeriesFilters(filters)) + override def hasAtModifier: Boolean = { + vectors.hasAtModifier + } } /** @@ -468,6 +492,10 @@ case class BinaryJoin(lhs: PeriodicSeriesPlan, override def isRoutable: Boolean = lhs.isRoutable || rhs.isRoutable override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(lhs = lhs.replacePeriodicSeriesFilters(filters), rhs = rhs.replacePeriodicSeriesFilters(filters)) + + override def hasAtModifier: Boolean = { + rhs.hasAtModifier || lhs.hasAtModifier + } } /** @@ -503,6 +531,10 @@ case class ApplyInstantFunction(vectors: PeriodicSeriesPlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy( vectors = vectors.replacePeriodicSeriesFilters(filters), functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan])) + + override def hasAtModifier: Boolean = { + vectors.hasAtModifier + } } /** @@ -518,6 +550,9 @@ case class ApplyInstantFunctionRaw(vectors: RawSeries, vectors = vectors.replaceRawSeriesFilters(newFilters).asInstanceOf[RawSeries], functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(newFilters).asInstanceOf[FunctionArgsPlan])) + override def hasAtModifier: Boolean = { + vectors.hasAtModifier + } } /** @@ -533,6 +568,10 @@ case class ApplyMiscellaneousFunction(vectors: PeriodicSeriesPlan, override def endMs: Long = vectors.endMs override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors = vectors.replacePeriodicSeriesFilters(filters)) + + override def hasAtModifier: Boolean = { + vectors.hasAtModifier + } } /** @@ -546,6 +585,10 @@ case class ApplySortFunction(vectors: PeriodicSeriesPlan, override def endMs: Long = vectors.endMs override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(vectors = vectors.replacePeriodicSeriesFilters(filters)) + + override def hasAtModifier: Boolean = { + vectors.hasAtModifier + } } /** @@ -576,6 +619,10 @@ final case class ScalarVaryingDoublePlan(vectors: PeriodicSeriesPlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy( vectors = vectors.replacePeriodicSeriesFilters(filters), functionArgs = functionArgs.map(_.replacePeriodicSeriesFilters(filters).asInstanceOf[FunctionArgsPlan])) + + override def hasAtModifier: Boolean = { + vectors.hasAtModifier + } } /** @@ -617,6 +664,10 @@ final case class VectorPlan(scalars: ScalarPlan) extends PeriodicSeriesPlan with override def isRoutable: Boolean = scalars.isRoutable override def 
replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(scalars = scalars.replacePeriodicSeriesFilters(filters).asInstanceOf[ScalarPlan]) + + override def hasAtModifier: Boolean = { + scalars.hasAtModifier + } } /** @@ -637,6 +688,10 @@ case class ScalarBinaryOperation(operator: BinaryOperator, asInstanceOf[ScalarBinaryOperation]) else Left(rhs.left.get) this.copy(lhs = updatedLhs, rhs = updatedRhs) } + + override def hasAtModifier: Boolean = { + (lhs.isRight && lhs.right.get.hasAtModifier) || (rhs.isRight && rhs.right.get.hasAtModifier) + } } /** @@ -653,6 +708,9 @@ case class ApplyAbsentFunction(vectors: PeriodicSeriesPlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(columnFilters = LogicalPlan.overrideColumnFilters(columnFilters, filters), vectors = vectors.replacePeriodicSeriesFilters(filters)) + override def hasAtModifier: Boolean = { + vectors.hasAtModifier + } } /** @@ -669,6 +727,10 @@ case class ApplyLimitFunction(vectors: PeriodicSeriesPlan, override def replacePeriodicSeriesFilters(filters: Seq[ColumnFilter]): PeriodicSeriesPlan = this.copy(columnFilters = LogicalPlan.overrideColumnFilters(columnFilters, filters), vectors = vectors.replacePeriodicSeriesFilters(filters)) + + override def hasAtModifier: Boolean = { + vectors.hasAtModifier + } } object LogicalPlan { diff --git a/query/src/main/scala/filodb/query/exec/SetOperatorExec.scala b/query/src/main/scala/filodb/query/exec/SetOperatorExec.scala index f557274a47..783907a2c9 100644 --- a/query/src/main/scala/filodb/query/exec/SetOperatorExec.scala +++ b/query/src/main/scala/filodb/query/exec/SetOperatorExec.scala @@ -108,7 +108,8 @@ final case class SetOperatorExec(queryContext: QueryContext, private[exec] def setOpAnd(lhsRvs: List[RangeVector], rhsRvs: List[RangeVector], rhsSchema: ResultSchema): Iterator[RangeVector] = { // isEmpty method consumes rhs range vector - require(rhsRvs.forall(_.isInstanceOf[SerializedRangeVector]), "RHS should be SerializedRangeVector") + require(rhsRvs.forall(_.isInstanceOf[SerializableRangeVector]), + "RHS should be SerializableRangeVector") val result = new mutable.HashMap[Map[Utf8Str, Utf8Str], ArrayBuffer[RangeVector]]() val rhsMap = new mutable.HashMap[Map[Utf8Str, Utf8Str], RangeVector]()
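Note for reviewers: the planner changes and expected plans above all follow the same rewrite. When an '@' timestamp is present, the scan is collapsed to a single instant (start = end = atMs, step = 0) via PeriodicSamplesMapper, and RepeatTransformer then re-emits that one value at every step of the user's original range. The following is a minimal standalone sketch of that arithmetic, not FiloDB code: AtModifierSketch, ScanParams, realScan and repeatForRange are illustrative names invented here, and the timestamps are copied from the expected plans above.

object AtModifierSketch extends App {
  final case class ScanParams(startMs: Long, stepMs: Long, endMs: Long)

  // When atMs is defined, scan exactly once at that instant (step = 0); otherwise keep
  // the periodic scan. Mirrors the realScanStartMs/realScanStepMs/realScanEndMs logic.
  def realScan(startMs: Long, stepMs: Long, endMs: Long, atMs: Option[Long]): ScanParams =
    atMs.map(at => ScanParams(at, 0L, at)).getOrElse(ScanParams(startMs, stepMs, endMs))

  // The single sampled value is then repeated at every step of the original query range,
  // which is the effect RepeatTransformer provides in the plans above.
  def repeatForRange(value: Double, startMs: Long, stepMs: Long, endMs: Long): Seq[(Long, Double)] =
    (startMs to endMs by stepMs).map(ts => ts -> value)

  // Range 1633913330000..1634777330000 with step 300000, '@' pinned to the range end.
  val scan = realScan(1633913330000L, 300000L, 1634777330000L, atMs = Some(1634777330000L))
  println(scan) // ScanParams(1634777330000,0,1634777330000)

  // Print only the first few repeated points to keep the output short.
  println(repeatForRange(42.0, 1633913330000L, 300000L, 1633913330000L + 2 * 300000L))
}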