Skip to content

Commit

Permalink
bug fix: split-key stitching
Browse files Browse the repository at this point in the history
qContext was updated before materalizeForPartition, but new logic requires
LogicalPlan time params to be updated as well.
  • Loading branch information
alextheimer committed Feb 2, 2024
1 parent 31a24b1 commit 70f4fec
Showing 1 changed file with 18 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,12 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
partition: PartitionAssignment,
queryContext: QueryContext,
timeRangeOverride: Option[TimeRange] = None): ExecPlan = {
val queryParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams]
val qContextWithOverride = timeRangeOverride.map{ r =>
val oldParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams]
val newParams = oldParams.copy(startSecs = r.startMs / 1000, endSecs = r.endMs / 1000)
queryContext.copy(origQueryParams = newParams)
}.getOrElse(queryContext)
val queryParams = qContextWithOverride.origQueryParams.asInstanceOf[PromQlQueryParams]
val timeRange = timeRangeOverride.getOrElse(TimeRange(1000 * queryParams.startSecs, 1000 * queryParams.endSecs))
val (partitionName, grpcEndpoint) = (partition.partitionName, partition.grpcEndPoint)
if (partitionName.equals(localPartitionName)) {
Expand All @@ -404,9 +409,9 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
val lpWithUpdatedTime = if (timeRangeOverride.isDefined) {
copyLogicalPlanWithUpdatedTimeRange(logicalPlan, timeRange)
} else logicalPlan
localPartitionPlanner.materialize(lpWithUpdatedTime, queryContext)
localPartitionPlanner.materialize(lpWithUpdatedTime, qContextWithOverride)
} else {
val ctx = generateRemoteExecParams(queryContext, timeRange.startMs, timeRange.endMs)
val ctx = generateRemoteExecParams(qContextWithOverride, timeRange.startMs, timeRange.endMs)
if (grpcEndpoint.isDefined &&
!(queryConfig.grpcPartitionsDenyList.contains("*") ||
queryConfig.grpcPartitionsDenyList.contains(partitionName.toLowerCase))) {
Expand Down Expand Up @@ -681,17 +686,20 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
s"${queryConfig.routingConfig.maxRemoteRawExportTimeRange}")
}
}
val newParams = qParams.copy(startSecs = qParams.startSecs.max(currentTimeRange.startMs / 1000),
endSecs = qParams.endSecs.min(currentTimeRange.endMs / 1000))
val newContext = qContext.copy(origQueryParams = newParams)
ep += materializeForPartition(logicalPlan, currentAssignment, newContext)
val timeRangeOverride = TimeRange(
1000 * qParams.startSecs.max(currentTimeRange.startMs / 1000),
1000 * qParams.endSecs.min(currentTimeRange.endMs / 1000)
)
ep += materializeForPartition(logicalPlan, currentAssignment, qContext, Some(timeRangeOverride))
(Some(next), ep)
//
case (None, ep: ListBuffer[ExecPlan]) =>
val (assignment, range) = next
val newParams = qParams.copy(startSecs = qParams.startSecs, endSecs = range.endMs / 1000)
val newContext = qContext.copy(origQueryParams = newParams)
ep += materializeForPartition(logicalPlan, assignment, newContext)
val timeRangeOverride = TimeRange(
1000 * qParams.startSecs,
range.endMs
)
ep += materializeForPartition(logicalPlan, assignment, qContext, Some(timeRangeOverride))
(Some(next), ep)
}
}
Expand Down

0 comments on commit 70f4fec

Please sign in to comment.