Skip to content

Commit

Permalink
feat(query) Support no downtime query capabilities in case of time sp…
Browse files Browse the repository at this point in the history
…lit partitions
  • Loading branch information
amolnayak311 committed Sep 19, 2023
1 parent f198be8 commit 207078a
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,11 @@ object PlannerUtil extends StrictLogging {
lp.copy(rawSeries = rewritePlanWithRemoteRawExport(lp.rawSeries, rangeSelector, additionalLookback)
.asInstanceOf[RawSeriesLikePlan])
case lp: PeriodicSeriesWithWindowing =>
lp.copy(series = rewritePlanWithRemoteRawExport(lp.series, rangeSelector, additionalLookback)
val rs = rangeSelector.asInstanceOf[IntervalSelector]
lp.copy(
startMs = rs.from,
endMs = rs.to,
series = rewritePlanWithRemoteRawExport(lp.series, rangeSelector, additionalLookback)
.asInstanceOf[RawSeriesLikePlan])
case lp: MetadataQueryPlan => lp
case lp: TsCardinalities => lp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,20 +115,40 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
// If inprocess is required, we will rely on the DefaultPlanner's implementation as the expectation is that the
// raw series is doing a remote call to get all the data.
logicalPlan match {
case lp: RawSeries =>
assert(lp.supportsRemoteDataCall, "RawSeries with forceInProcess set to true only supports remote data call")
case lp: RawSeries if lp.supportsRemoteDataCall=>
//println(lp)
// materialize the raw query for the partition by rewriting the prom parameters
// A Raw series from [x, y] offset o and lookback l is same as doing a raw export as of time instant y
// with a lookback of (y - x) + o + l, for each partition this internal spans, we will find the remote/local
// partitions and stitch the results
val params = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
val rs = lp.rangeSelector.asInstanceOf[IntervalSelector]
val newPromQlParams = params.copy(promQl = LogicalPlanParser.convertToQuery(lp),
startSecs = rs.from / 1000,
endSecs = rs.to / 1000,
stepSecs = 0
)
val newContext = qContext.copy(origQueryParams = newPromQlParams)
val partition = getPartitions(lp, newPromQlParams)
assert(partition.size == 1, "Raw series export is expected to be running in just one partition")
PlanResult(Seq(materializeForPartition(lp, partition.head, newContext)))

val (rawExportStart, rawExportEnd) =
(rs.from - lp.offsetMs.getOrElse(0L) - lp.lookbackMs.getOrElse(0L), rs.to - lp.offsetMs.getOrElse(0L))

val partition = getPartitions(lp, params)
// For each partition, do a raw data export range query
val execPlans = partition.map(pa => {
val (thisPartitionStartMs, thisPartitionEndMs) =
(Math.max(pa.timeRange.startMs, rawExportStart), Math.min(pa.timeRange.endMs, rawExportEnd))
val totalOffsetThisPartitionMs = thisPartitionEndMs - thisPartitionStartMs
// Add additional 1 second to avoid ms to sec rounding issues
val thisPartitionLp = lp.copy(offsetMs = None, lookbackMs = Some(totalOffsetThisPartitionMs + 1000L))
val newPromQlParams = params.copy(promQl = LogicalPlanParser.convertToQuery(thisPartitionLp),
startSecs = thisPartitionEndMs / 1000,
endSecs = thisPartitionEndMs / 1000,
stepSecs = 1
)
val newContext = qContext.copy(origQueryParams = newPromQlParams)
materializeForPartition(thisPartitionLp, pa, newContext)
})
PlanResult(
Seq( if (execPlans.tail == Seq.empty) execPlans.head
else
StitchRvsExec(qContext, inProcessPlanDispatcher, rvRangeFromPlan(logicalPlan),
execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]))
))
case _ : LogicalPlan => super.defaultWalkLogicalPlanTree(logicalPlan, qContext, forceInProcess)
}
} else {
Expand Down Expand Up @@ -522,55 +542,27 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
getAssignmentQueryRanges(partitions, timeRange,
lookbackMs = lookbackMs, offsetMs = offsetMs, stepMsOpt = stepMsOpt)
}
//walkLogicalPlanTree(logicalPlan, qContext, true)
require(!assignmentRanges.isEmpty, s"Assignment ranges is not expected to be empty for query ${qParams.promQl}")
// materialize a plan for each range/assignment pair
val (_, execPlans) = assignmentRanges.foldLeft(
(None: Option[(PartitionAssignment, TimeRange)], ListBuffer.empty[ExecPlan]))
{
case (acc, next) => acc match {
case (Some((prevAssignment, prevTimeRange)), ep : ListBuffer[ExecPlan]) =>
val (currentAssignment, currentTimeRange) = next
val currentPartRange = qParams.copy(startSecs = currentTimeRange.startMs / 1000,
endSecs = currentTimeRange.endMs / 1000)

case (Some((_, prevTimeRange)), ep : ListBuffer[ExecPlan]) =>
val (currentAssignment, currentTimeRange) = next
val (startTime, endTime) = (prevTimeRange.endMs + 1, currentTimeRange.startMs - 1)

// TODO: ALl this will be protected by the flag whether or not we want to enable stitching

// If we enable stitching the missing part of time range between the previous time range's end time and
// current time range's start time, we will perform remote raw data export
if (queryConfig.supportRemoteRawExport && startTime < endTime) {
// We need to perform raw data export from two partitions, the partition on the left and partition
// on the right, the missing data requires to get data
//
//
// We will always need the offsetMs + lookbackMillis of raw data from potentially both left and right
// partitions. If the end time range of previous query is same as the previous assignment end time range,
// no data from the partition is needed, similarly when the start date of the right partition less then
// the start time of the query in right partition, we need raw data from left partition.
// TODO: Have configs to not perform raw exports if the export is beyond a certain value, for example
// foo{}[10d] or foo[2d] offset 8d both will export 10 days of raw data which might cause heap pressure and
// OOMs. The max cross partition raw export config can control such queries from bring the process down but
// simpler queries with few minutes or even hour or two of lookback/offset will continue to work seamlessly
// with no data gaps

// IMPORTANT: The raw export assumes the data needs to come from just previous partition. Inthrory there can
// be multiple time splits and the raw export will then need to span previous n partitions and stitch the data
// together. However, for simplicity we will restrict this raw export to just previous partition. If the,
// start of raw export exceeds the previous partition assignment's start, we simply keep the gap, which is
// the default behavior.
// Period to perform raw export offsetMs + lookbackMs
// If previous end is < previous partition's end time, we need data from this partition
if (prevTimeRange.endMs < prevAssignment.timeRange.endMs) {
// Walk the plan to make all RawSeries support remote and materialize getting the data from next partition

}

// If current start is > partitions's start time, export raw data from previous partition
if (currentTimeRange.startMs > currentAssignment.timeRange.startMs) {
// Walk the plan to make all RawSeries support remote export fetching the data from previous partition
// When we rewrite the RawSeries's rangeSelector, we will make the start and end time same as the end of the
// previous partition's end time and then do a raw query for the duration of the
// (currentTimeRange.startMs - currentAssignment.timeRange.startMs) + offset + lookback.
// TODO: also use totalExpectedRawExport to block raw export if it exceeds the max permitted raw export

// Partition split end time for queries in partition 1
// V(p) V(t1)
// |----o----------------|---x------x-----------------------------o-------|
Expand All @@ -585,25 +577,26 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
// Given the offset is 10m, lookback is 5m, we would need raw data in the range
// [t1 - 5m - 10m, t2], this range for raw queries span two partitions
// we need to export foo[]
val totalExpectedRawExport =
(currentTimeRange.startMs - currentAssignment.timeRange.startMs) + lookbackMs + offsetMs
// Only if the raw export is completely within the previous partition's timerange
if(prevAssignment.timeRange.endMs - totalExpectedRawExport >= prevAssignment.timeRange.startMs) {
val newParams = qParams.copy(startSecs = currentAssignment.timeRange.startMs / 1000,
endSecs = currentTimeRange.startMs / 1000 - 1)
val newContext = qContext.copy(origQueryParams = newParams)

val newLp = rewritePlanWithRemoteRawExport(logicalPlan,
IntervalSelector(prevAssignment.timeRange.endMs, prevAssignment.timeRange.endMs),
additionalLookback = currentTimeRange.startMs - currentAssignment.timeRange.startMs + 1000)

// Do not perform raw exports if the export is beyond a certain value for example
// foo{}[10d] or foo[2d] offset 8d both will export 10 days of raw data which might cause heap pressure and
// OOMs. The max cross partition raw export config can control such queries from bring the process down but
// simpler queries with few minutes or even hour or two of lookback/offset will continue to work seamlessly
// with no data gaps
val totalExpectedRawExport = (endTime - startTime) + lookbackMs + offsetMs
if (queryConfig.maxRemoteRawExportTimeRange.toMillis > totalExpectedRawExport) {
// Only if the raw export is completely within the previous partition's timerange
val newParams = qParams.copy(startSecs = startTime / 1000, endSecs = endTime / 1000 - 1)
val newContext = qContext.copy(origQueryParams = newParams)
val newLp = rewritePlanWithRemoteRawExport(logicalPlan, IntervalSelector(startTime, endTime))
ep ++= walkLogicalPlanTree(newLp, newContext, true).plans
} else {
logger.warn(s"Remote raw export is supported but the expected raw export for $totalExpectedRawExport ms" +
s" is greater than the max allowed raw export duration ${queryConfig.maxRemoteRawExportTimeRange}")
}
}


val newParams = qParams.copy(startSecs = currentTimeRange.startMs / 1000,
endSecs = currentTimeRange.endMs / 1000)
endSecs = currentTimeRange.endMs / 1000)
val newContext = qContext.copy(origQueryParams = newParams)
ep += materializeForPartition(logicalPlan, currentAssignment, newContext)
(Some(next), ep)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import filodb.prometheus.ast.TimeStepParams
import filodb.prometheus.parse.Parser
import filodb.query.BinaryOperator.{ADD, LAND}
import filodb.query.InstantFunctionId.Ln
import filodb.query.{IntervalSelector, LabelCardinality, LogicalPlan, PlanValidationSpec, RawSeries, SeriesKeysByFilters, TsCardinalities}
import filodb.query.{LabelCardinality, LogicalPlan, PlanValidationSpec, SeriesKeysByFilters, TsCardinalities}
import filodb.query.exec._

class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValidationSpec{
Expand Down Expand Up @@ -1636,30 +1636,44 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida

it("should generate generate a raw export from remote from multiple partitions and stitch") {

def twoPartitions(timeRange: TimeRange): List[PartitionAssignment] = List(
PartitionAssignment("remote", "remote-url", TimeRange(startSeconds * 1000 - lookbackMs,
localPartitionStart * 1000 - 1)), PartitionAssignment("remote2", "remote-url2",
TimeRange(localPartitionStart * 1000, endSeconds * 1000)))
val p1StartSecs = 1000
val p1EndSecs = 6999
val p2StartSecs = 7000
val p2EndSecs = 15000
val stepSecs = 100
val queryStartSecs = 12000
val subqueryLookbackSecs = 9000

def twoPartitions(): List[PartitionAssignment] = List(
PartitionAssignment("remote", "remote-url", TimeRange(p1StartSecs * 1000, p1EndSecs * 1000)),
PartitionAssignment("local", "local-url", TimeRange(p2StartSecs * 1000, p2EndSecs * 1000))
)

val partitionLocationProvider = new PartitionLocationProvider {
override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = {
if (routingKey.equals(Map("job" -> "app"))) twoPartitions(timeRange)
if (routingKey.equals(Map("job" -> "app"))) twoPartitions().filter(
(p: PartitionAssignment) => {
val startWithinPartition = (p.timeRange.startMs <= timeRange.startMs) && (p.timeRange.endMs > timeRange.startMs)
val endWithinPartition = (p.timeRange.startMs <= timeRange.endMs) && (p.timeRange.endMs > timeRange.endMs)
val partitionWithinInterval = (p.timeRange.startMs >= timeRange.startMs) && (p.timeRange.endMs < timeRange.endMs)
startWithinPartition || endWithinPartition || partitionWithinInterval
})
else Nil
}

override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = twoPartitions(timeRange)

def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter],
timeRange: TimeRange): List[PartitionAssignment] = twoPartitions()
}


val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig)
val lp = RawSeries(rangeSelector = IntervalSelector(startSeconds, endSeconds),
filters = Seq(ColumnFilter("job", Equals("job")), ColumnFilter("__name__", Equals("test"))),
columns = Nil,
supportsRemoteDataCall = true)
val promQlQueryParams = PromQlQueryParams("test{job = \"app\"}", startSeconds, step, endSeconds)
val query = "sum(rate(test{job = \"app\"}[10m]))"
val lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(2000, stepSecs, 10000))

val promQlQueryParams = PromQlQueryParams(query, 1000, 100, 10000)
val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams =
PlannerParams(processMultiPartition = true)))
print(execPlan.toString)
print(execPlan.printTree())
}


