Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query) Support split partition raw queries #1677

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0f9f696
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Aug 14, 2023
67b3a0e
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Aug 16, 2023
435b9db
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Aug 18, 2023
2d183d6
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Aug 22, 2023
288ea40
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Aug 22, 2023
f198be8
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Aug 26, 2023
207078a
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Sep 19, 2023
93c8db6
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Sep 21, 2023
e9a996d
WIP
amolnayak311 Nov 17, 2023
3c119aa
WIP
amolnayak311 Dec 1, 2023
5c6226c
WIP
amolnayak311 Dec 13, 2023
8e5888a
WIP
amolnayak311 Dec 13, 2023
04f8c1e
WIP
amolnayak311 Dec 13, 2023
aff7ac4
WIP
amolnayak311 Dec 13, 2023
f5595f6
WIP
amolnayak311 Dec 13, 2023
f6ac82f
WIP
amolnayak311 Dec 13, 2023
3f2f281
WIP
amolnayak311 Dec 14, 2023
ffd7c5c
WIP
amolnayak311 Dec 15, 2023
65fa999
PR Review changes
amolnayak311 Jan 3, 2024
27a8e7b
PR Review changes
amolnayak311 Jan 3, 2024
8c33a2d
Fix conflicts
amolnayak311 Jan 3, 2024
63ea481
Fix style
amolnayak311 Jan 3, 2024
89c0008
Approximately equal check only enabled in remote raw data export path
amolnayak311 Jan 4, 2024
c526c72
PR Comments
amolnayak311 Jan 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ trait DefaultPlanner {
case _ => true
})
rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(lp.startMs, lp.stepMs, lp.endMs,
None, None, qContext, stepMultipleNotationUsed = false, Nil, lp.offsetMs, rawSource = rawSource)))
window = None, functionId = None, qContext, stepMultipleNotationUsed = false, funcParams = Nil,
lp.offsetMs, rawSource = rawSource)))

