Skip to content

Commit

Permalink
cherry-pick: fix(query): fixed split range query with binary op (#1914)
Browse files Browse the repository at this point in the history
  • Loading branch information
alextheimer authored Dec 19, 2024
2 parents 4298c3a + c4d83d9 commit d33598f
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -438,12 +438,12 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
* @param assignments must be sorted and time-disjoint
* @param queryRange the complete time-range. Does not include the offset.
* @param lookbackMs a query's maximum lookback. The time to skip immediately after a partition split.
* @param offsetMs a query's maximum offset. Offsets the "skipped" ranges (forward in time) after partition splits.
* @param offsetMs a query's offsets. Offsets the "skipped" ranges (forward in time) after partition splits.
* @param stepMsOpt occupied iff the returned ranges should describe periodic steps
* (i.e. all range start times (except the first) should be snapped to a step)
*/
private def getAssignmentQueryRanges(assignments: Seq[PartitionAssignment], queryRange: TimeRange,
lookbackMs: Long = 0L, offsetMs: Long = 0L,
lookbackMs: Long = 0L, offsetMs: Seq[Long] = Seq(0L),
stepMsOpt: Option[Long] = None): Seq[(PartitionAssignment, TimeRange)] = {
// Construct a sequence of Option[TimeRange]; the ith range is None iff the ith partition has no range to query.
// First partition doesn't need its start snapped to a periodic step, so deal with it separately.
Expand All @@ -458,7 +458,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
if (queryRange.startMs <= partRange.endMs) {
// At least, some part of the query ends up in this first partition
Some(TimeRange(math.max(queryRange.startMs, partRange.startMs),
math.min(partRange.endMs + offsetMs, queryRange.endMs)))
math.min(partRange.endMs + offsetMs.min, queryRange.endMs)))
} else
None
}
Expand All @@ -471,13 +471,13 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
0
val tailRanges = filteredAssignments.tail.map { assign =>
val startMs = if (stepMsOpt.nonEmpty) {
snapToStep(timestamp = assign.timeRange.startMs + lookbackMs + offsetMs + periodOfUncertaintyMs,
snapToStep(timestamp = assign.timeRange.startMs + lookbackMs + offsetMs.max + periodOfUncertaintyMs,
step = stepMsOpt.get,
origin = queryRange.startMs)
} else {
assign.timeRange.startMs + lookbackMs + offsetMs
assign.timeRange.startMs + lookbackMs + offsetMs.max
}
val endMs = math.min(queryRange.endMs, assign.timeRange.endMs + offsetMs)
val endMs = math.min(queryRange.endMs, assign.timeRange.endMs + offsetMs.max)

if (startMs <= endMs) {
Some(TimeRange(startMs, endMs))
Expand Down Expand Up @@ -571,16 +571,14 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
* Materializes a LogicalPlan with leaves that individually span multiple partitions.
* All "split-leaf" plans will fail to materialize (throw a BadQueryException) if they
* span more than one non-metric shard key prefix.
* Split-leaf plans that contain at least one BinaryJoin will additionally fail to materialize
* if any of the plan's BinaryJoins contain an offset.
*/
//scalastyle:off method.length
private def materializeSplitLeafPlan(logicalPlan: LogicalPlan,
qContext: QueryContext): PlanResult = {
val qParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
// get a mapping of assignments to time-ranges to query
val lookbackMs = getLookBackMillis(logicalPlan).max
val offsetMs = getOffsetMillis(logicalPlan).max
val offsetMs = getOffsetMillis(logicalPlan)
val timeRange = TimeRange(1000 * qParams.startSecs, 1000 * qParams.endSecs)
val stepMsOpt = if (qParams.startSecs == qParams.endSecs) None else Some(1000 * qParams.stepSecs)
val partitions = getPartitions(logicalPlan, qParams).distinct.sortBy(_.timeRange.startMs)
Expand All @@ -594,7 +592,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
// partition, in reality it doesnt matter as despite a longer lookback, the actual data exported will be at most
// what that partition contains.
val (startTime, endTime) = (qParams.startSecs, qParams.endSecs)
val totalExpectedRawExport = (endTime - startTime) + lookbackMs + offsetMs
val totalExpectedRawExport = (endTime - startTime) + lookbackMs + offsetMs.max
if (queryConfig.routingConfig.supportRemoteRawExport &&
queryConfig.routingConfig.maxRemoteRawExportTimeRange.toMillis > totalExpectedRawExport) {
val newLp = rewritePlanWithRemoteRawExport(logicalPlan, IntervalSelector(startTime * 1000, endTime * 1000))
Expand Down Expand Up @@ -634,21 +632,23 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
// spans 2 partition, one partition is on the left and one on the right
// Walk the plan to make all RawSeries support remote export fetching the data from previous partition
// When we rewrite the RawSeries's rangeSelector, we will make the start and end time same as the end
// of the
// previous partition's end time and then do a raw query for the duration of the
// of the previous partition's end time and then do a raw query for the duration of the
// (currentTimeRange.startMs - currentAssignment.timeRange.startMs) + offset + lookback.
//
// Partition split end time for queries in partition 1
// V(p) V(t1)
// |----o---------------|-----x------x-----------------------------o-------|
// ^(s) ^(t2) ^(e)
// Query start time Start time in new partition Query end time
//
// Given we have offset of 10 mins, the query range from partition P1 (left of the partition split point)
// is [s, p + 10m]. The offset looks at data 10 mins back, so we can extent the time range in p1 to 10
// mins after the split point p We want to now provide results for time range t1 - t2, which is missing
// Lets assume the query is sum(rate(foo{}[5m] offset 10m))
// Given the offset is 10m, lookback is 5m, we would need raw data in the range
// [t1 - 5m - 10m, t2], this range for raw queries span two partitions and we will let the RawSeries (the
// Given we have two offsets [10 mins, 5 mins],
// the query range from partition P1 (left of the partition split point) is [s, p + 5m].
// The offset looks at data 5 mins back and 10 mins back, so we can extend the time range in p1 to 5
// mins after the split point p. We want to now provide results for time range t1 - t2, which is missing
//
// Lets assume the query is sum(rate(foo{}[5m] offset 5m) + sum(rate(foo{}[10m] offset [10m])
// Given the offset is [5m, 10m], lookback is [5m, 10m], we would need raw data in the range
// [t1 - 10m - 10m, t2], this range for raw queries span two partitions and we will let the RawSeries (the
// leaf logical plan) with supportsRemoteDataCall = true figure out if this range can entirely be selected
// from partition p1 or p2
//
Expand All @@ -659,12 +659,13 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
// down but simpler queries with few minutes or even hour or two of lookback/offset will continue to work
// seamlessly with no data gaps
// Note that at the moment, while planning, we only can look at whats the max time range we can support.
// We still dont hqve capabilities to check the expected number of timeseries scanned or bytes scanned
// We still dont have capabilities to check the expected number of timeseries scanned or bytes scanned
// and adding capabilities to give up a "part" of query execution if the runtime number of bytes of ts
// scanned goes high isn't available. To start with the time range scanned as a static configuration will
// be good enough and can be enhanced in future as required.
val rawExportStartDurationThisPartition = prevTimeRange.endMs / 1000L * 1000L
val totalExpectedRawExport = (gapEndTimeMs - rawExportStartDurationThisPartition) + lookbackMs + offsetMs
val totalExpectedRawExport = ((gapEndTimeMs - rawExportStartDurationThisPartition)
+ lookbackMs + offsetMs.max)
if (queryConfig.routingConfig.maxRemoteRawExportTimeRange.toMillis > totalExpectedRawExport) {
// Only if the raw export is completely within the previous partition's timerange
val newParams = qParams.copy(
Expand Down
Loading

0 comments on commit d33598f

Please sign in to comment.