merge integration part 2 #1915

Merged
merged 6 commits on Dec 17, 2024
@@ -567,30 +567,6 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
}}
}

/**
* Throws a BadQueryException if any of the following conditions hold:
* (1) the plan spans more than one non-metric shard key prefix.
* (2) the plan contains at least one BinaryJoin, and any of its BinaryJoins contain an offset.
* @param splitLeafPlan must contain leaf plans that individually span multiple partitions.
*/
private def validateSplitLeafPlan(splitLeafPlan: LogicalPlan): Unit = {
val baseErrorMessage = "This query contains selectors that individually read data from multiple partitions. " +
"This is likely because a selector's data was migrated between partitions. "
if (hasBinaryJoin(splitLeafPlan) && getOffsetMillis(splitLeafPlan).exists(_ > 0)) {
throw new BadQueryException( baseErrorMessage +
"These \"split\" queries cannot contain binary joins with offsets."
)
}
lazy val hasMoreThanOneNonMetricShardKey =
LogicalPlanUtils.resolvePipeConcatenatedShardKeyFilters(splitLeafPlan, dataset.options.nonMetricShardColumns)
.filter(_.nonEmpty).distinct.size > 1
if (hasMoreThanOneNonMetricShardKey) {
throw new BadQueryException( baseErrorMessage +
"These \"split\" queries are not supported if they contain multiple non-metric shard keys."
)
}
}

/**
* Materializes a LogicalPlan with leaves that individually span multiple partitions.
* All "split-leaf" plans will fail to materialize (throw a BadQueryException) if they
@@ -601,9 +577,6 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
//scalastyle:off method.length
private def materializeSplitLeafPlan(logicalPlan: LogicalPlan,
qContext: QueryContext): PlanResult = {
// TODO: Reassess this validation; we should also support binary joins in split-leaf plans as long as they are
// within the limits of the max range of data exported
validateSplitLeafPlan(logicalPlan)
val qParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
// get a mapping of assignments to time-ranges to query
val lookbackMs = getLookBackMillis(logicalPlan).max
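Note on the windowing concern in the removed TODO above: with the guard gone, a split-leaf binary join with an offset must still read the right raw-data window within each partition-assignment time range. The sketch below is a minimal, hypothetical illustration (none of these names exist in FiloDB) of how an offset and a lookback shift that window; it reflects standard PromQL semantics, not code from this PR.

```scala
// Hypothetical sketch (not FiloDB code) of the window arithmetic the removed TODO refers to.
object RawDataWindowSketch {
  final case class TimeRange(startMs: Long, endMs: Long)

  // An offset shifts the evaluated window back in time; the lookback (range / staleness
  // interval) extends its start further back. A split-leaf plan must cover this window
  // within each partition-assignment time range it queries.
  def rawDataWindow(queryRange: TimeRange, lookbackMs: Long, offsetMs: Long): TimeRange =
    TimeRange(queryRange.startMs - offsetMs - lookbackMs, queryRange.endMs - offsetMs)

  def main(args: Array[String]): Unit = {
    // Example: a [5m] selector with `offset 5m` evaluated over 2000s..10000s reads raw
    // data from (2000 - 300 - 300)s to (10000 - 300)s.
    println(rawDataWindow(TimeRange(2000000L, 10000000L), lookbackMs = 300000L, offsetMs = 300000L))
  }
}
```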
@@ -980,6 +980,7 @@ object ProtoConverters {
case InternalRangeFunction.AvgWithSumAndCountOverTime =>
GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME
case InternalRangeFunction.SumAndMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME
case InternalRangeFunction.RateAndMinMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME
case InternalRangeFunction.LastSampleHistMaxMin => GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN
case InternalRangeFunction.Timestamp => GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP
case InternalRangeFunction.AbsentOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME
@@ -1017,6 +1018,7 @@ object ProtoConverters {
case GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME =>
InternalRangeFunction.AvgWithSumAndCountOverTime
case GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME => InternalRangeFunction.SumAndMaxOverTime
case GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME => InternalRangeFunction.RateAndMinMaxOverTime
case GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN => InternalRangeFunction.LastSampleHistMaxMin
case GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP => InternalRangeFunction.Timestamp
case GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME => InternalRangeFunction.AbsentOverTime
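The two hunks above add the forward and reverse arms of the mapping for the new RateAndMinMaxOverTime range function. Below is a minimal, self-contained sketch of that two-way enum-mapping pattern, with hypothetical types standing in for the FiloDB and generated gRPC enums; the real RATE_AND_MIN_MAX_OVER_TIME value is assumed to be defined in the GrpcMultiPartitionQueryService proto, which is not part of this excerpt.

```scala
// Minimal sketch with hypothetical types (not the real FiloDB/gRPC enums) of the
// two-way mapping pattern used in ProtoConverters: every internal range function
// maps to exactly one proto enum value and back.
object RangeFunctionMappingSketch {
  sealed trait InternalFn
  case object SumAndMaxOverTime extends InternalFn
  case object RateAndMinMaxOverTime extends InternalFn

  sealed trait ProtoFn
  case object SUM_AND_MAX_OVER_TIME extends ProtoFn
  case object RATE_AND_MIN_MAX_OVER_TIME extends ProtoFn

  def toProto(fn: InternalFn): ProtoFn = fn match {
    case SumAndMaxOverTime     => SUM_AND_MAX_OVER_TIME
    case RateAndMinMaxOverTime => RATE_AND_MIN_MAX_OVER_TIME
  }

  def fromProto(fn: ProtoFn): InternalFn = fn match {
    case SUM_AND_MAX_OVER_TIME         => SumAndMaxOverTime
    case RATE_AND_MIN_MAX_OVER_TIME    => RateAndMinMaxOverTime
  }

  def main(args: Array[String]): Unit = {
    // Omitting one arm would surface as a non-exhaustive-match warning at compile
    // time and as a failed round trip here.
    val all = Seq(SumAndMaxOverTime, RateAndMinMaxOverTime)
    assert(all.forall(fn => fromProto(toProto(fn)) == fn))
    println("round trip ok")
  }
}
```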
@@ -15,7 +15,7 @@ import filodb.prometheus.ast.TimeStepParams
import filodb.prometheus.parse.Parser
import filodb.query.BinaryOperator.{ADD, LAND}
import filodb.query.InstantFunctionId.Ln
import filodb.query.{BadQueryException, LabelCardinality, LogicalPlan, PlanValidationSpec, SeriesKeysByFilters, TsCardinalities}
import filodb.query.{LabelCardinality, LogicalPlan, PlanValidationSpec, SeriesKeysByFilters, TsCardinalities}
import filodb.query.exec._

class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValidationSpec{
@@ -1712,18 +1712,6 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
validatePlan(execPlan2, expectedPlanWithRemoteExec1)


val query4 = "sum(rate(test{job = \"app\"}[10m])) + sum(rate(bar{job = \"app\"}[5m] offset 5m))"
val lp4 = Parser.queryRangeToLogicalPlan(query4, TimeStepParams(2000, stepSecs, 10000))

val promQlQueryParams4 = PromQlQueryParams(query4, 1000, 100, 10000)
intercept[BadQueryException] {
// Expecting an exception when we use a BinaryJoin with offsets. Technically this too should not be a big
// deal, since we only need to identify the right window; however, it was not supported even before this
// change, so it is fine to leave it unaddressed in the first phase as it only affects binary joins with offsets
engine.materialize(lp4, QueryContext(origQueryParams = promQlQueryParams4, plannerParams =
PlannerParams(processMultiPartition = true)))
}


// Planner with period of uncertainty should still generate steps that are aligned with start and step,
// that is, steps should be snapped correctly
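With the planner-side guard removed, a spec case in the style of the deleted one above would materialize the offset binary join instead of expecting a BadQueryException. The sketch below reuses the deleted query and assumes the surrounding spec's fixtures (engine, stepSecs); it is an illustration, not a test added by this PR, and it asserts nothing about the resulting plan because that depends on the partition assignments configured by the spec.

```scala
// Sketch only -- not part of this PR. Relies on the surrounding spec's fixtures
// (`engine`, `stepSecs`) exactly as the deleted case did.
val query4 = """sum(rate(test{job = "app"}[10m])) + sum(rate(bar{job = "app"}[5m] offset 5m))"""
val lp4 = Parser.queryRangeToLogicalPlan(query4, TimeStepParams(2000, stepSecs, 10000))
val promQlQueryParams4 = PromQlQueryParams(query4, 1000, 100, 10000)
// Previously this threw a BadQueryException; now the planner is expected to produce an ExecPlan.
val execPlan4 = engine.materialize(lp4, QueryContext(origQueryParams = promQlQueryParams4,
  plannerParams = PlannerParams(processMultiPartition = true)))
```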