Expand Down
5 changes: 4 additions & 1 deletion core/src/main/resources/filodb-defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,11 @@ filodb {
# Partition names are case insensitive, to block using grpc remote QS across all partitions use *
partitions-deny-list = ""
}


routing {
# not currently used
enable-remote-raw-exports = false
max-time-range-remote-raw-export = 3 days
}

# Config values are used when partialResults query parameter is not provided
Expand Down
16 changes: 13 additions & 3 deletions core/src/main/scala/filodb.core/query/QueryConfig.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package filodb.core.query

import scala.concurrent.duration.FiniteDuration
import java.util.concurrent.TimeUnit

import scala.concurrent.duration.{DurationInt, FiniteDuration}

import com.typesafe.config.Config
import net.ceedubs.ficus.Ficus._



object QueryConfig {
val DefaultVectorsLimit = 150

Expand All @@ -23,6 +27,10 @@ object QueryConfig {
val grpcDenyList = queryConfig.getString("grpc.partitions-deny-list")
val containerOverrides = queryConfig.as[Map[String, Int]]("container-size-overrides")
val numRvsPerResultMessage = queryConfig.getInt("num-rvs-per-result-message")
val supportRemoteRawExport = queryConfig.getBoolean("routing.enable-remote-raw-exports")
val maxRemoteRawExportTimeRange =
FiniteDuration(
queryConfig.getDuration("routing.max-time-range-remote-raw-export").toMillis, TimeUnit.MILLISECONDS)
QueryConfig(askTimeout, staleSampleAfterMs, minStepMs, fastReduceMaxWindows, parser, translatePromToFilodbHistogram,
fasterRateEnabled, routingConfig.as[Option[String]]("partition_name"),
routingConfig.as[Option[Long]]("remote.http.timeout"),
Expand All @@ -32,7 +40,7 @@ object QueryConfig {
allowPartialResultsRangeQuery, allowPartialResultsMetadataQuery,
grpcDenyList.split(",").map(_.trim.toLowerCase).toSet,
None,
containerOverrides)
containerOverrides, supportRemoteRawExport, maxRemoteRawExportTimeRange)
}

import scala.concurrent.duration._
Expand Down Expand Up @@ -75,4 +83,6 @@ case class QueryConfig(askTimeout: FiniteDuration,
allowPartialResultsMetadataQuery: Boolean = true,
grpcPartitionsDenyList: Set[String] = Set.empty,
plannerSelector: Option[String] = None,
recordContainerOverrides: Map[String, Int] = Map.empty)
recordContainerOverrides: Map[String, Int] = Map.empty,
supportRemoteRawExport: Boolean = false,
maxRemoteRawExportTimeRange: FiniteDuration = 3 days)

0 comments on commit 207078a

Please sign in to comment.