Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
amolnayak311 committed Dec 14, 2023
1 parent f6ac82f commit 3f2f281
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider

val datasetMetricColumn: String = dataset.options.metricColumn


override def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = {
// Pseudo code for the materialize
//
Expand Down Expand Up @@ -234,7 +233,8 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
_: PeriodicSeries => materializePlanHandleSplitLeaf(logicalPlan, qContext)
case raw: RawSeries =>
val params = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
if(getPartitions(raw, params).tail.nonEmpty)
if(getPartitions(raw, params).tail.nonEmpty
&& queryConfig.supportRemoteRawExport)
this.walkLogicalPlanTree(
raw.copy(supportsRemoteDataCall = true),
qContext,
Expand Down Expand Up @@ -455,10 +455,11 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
// First partition doesn't need its start snapped to a periodic step, so deal with it separately.
val headRange = {
val partRange = assignments.head.timeRange
if (range.startMs <= partRange.startMs)
if (range.startMs <= partRange.endMs) {
// At least, some part of the query ends up in this first partition
Some(TimeRange(math.max(range.startMs, partRange.startMs),
math.min(partRange.endMs + offsetMs, range.endMs)))
else
} else
None
}
// Snap remaining range starts to a step (if a step is provided).
Expand Down Expand Up @@ -558,6 +559,9 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
val execPlans = if (assignmentRanges.isEmpty) {
// Assignment ranges empty means we cant run this query fully on one partition and needs
// remote raw export Check if the total time of raw export is within the limits, if not return Empty result
// While it may seem we don't tune the lookback of the leaf raw queries to exactly what we need from each
// partition, in reality it doesnt matter as despite a longer lookback, the actual data exported will be at most
// what that partition contains.
val (startTime, endTime) = (qParams.startSecs, qParams.endSecs)
val totalExpectedRawExport = (endTime - startTime) + lookbackMs + offsetMs
if (queryConfig.supportRemoteRawExport &&
Expand All @@ -570,7 +574,7 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
s"Remote raw export is supported and the $totalExpectedRawExport ms" +
s" is greater than the max allowed raw export duration of ${queryConfig.maxRemoteRawExportTimeRange}")
} else {
logger.warn("Remote raw export not supported")
logger.warn("Remote raw export not enabled")
}
Seq(EmptyResultExec(qContext, dataset.ref, inProcessPlanDispatcher))
}
Expand Down Expand Up @@ -701,8 +705,6 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider

def materializeMetadataQueryPlan(lp: MetadataQueryPlan, qContext: QueryContext): PlanResult = {
val queryParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]


// LabelCardinality is a special case, here the partitions to send this query to is not the authorized partition
// but the actual one where data resides, similar to how non metadata plans work, however, getting label cardinality
// is a metadata operation and shares common components with other metadata endpoints.
Expand Down
Loading

0 comments on commit 3f2f281

Please sign in to comment.