if (nameFilter.isDefined && nameFilter.head.endsWith("_bucket") && leFilter.isDefined) {
val paramsExec = StaticFuncArgs(leFilter.head.toDouble, RangeParams(lp.startMs / 1000, lp.stepMs / 1000,
Expand Down Expand Up @@ -663,7 +664,9 @@ object PlannerUtil extends StrictLogging {
rhs = rewritePlanWithRemoteRawExport(lp.rhs, rangeSelector, additionalLookback)
.asInstanceOf[PeriodicSeriesPlan])
case lp: ScalarVectorBinaryOperation =>
alextheimer marked this conversation as resolved.
Show resolved Hide resolved
lp.copy(vector = rewritePlanWithRemoteRawExport(lp.vector, rangeSelector, additionalLookback)
lp.copy(scalarArg = rewritePlanWithRemoteRawExport(lp.scalarArg, rangeSelector, additionalLookback)
.asInstanceOf[ScalarPlan],
vector = rewritePlanWithRemoteRawExport(lp.vector, rangeSelector, additionalLookback)
.asInstanceOf[PeriodicSeriesPlan])
case lp: ApplyMiscellaneousFunction =>
lp.copy(vectors = rewritePlanWithRemoteRawExport(lp.vectors, rangeSelector, additionalLookback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
(rs.from - lp.offsetMs.getOrElse(0L) - lp.lookbackMs.getOrElse(0L), rs.to - lp.offsetMs.getOrElse(0L))

val partition = getPartitions(lp, params)
assert(partition.nonEmpty, s"Unexpected to see partitions empty for logicalPlan=$lp and param=$params")
// For each partition, do a raw data export range query
val execPlans = partition.map(pa => {
val (thisPartitionStartMs, thisPartitionEndMs) =
Expand All @@ -142,7 +143,8 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
val newPromQlParams = params.copy(promQl = LogicalPlanParser.convertToQuery(lp))
StitchRvsExec(qContext.copy(origQueryParams = newPromQlParams)
, inProcessPlanDispatcher, None,
execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]))
execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]),
enableApproximatelyEqualCheck = true)
amolnayak311 marked this conversation as resolved.
Show resolved Hide resolved
}
)
)
Expand Down Expand Up @@ -572,9 +574,10 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
if (queryConfig.supportRemoteRawExport) {
logger.warn(
s"Remote raw export is supported and the $totalExpectedRawExport ms" +
s" is greater than the max allowed raw export duration of ${queryConfig.maxRemoteRawExportTimeRange}")
s" is greater than the max allowed raw export duration of ${queryConfig.maxRemoteRawExportTimeRange}" +
s" for promQl=${qParams.promQl}")
} else {
logger.warn("Remote raw export not enabled")
logger.warn(s"Remote raw export not enabled for promQl=${qParams.promQl}")
}
Seq(EmptyResultExec(qContext, dataset.ref, inProcessPlanDispatcher))
alextheimer marked this conversation as resolved.
Show resolved Hide resolved
}
Expand All @@ -586,13 +589,13 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
case (Some((_, prevTimeRange)), ep: ListBuffer[ExecPlan]) =>
val (currentAssignment, currentTimeRange) = next
// The gap starts at the previous time range's end truncated to a whole second, and ends one second before
// the current time range's start (also truncated to a whole second)
val (startTime, endTime) = (prevTimeRange.endMs / 1000L * 1000L,
val (gapStartTime, gapEndTime) = (prevTimeRange.endMs / 1000L * 1000L,
(currentTimeRange.startMs / 1000L * 1000L) - 1000L)


// If we enable stitching the missing part of time range between the previous time range's end time and
// current time range's start time, we will perform remote/local partition raw data export
if (queryConfig.supportRemoteRawExport && startTime < endTime) {
if (queryConfig.supportRemoteRawExport && gapStartTime < gapEndTime) {
// We need to perform raw data export from two partitions, for simplicity we will assume the time range
amolnayak311 marked this conversation as resolved.
Show resolved Hide resolved
// spans 2 partitions, one partition on the left and one on the right
// Walk the plan to make all RawSeries support remote export fetching the data from previous partition
Expand Down Expand Up @@ -626,21 +629,21 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
// and adding capabilities to give up a "part" of query execution if the runtime number of bytes of ts
// scanned goes high isn't available. To start with, limiting the scanned time range through a static
// configuration will be good enough and can be enhanced in the future as required.
val totalExpectedRawExport = (endTime - startTime) + lookbackMs + offsetMs
val totalExpectedRawExport = (gapEndTime - gapStartTime) + lookbackMs + offsetMs
if (queryConfig.maxRemoteRawExportTimeRange.toMillis > totalExpectedRawExport) {
// Only if the raw export is completely within the previous partition's timerange
val newParams = qParams.copy(startSecs = startTime / 1000, endSecs = endTime / 1000)
val newParams = qParams.copy(startSecs = gapStartTime / 1000, endSecs = gapEndTime / 1000)
val newContext = qContext.copy(origQueryParams = newParams)
val newLp = rewritePlanWithRemoteRawExport(logicalPlan, IntervalSelector(startTime, endTime))
val newLp = rewritePlanWithRemoteRawExport(logicalPlan, IntervalSelector(gapStartTime, gapEndTime))
ep ++= walkLogicalPlanTree(newLp, newContext, true).plans
alextheimer marked this conversation as resolved.
Show resolved Hide resolved
} else {
logger.warn(
s"Remote raw export is supported but the expected raw export for $totalExpectedRawExport ms" +
s" is greater than the max allowed raw export duration ${queryConfig.maxRemoteRawExportTimeRange}")
}
}
val newParams = qParams.copy(startSecs = currentTimeRange.startMs / 1000,
endSecs = qParams.endSecs.min((currentTimeRange.endMs / 1000)))
val newParams = qParams.copy(startSecs = qParams.startSecs.max(currentTimeRange.startMs / 1000),
endSecs = qParams.endSecs.min(currentTimeRange.endMs / 1000))
val newContext = qContext.copy(origQueryParams = newParams)
ep += materializeForPartition(logicalPlan, currentAssignment, newContext)
(Some(next), ep)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2427,29 +2427,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
// -E~PromQlRemoteExec(PromQlQueryParams(sgn(test{job="app"}) + 123,123,45,3306,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
// -E~PromQlRemoteExec(PromQlQueryParams(sgn(test{job="app"}) + 123,3633,45,6789,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,Some(10000),None,true,false,true))
val root = execPlan.asInstanceOf[StitchRvsExec]
println("==" * 100)
println(test.query)
println("--" * 100)
println(root.printTree())
println("==" * 100)
// Make sure one PromQlRemoteExec for each partition.
// root.children.size shouldEqual 2
// // Extract the endpoint/TimeStepParams and make sure they are as-expected.
// val expectedQueryParams = {
// val timeStepParams = test.getExpectedRangesSec().map { case (startSecExp, endSecExp) =>
// TimeStepParams(startSecExp, stepSec, endSecExp)
// }
// expectedUrls.zip(timeStepParams)
// }.toSet
// root.children.map{ child =>
// val remote = child.asInstanceOf[PromQlRemoteExec]
// val params = remote.promQlQueryParams
// // Each plan should dispatch the same query.
// params.promQl shouldEqual test.query
// (remote.queryEndpoint, TimeStepParams(params.startSecs, params.stepSecs, params.endSecs))
// }.toSet shouldEqual expectedQueryParams
// // sanity check
// validatePlan(root, test.expected)
validatePlan(root, test.expected)
}
}

Expand Down
Loading
Loading