diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala index b083960d06..12ee86a813 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala @@ -395,7 +395,12 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv partition: PartitionAssignment, queryContext: QueryContext, timeRangeOverride: Option[TimeRange] = None): ExecPlan = { - val queryParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams] + val qContextWithOverride = timeRangeOverride.map{ r => + val oldParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams] + val newParams = oldParams.copy(startSecs = r.startMs / 1000, endSecs = r.endMs / 1000) + queryContext.copy(origQueryParams = newParams) + }.getOrElse(queryContext) + val queryParams = qContextWithOverride.origQueryParams.asInstanceOf[PromQlQueryParams] val timeRange = timeRangeOverride.getOrElse(TimeRange(1000 * queryParams.startSecs, 1000 * queryParams.endSecs)) val (partitionName, grpcEndpoint) = (partition.partitionName, partition.grpcEndPoint) if (partitionName.equals(localPartitionName)) { @@ -404,9 +409,9 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv val lpWithUpdatedTime = if (timeRangeOverride.isDefined) { copyLogicalPlanWithUpdatedTimeRange(logicalPlan, timeRange) } else logicalPlan - localPartitionPlanner.materialize(lpWithUpdatedTime, queryContext) + localPartitionPlanner.materialize(lpWithUpdatedTime, qContextWithOverride) } else { - val ctx = generateRemoteExecParams(queryContext, timeRange.startMs, timeRange.endMs) + val ctx = generateRemoteExecParams(qContextWithOverride, timeRange.startMs, timeRange.endMs) if (grpcEndpoint.isDefined && !(queryConfig.grpcPartitionsDenyList.contains("*") || queryConfig.grpcPartitionsDenyList.contains(partitionName.toLowerCase))) { @@ -681,17 +686,20 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv s"${queryConfig.routingConfig.maxRemoteRawExportTimeRange}") } } - val newParams = qParams.copy(startSecs = qParams.startSecs.max(currentTimeRange.startMs / 1000), - endSecs = qParams.endSecs.min(currentTimeRange.endMs / 1000)) - val newContext = qContext.copy(origQueryParams = newParams) - ep += materializeForPartition(logicalPlan, currentAssignment, newContext) + val timeRangeOverride = TimeRange( + 1000 * qParams.startSecs.max(currentTimeRange.startMs / 1000), + 1000 * qParams.endSecs.min(currentTimeRange.endMs / 1000) + ) + ep += materializeForPartition(logicalPlan, currentAssignment, qContext, Some(timeRangeOverride)) (Some(next), ep) // case (None, ep: ListBuffer[ExecPlan]) => val (assignment, range) = next - val newParams = qParams.copy(startSecs = qParams.startSecs, endSecs = range.endMs / 1000) - val newContext = qContext.copy(origQueryParams = newParams) - ep += materializeForPartition(logicalPlan, assignment, newContext) + val timeRangeOverride = TimeRange( + 1000 * qParams.startSecs, + range.endMs + ) + ep += materializeForPartition(logicalPlan, assignment, qContext, Some(timeRangeOverride)) (Some(next), ep) } }