From 207078ae1a6e8b0c7464d7408f808a03331c284d Mon Sep 17 00:00:00 2001
From: Amol Nayak
Date: Tue, 19 Sep 2023 16:41:52 -0700
Subject: [PATCH] feat(query): Support no-downtime query capabilities in case of time-split partitions

---
 .../queryplanner/DefaultPlanner.scala         |   6 +-
 .../queryplanner/MultiPartitionPlanner.scala  | 111 ++++++++----------
 .../MultiPartitionPlannerSpec.scala           |  42 ++++---
 core/src/main/resources/filodb-defaults.conf  |   5 +-
 .../scala/filodb.core/query/QueryConfig.scala |  16 ++-
 5 files changed, 102 insertions(+), 78 deletions(-)

diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala
index 5806a95849..2f0eded218 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala
@@ -683,7 +683,11 @@ object PlannerUtil extends StrictLogging {
       lp.copy(rawSeries = rewritePlanWithRemoteRawExport(lp.rawSeries, rangeSelector, additionalLookback)
         .asInstanceOf[RawSeriesLikePlan])
     case lp: PeriodicSeriesWithWindowing =>
-      lp.copy(series = rewritePlanWithRemoteRawExport(lp.series, rangeSelector, additionalLookback)
+      val rs = rangeSelector.asInstanceOf[IntervalSelector]
+      lp.copy(
+        startMs = rs.from,
+        endMs = rs.to,
+        series = rewritePlanWithRemoteRawExport(lp.series, rangeSelector, additionalLookback)
         .asInstanceOf[RawSeriesLikePlan])
     case lp: MetadataQueryPlan => lp
     case lp: TsCardinalities => lp
diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala
index 8bcbe77716..d6d88f9101 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala
@@ -115,20 +115,39 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
       // If inprocess is required, we will rely on the DefaultPlanner's implementation as the expectation is that the
       // raw series is doing a remote call to get all the data.
       logicalPlan match {
-        case lp: RawSeries =>
-          assert(lp.supportsRemoteDataCall, "RawSeries with forceInProcess set to true only supports remote data call")
+        case lp: RawSeries if lp.supportsRemoteDataCall =>
           // materialize the raw query for the partition by rewriting the prom parameters
+          // A RawSeries over [x, y] with offset o and lookback l is the same as a raw export as of time instant y
+          // with a lookback of (y - x) + o + l; for each partition this interval spans, we will find the
+          // remote/local partitions and stitch the results
           val params = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
           val rs = lp.rangeSelector.asInstanceOf[IntervalSelector]
-          val newPromQlParams = params.copy(promQl = LogicalPlanParser.convertToQuery(lp),
-            startSecs = rs.from / 1000,
-            endSecs = rs.to / 1000,
-            stepSecs = 0
-          )
-          val newContext = qContext.copy(origQueryParams = newPromQlParams)
-          val partition = getPartitions(lp, newPromQlParams)
-          assert(partition.size == 1, "Raw series export is expected to be running in just one partition")
-          PlanResult(Seq(materializeForPartition(lp, partition.head, newContext)))
+
+          val (rawExportStart, rawExportEnd) =
+            (rs.from - lp.offsetMs.getOrElse(0L) - lp.lookbackMs.getOrElse(0L), rs.to - lp.offsetMs.getOrElse(0L))
+
+          val partition = getPartitions(lp, params)
+          // For each partition, do a raw data export range query
+          val execPlans = partition.map(pa => {
+            val (thisPartitionStartMs, thisPartitionEndMs) =
+              (Math.max(pa.timeRange.startMs, rawExportStart), Math.min(pa.timeRange.endMs, rawExportEnd))
+            val totalLookbackThisPartitionMs = thisPartitionEndMs - thisPartitionStartMs
+            // Add an additional 1 second to avoid ms-to-sec rounding issues
+            val thisPartitionLp = lp.copy(offsetMs = None, lookbackMs = Some(totalLookbackThisPartitionMs + 1000L))
+            val newPromQlParams = params.copy(promQl = LogicalPlanParser.convertToQuery(thisPartitionLp),
+              startSecs = thisPartitionEndMs / 1000,
+              endSecs = thisPartitionEndMs / 1000,
+              stepSecs = 1
+            )
+            val newContext = qContext.copy(origQueryParams = newPromQlParams)
+            materializeForPartition(thisPartitionLp, pa, newContext)
+          })
+          PlanResult(
+            Seq(
+              if (execPlans.tail.isEmpty) execPlans.head
+              else
+                StitchRvsExec(qContext, inProcessPlanDispatcher, rvRangeFromPlan(logicalPlan),
+                  execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]))
+            ))
         case _ : LogicalPlan => super.defaultWalkLogicalPlanTree(logicalPlan, qContext, forceInProcess)
       }
     } else {
@@ -522,55 +541,27 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
       getAssignmentQueryRanges(partitions, timeRange,
         lookbackMs = lookbackMs, offsetMs = offsetMs, stepMsOpt = stepMsOpt)
     }
-    //walkLogicalPlanTree(logicalPlan, qContext, true)
     require(!assignmentRanges.isEmpty, s"Assignment ranges is not expected to be empty for query ${qParams.promQl}")
     // materialize a plan for each range/assignment pair
     val (_, execPlans) = assignmentRanges.foldLeft(
       (None: Option[(PartitionAssignment, TimeRange)], ListBuffer.empty[ExecPlan])) {
       case (acc, next) => acc match {
-        case (Some((prevAssignment, prevTimeRange)), ep : ListBuffer[ExecPlan]) =>
-          val (currentAssignment, currentTimeRange) = next
-          val currentPartRange = qParams.copy(startSecs = currentTimeRange.startMs / 1000,
-            endSecs = currentTimeRange.endMs / 1000)
-
+        case (Some((_, prevTimeRange)), ep : ListBuffer[ExecPlan]) =>
+          val (currentAssignment, currentTimeRange) = next
+          val (startTime, endTime) = (prevTimeRange.endMs + 1, currentTimeRange.startMs - 1)
-          // TODO: ALl this will be protected by the flag whether or not we want to enable stitching
+          // If stitching is enabled, fill the missing part of the time range between the previous time range's
+          // end time and the current time range's start time by performing a remote raw data export
+          if (queryConfig.supportRemoteRawExport && startTime < endTime) {
            // We need to perform raw data export from two partitions, the partition on the left and the partition
            // on the right, to get the missing data
-            //
-            //
-            // We will always need the offsetMs + lookbackMillis of raw data from potentially both left and right
-            // partitions. If the end time range of previous query is same as the previous assignment end time range,
-            // no data from the partition is needed, similarly when the start date of the right partition less then
-            // the start time of the query in right partition, we need raw data from left partition.
-            // TODO: Have configs to not perform raw exports if the export is beyond a certain value, for example
-            // foo{}[10d] or foo[2d] offset 8d both will export 10 days of raw data which might cause heap pressure and
-            // OOMs. The max cross partition raw export config can control such queries from bring the process down but
-            // simpler queries with few minutes or even hour or two of lookback/offset will continue to work seamlessly
-            // with no data gaps
-
-            // IMPORTANT: The raw export assumes the data needs to come from just previous partition. Inthrory there can
-            // be multiple time splits and the raw export will then need to span previous n partitions and stitch the data
-            // together. However, for simplicity we will restrict this raw export to just previous partition. If the,
-            // start of raw export exceeds the previous partition assignment's start, we simply keep the gap, which is
-            // the default behavior.
-            // Period to perform raw export offsetMs + lookbackMs
-            // If previous end is < previous partition's end time, we need data from this partition
-            if (prevTimeRange.endMs < prevAssignment.timeRange.endMs) {
-              // Walk the plan to make all RawSeries support remote and materialize getting the data from next partition
-
-            }
-            // If current start is > partitions's start time, export raw data from previous partition
-            if (currentTimeRange.startMs > currentAssignment.timeRange.startMs) {
            // Walk the plan to make all RawSeries support remote export fetching the data from previous partition.
            // When we rewrite the RawSeries's rangeSelector, we will make the start and end time the same as the
            // previous partition's end time and then do a raw query for the duration of
            // (currentTimeRange.startMs - currentAssignment.timeRange.startMs) + offset + lookback.
-            // TODO: also use totalExpectedRawExport to block raw export if it exceeds the max permitted raw export
-
            // Partition split end time for queries in partition 1
            //                  V(p)                  V(t1)
            // |----o----------------|---x------x-----------------------------o-------|
            //                            ^      ^(t2)
            //                          (p - o)
            // Given the offset is 10m, lookback is 5m, we would need raw data in the range
            // [t1 - 5m - 10m, t2], this range for raw queries spans two partitions
            // we need to export foo[]
-            val totalExpectedRawExport =
-              (currentTimeRange.startMs - currentAssignment.timeRange.startMs) + lookbackMs + offsetMs
-            // Only if the raw export is completely within the previous partition's timerange
-            if(prevAssignment.timeRange.endMs - totalExpectedRawExport >= prevAssignment.timeRange.startMs) {
-              val newParams = qParams.copy(startSecs = currentAssignment.timeRange.startMs / 1000,
-                endSecs = currentTimeRange.startMs / 1000 - 1)
-              val newContext = qContext.copy(origQueryParams = newParams)
-
-              val newLp = rewritePlanWithRemoteRawExport(logicalPlan,
-                IntervalSelector(prevAssignment.timeRange.endMs, prevAssignment.timeRange.endMs),
-                additionalLookback = currentTimeRange.startMs - currentAssignment.timeRange.startMs + 1000)
+            // Do not perform raw exports if the export is beyond a certain value; for example,
+            // foo{}[10d] or foo[2d] offset 8d will both export 10 days of raw data, which might cause heap pressure
+            // and OOMs. The max cross-partition raw export config can stop such queries from bringing the process
+            // down, while simpler queries with a few minutes or even an hour or two of lookback/offset will continue
+            // to work seamlessly with no data gaps
+            val totalExpectedRawExport = (endTime - startTime) + lookbackMs + offsetMs
+            if (queryConfig.maxRemoteRawExportTimeRange.toMillis > totalExpectedRawExport) {
+              // The gap plus lookback and offset fits within the configured max raw export range; export it remotely
+              val newParams = qParams.copy(startSecs = startTime / 1000, endSecs = endTime / 1000 - 1)
+              val newContext = qContext.copy(origQueryParams = newParams)
+              val newLp = rewritePlanWithRemoteRawExport(logicalPlan, IntervalSelector(startTime, endTime))
               ep ++= walkLogicalPlanTree(newLp, newContext, true).plans
+            } else {
+              logger.warn(s"Remote raw export is enabled but the expected raw export of $totalExpectedRawExport ms" +
+                s" is greater than the max allowed raw export duration ${queryConfig.maxRemoteRawExportTimeRange}")
+            }
+          }
-
-          val newParams = qParams.copy(startSecs = currentTimeRange.startMs / 1000,
-            endSecs = currentTimeRange.endMs / 1000)
+          val newParams = qParams.copy(startSecs = currentTimeRange.startMs / 1000,
+            endSecs = currentTimeRange.endMs / 1000)
           val newContext = qContext.copy(origQueryParams = newParams)
           ep += materializeForPartition(logicalPlan, currentAssignment, newContext)
           (Some(next), ep)
diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala
index 59312e6d8a..b70b9b6abf 100644
--- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala
@@ -15,7 +15,7 @@ import filodb.prometheus.ast.TimeStepParams
 import filodb.prometheus.parse.Parser
 import filodb.query.BinaryOperator.{ADD, LAND}
 import filodb.query.InstantFunctionId.Ln
-import filodb.query.{IntervalSelector, LabelCardinality, LogicalPlan, PlanValidationSpec, RawSeries, SeriesKeysByFilters, TsCardinalities}
+import filodb.query.{LabelCardinality, LogicalPlan, PlanValidationSpec, SeriesKeysByFilters, TsCardinalities}
 import filodb.query.exec._
 
 class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValidationSpec{
@@ -1636,30 +1636,44 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
   it("should generate a raw export from remote from multiple partitions and stitch") {
-    def twoPartitions(timeRange: TimeRange): List[PartitionAssignment] = List(
-      PartitionAssignment("remote", "remote-url", TimeRange(startSeconds * 1000 - lookbackMs,
-        localPartitionStart * 1000 - 1)), PartitionAssignment("remote2", "remote-url2",
-        TimeRange(localPartitionStart * 1000, endSeconds * 1000)))
+    val p1StartSecs = 1000
+    val p1EndSecs = 6999
+    val p2StartSecs = 7000
+    val p2EndSecs = 15000
+    val stepSecs = 100
+    val queryStartSecs = 12000
+    val subqueryLookbackSecs = 9000
+
+    def twoPartitions(): List[PartitionAssignment] = List(
+      PartitionAssignment("remote", "remote-url", TimeRange(p1StartSecs * 1000, p1EndSecs * 1000)),
+      PartitionAssignment("local", "local-url", TimeRange(p2StartSecs * 1000, p2EndSecs * 1000))
+    )
     val partitionLocationProvider = new PartitionLocationProvider {
       override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = {
-        if (routingKey.equals(Map("job" -> "app"))) twoPartitions(timeRange)
+        if (routingKey.equals(Map("job" -> "app"))) twoPartitions().filter(
+          (p: PartitionAssignment) => {
+            val startWithinPartition = (p.timeRange.startMs <= timeRange.startMs) && (p.timeRange.endMs > timeRange.startMs)
+            val endWithinPartition = (p.timeRange.startMs <= timeRange.endMs) && (p.timeRange.endMs > timeRange.endMs)
+            val partitionWithinInterval = (p.timeRange.startMs >= timeRange.startMs) && (p.timeRange.endMs < timeRange.endMs)
+            startWithinPartition || endWithinPartition || partitionWithinInterval
+          })
         else Nil
       }
-      override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = twoPartitions(timeRange)
-
+      override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter],
+                                         timeRange: TimeRange): List[PartitionAssignment] = twoPartitions()
     }
+
     val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig)
-    val lp = RawSeries(rangeSelector = IntervalSelector(startSeconds, endSeconds),
-      filters = Seq(ColumnFilter("job", Equals("job")), ColumnFilter("__name__", Equals("test"))),
-      columns = Nil,
-      supportsRemoteDataCall = true)
-    val promQlQueryParams = PromQlQueryParams("test{job = \"app\"}", startSeconds, step, endSeconds)
+    val query = "sum(rate(test{job = \"app\"}[10m]))"
+    val lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(2000, stepSecs, 10000))
+    val promQlQueryParams = PromQlQueryParams(query, 1000, 100, 10000)
     val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams,
       plannerParams = PlannerParams(processMultiPartition = true)))
-    print(execPlan.toString)
+    print(execPlan.printTree())
   }
diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf
index 5112af6ecc..b6960a0dde 100644
--- a/core/src/main/resources/filodb-defaults.conf
+++ b/core/src/main/resources/filodb-defaults.conf
@@ -342,8 +342,13 @@ filodb {
       # Partition names are case insensitive, to block using grpc remote QS across all partitions use *
       partitions-deny-list = ""
     }
+
     routing {
-      # not currently used
+      # Whether to fetch raw data from a neighboring partition to fill query gaps at time-split boundaries
+      enable-remote-raw-exports = false
+      # Maximum time range of raw data that may be exported from a remote partition for a single query
+      max-time-range-remote-raw-export = 3 days
     }
 
   # Config values are used when partialResults query parameter is not provided
diff --git a/core/src/main/scala/filodb.core/query/QueryConfig.scala b/core/src/main/scala/filodb.core/query/QueryConfig.scala
index 98ab9ffe27..305933628f 100644
--- a/core/src/main/scala/filodb.core/query/QueryConfig.scala
+++ b/core/src/main/scala/filodb.core/query/QueryConfig.scala
@@ -1,10 +1,14 @@
 package filodb.core.query
 
-import scala.concurrent.duration.FiniteDuration
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
 
 import com.typesafe.config.Config
 import net.ceedubs.ficus.Ficus._
 
+
+
 object QueryConfig {
   val DefaultVectorsLimit = 150
@@ -23,6 +27,10 @@ object QueryConfig {
   val grpcDenyList = queryConfig.getString("grpc.partitions-deny-list")
   val containerOverrides = queryConfig.as[Map[String, Int]]("container-size-overrides")
   val numRvsPerResultMessage = queryConfig.getInt("num-rvs-per-result-message")
+  val supportRemoteRawExport = queryConfig.getBoolean("routing.enable-remote-raw-exports")
+  val maxRemoteRawExportTimeRange =
+    FiniteDuration(
+      queryConfig.getDuration("routing.max-time-range-remote-raw-export").toMillis, TimeUnit.MILLISECONDS)
   QueryConfig(askTimeout, staleSampleAfterMs, minStepMs, fastReduceMaxWindows, parser, translatePromToFilodbHistogram,
     fasterRateEnabled, routingConfig.as[Option[String]]("partition_name"),
     routingConfig.as[Option[Long]]("remote.http.timeout"),
@@ -32,7 +40,7 @@ object QueryConfig {
     allowPartialResultsRangeQuery, allowPartialResultsMetadataQuery,
     grpcDenyList.split(",").map(_.trim.toLowerCase).toSet,
     None,
-    containerOverrides)
+    containerOverrides, supportRemoteRawExport, maxRemoteRawExportTimeRange)
 }
 
 import scala.concurrent.duration._
@@ -75,4 +83,6 @@ case class QueryConfig(askTimeout: FiniteDuration,
                        allowPartialResultsMetadataQuery: Boolean = true,
                        grpcPartitionsDenyList: Set[String] = Set.empty,
                        plannerSelector: Option[String] = None,
-                       recordContainerOverrides: Map[String, Int] = Map.empty)
+                       recordContainerOverrides: Map[String, Int] = Map.empty,
+                       supportRemoteRawExport: Boolean = false,
+                       maxRemoteRawExportTimeRange: FiniteDuration = 3.days)
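
---
Note: to make the gap-filling arithmetic in this patch concrete, below is a minimal,
self-contained Scala sketch of the window math it applies. This is not FiloDB code:
Partition, rawExportRange, and perPartitionWindows are illustrative names, and the
partition bounds echo the spec's twoPartitions() setup. A selector over [x, y] with
offset o and lookback l needs raw samples over [x - o - l, y - o]; that window is
clipped to every partition assignment it overlaps, one raw query is issued per
clipped piece, and the results are stitched (StitchRvsExec in the planner above).

object RawExportWindowSketch extends App {
  final case class Partition(name: String, startMs: Long, endMs: Long)

  // Raw samples needed by a selector over [fromMs, toMs] with the given offset and lookback.
  def rawExportRange(fromMs: Long, toMs: Long, offsetMs: Long, lookbackMs: Long): (Long, Long) =
    (fromMs - offsetMs - lookbackMs, toMs - offsetMs)

  // Clip the export window to each partition it overlaps; each piece becomes one raw query.
  def perPartitionWindows(parts: Seq[Partition], startMs: Long, endMs: Long): Seq[(Partition, Long, Long)] =
    parts
      .filter(p => p.endMs >= startMs && p.startMs <= endMs)
      .map(p => (p, math.max(p.startMs, startMs), math.min(p.endMs, endMs)))

  // Two partitions split at 7000s, mirroring the spec: [1000s, 6999s] and [7000s, 15000s].
  val parts = Seq(Partition("remote", 1000000L, 6999000L), Partition("local", 7000000L, 15000000L))

  // Instant query at 7800s with offset 10m and lookback 5m: the export window
  // [7800s - 15m, 7800s - 10m] = [6900s, 7200s] straddles the split, so two raw
  // queries are planned and their results stitched.
  val (s, e) = rawExportRange(7800000L, 7800000L, offsetMs = 600000L, lookbackMs = 300000L)
  perPartitionWindows(parts, s, e).foreach { case (p, ws, we) =>
    println(s"${p.name}: raw export over [$ws ms, $we ms]")
  }
  // Prints:
  //   remote: raw export over [6900000 ms, 6999000 ms]
  //   local: raw export over [7000000 ms, 7200000 ms]
}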