Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick: fix(query): fixed split range query with binary op (#1914) #1918

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -438,12 +438,12 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
* @param assignments must be sorted and time-disjoint
* @param queryRange the complete time-range. Does not include the offset.
* @param lookbackMs a query's maximum lookback. The time to skip immediately after a partition split.
* @param offsetMs a query's maximum offset. Offsets the "skipped" ranges (forward in time) after partition splits.
* @param offsetMs a query's offsets. Offsets the "skipped" ranges (forward in time) after partition splits.
* @param stepMsOpt occupied iff the returned ranges should describe periodic steps
* (i.e. all range start times (except the first) should be snapped to a step)
*/
private def getAssignmentQueryRanges(assignments: Seq[PartitionAssignment], queryRange: TimeRange,
lookbackMs: Long = 0L, offsetMs: Long = 0L,
lookbackMs: Long = 0L, offsetMs: Seq[Long] = Seq(0L),
stepMsOpt: Option[Long] = None): Seq[(PartitionAssignment, TimeRange)] = {
// Construct a sequence of Option[TimeRange]; the ith range is None iff the ith partition has no range to query.
// First partition doesn't need its start snapped to a periodic step, so deal with it separately.
Expand All @@ -458,7 +458,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
if (queryRange.startMs <= partRange.endMs) {
// At least, some part of the query ends up in this first partition
Some(TimeRange(math.max(queryRange.startMs, partRange.startMs),
math.min(partRange.endMs + offsetMs, queryRange.endMs)))
math.min(partRange.endMs + offsetMs.min, queryRange.endMs)))
} else
None
}
Expand All @@ -471,13 +471,13 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
0
val tailRanges = filteredAssignments.tail.map { assign =>
val startMs = if (stepMsOpt.nonEmpty) {
snapToStep(timestamp = assign.timeRange.startMs + lookbackMs + offsetMs + periodOfUncertaintyMs,
snapToStep(timestamp = assign.timeRange.startMs + lookbackMs + offsetMs.max + periodOfUncertaintyMs,
step = stepMsOpt.get,
origin = queryRange.startMs)
} else {
assign.timeRange.startMs + lookbackMs + offsetMs
assign.timeRange.startMs + lookbackMs + offsetMs.max
}
val endMs = math.min(queryRange.endMs, assign.timeRange.endMs + offsetMs)
val endMs = math.min(queryRange.endMs, assign.timeRange.endMs + offsetMs.max)

if (startMs <= endMs) {
Some(TimeRange(startMs, endMs))
Expand Down Expand Up @@ -571,16 +571,14 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
* Materializes a LogicalPlan with leaves that individually span multiple partitions.
* All "split-leaf" plans will fail to materialize (throw a BadQueryException) if they
* span more than one non-metric shard key prefix.
* Split-leaf plans that contain at least one BinaryJoin will additionally fail to materialize
* if any of the plan's BinaryJoins contain an offset.
*/
//scalastyle:off method.length
private def materializeSplitLeafPlan(logicalPlan: LogicalPlan,
qContext: QueryContext): PlanResult = {
val qParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
// get a mapping of assignments to time-ranges to query
val lookbackMs = getLookBackMillis(logicalPlan).max
val offsetMs = getOffsetMillis(logicalPlan).max
val offsetMs = getOffsetMillis(logicalPlan)
val timeRange = TimeRange(1000 * qParams.startSecs, 1000 * qParams.endSecs)
val stepMsOpt = if (qParams.startSecs == qParams.endSecs) None else Some(1000 * qParams.stepSecs)
val partitions = getPartitions(logicalPlan, qParams).distinct.sortBy(_.timeRange.startMs)
Expand All @@ -594,7 +592,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
// partition, in reality it doesnt matter as despite a longer lookback, the actual data exported will be at most
// what that partition contains.
val (startTime, endTime) = (qParams.startSecs, qParams.endSecs)
val totalExpectedRawExport = (endTime - startTime) + lookbackMs + offsetMs
val totalExpectedRawExport = (endTime - startTime) + lookbackMs + offsetMs.max
if (queryConfig.routingConfig.supportRemoteRawExport &&
queryConfig.routingConfig.maxRemoteRawExportTimeRange.toMillis > totalExpectedRawExport) {
val newLp = rewritePlanWithRemoteRawExport(logicalPlan, IntervalSelector(startTime * 1000, endTime * 1000))
Expand Down Expand Up @@ -634,21 +632,23 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
// spans 2 partition, one partition is on the left and one on the right
// Walk the plan to make all RawSeries support remote export fetching the data from previous partition
// When we rewrite the RawSeries's rangeSelector, we will make the start and end time same as the end
// of the
// previous partition's end time and then do a raw query for the duration of the
// of the previous partition's end time and then do a raw query for the duration of the
// (currentTimeRange.startMs - currentAssignment.timeRange.startMs) + offset + lookback.
//
// Partition split end time for queries in partition 1
// V(p) V(t1)
// |----o---------------|-----x------x-----------------------------o-------|
// ^(s) ^(t2) ^(e)
// Query start time Start time in new partition Query end time
//
// Given we have offset of 10 mins, the query range from partition P1 (left of the partition split point)
// is [s, p + 10m]. The offset looks at data 10 mins back, so we can extent the time range in p1 to 10
// mins after the split point p We want to now provide results for time range t1 - t2, which is missing
// Lets assume the query is sum(rate(foo{}[5m] offset 10m))
// Given the offset is 10m, lookback is 5m, we would need raw data in the range
// [t1 - 5m - 10m, t2], this range for raw queries span two partitions and we will let the RawSeries (the
// Given we have two offsets [10 mins, 5 mins],
// the query range from partition P1 (left of the partition split point) is [s, p + 5m].
// The offset looks at data 5 mins back and 10 mins back, so we can extend the time range in p1 to 5
// mins after the split point p. We want to now provide results for time range t1 - t2, which is missing
//
// Lets assume the query is sum(rate(foo{}[5m] offset 5m) + sum(rate(foo{}[10m] offset [10m])
// Given the offset is [5m, 10m], lookback is [5m, 10m], we would need raw data in the range
// [t1 - 10m - 10m, t2], this range for raw queries span two partitions and we will let the RawSeries (the
// leaf logical plan) with supportsRemoteDataCall = true figure out if this range can entirely be selected
// from partition p1 or p2
//
Expand All @@ -659,12 +659,13 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
// down but simpler queries with few minutes or even hour or two of lookback/offset will continue to work
// seamlessly with no data gaps
// Note that at the moment, while planning, we only can look at whats the max time range we can support.
// We still dont hqve capabilities to check the expected number of timeseries scanned or bytes scanned
// We still dont have capabilities to check the expected number of timeseries scanned or bytes scanned
// and adding capabilities to give up a "part" of query execution if the runtime number of bytes of ts
// scanned goes high isn't available. To start with the time range scanned as a static configuration will
// be good enough and can be enhanced in future as required.
val rawExportStartDurationThisPartition = prevTimeRange.endMs / 1000L * 1000L
val totalExpectedRawExport = (gapEndTimeMs - rawExportStartDurationThisPartition) + lookbackMs + offsetMs
val totalExpectedRawExport = ((gapEndTimeMs - rawExportStartDurationThisPartition)
+ lookbackMs + offsetMs.max)
if (queryConfig.routingConfig.maxRemoteRawExportTimeRange.toMillis > totalExpectedRawExport) {
// Only if the raw export is completely within the previous partition's timerange
val newParams = qParams.copy(
Expand Down
Loading
Loading