Skip to content

Commit

Permalink
Support Partition Assignment V2.
Browse files Browse the repository at this point in the history
Create a trait class PartitionAssignmentTrait to support both V1 and V2 assignemnt.
Query service has to re-compile but does not have to change anything.
For single partition assignment queries, the behavior does not change.
For multiple partition assignment queries without partition assignment change, use MultiPartitionDistConcatExec to assemble all time series.
For multiple partition assignment queries with partition assignment, stitch the time series  before and after changes.
  • Loading branch information
Yu Zhang committed Dec 19, 2024
1 parent 8a15d74 commit 8fb4fa3
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,34 @@ import filodb.query.LogicalPlan._
import filodb.query.exec._

//scalastyle:off file.size.limit
case class PartitionDetails(partitionName: String, httpEndPoint: String,
grpcEndPoint: Option[String], proportion: Float)
trait PartitionAssignmentTrait {
val proportionMap: Map[String, PartitionDetails]
val timeRange: TimeRange
}

case class PartitionAssignment(partitionName: String, httpEndPoint: String, timeRange: TimeRange,
grpcEndPoint: Option[String] = None)
grpcEndPoint: Option[String] = None) extends PartitionAssignmentTrait {
val proportionMap: Map[String, PartitionDetails] =
Map(partitionName -> PartitionDetails(partitionName, httpEndPoint, grpcEndPoint, 1.0f))
}
case class PartitionAssignmentV2(proportionMap: Map[String, PartitionDetails],
timeRange: TimeRange) extends PartitionAssignmentTrait

trait PartitionLocationProvider {

// keep this function for backward compatibility.
def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment]
def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter],
timeRange: TimeRange): List[PartitionAssignment]
def getPartitionsTrait(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignmentTrait] = {
getPartitions(routingKey, timeRange)
}
def getMetadataPartitionsTrait(nonMetricShardKeyFilters: Seq[ColumnFilter],
timeRange: TimeRange): List[PartitionAssignmentTrait] = {
getMetadataPartitions(nonMetricShardKeyFilters, timeRange)
}
}

/**
Expand Down Expand Up @@ -166,9 +185,9 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
val partitions = getPartitions(logicalPlan, paramToCheckPartitions)
if (isSinglePartition(partitions)) {
val (partitionName, startMs, endMs, grpcEndpoint) = partitions.headOption match {
case Some(pa: PartitionAssignment)
=> (pa.partitionName, params.startSecs * 1000L,
params.endSecs * 1000L, pa.grpcEndPoint)
case Some(pa: PartitionAssignmentTrait)
=> (pa.proportionMap.keys.head, params.startSecs * 1000L,
params.endSecs * 1000L, pa.proportionMap.values.head.grpcEndPoint)
case None => (localPartitionName, params.startSecs * 1000L, params.endSecs * 1000L, None)
}

Expand Down Expand Up @@ -198,7 +217,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
PromQLGrpcRemoteExec(channel, remoteHttpTimeoutMs, remoteContext, inProcessPlanDispatcher,
dataset.ref, plannerSelector)
} else {
val remotePartitionEndpoint = partitions.head.httpEndPoint
val remotePartitionEndpoint = partitions.head.proportionMap.values.head.httpEndPoint
val httpEndpoint = remotePartitionEndpoint + params.remoteQueryPath.getOrElse("")
PromQlRemoteExec(httpEndpoint, remoteHttpTimeoutMs, remoteContext, inProcessPlanDispatcher,
dataset.ref, remoteExecHttpClient)
Expand Down Expand Up @@ -330,7 +349,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
val partitions = if (routingKeys.isEmpty) List.empty
else {
routingKeys.flatMap{ keys =>
partitionLocationProvider.getPartitions(keys, queryTimeRange).
partitionLocationProvider.getPartitionsTrait(keys, queryTimeRange).
sortBy(_.timeRange.startMs)
}.toList
}
Expand Down Expand Up @@ -388,24 +407,46 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
PlanResult(execPlan:: Nil)
}

private def materializeForPartition(logicalPlan: LogicalPlan,
partition: PartitionAssignmentTrait,
queryContext: QueryContext,
timeRangeOverride: Option[TimeRange] = None): ExecPlan = {
partition match {
case PartitionAssignment(partitionName, httpEndPoint, _, grpcEndPoint) =>
materializeForPartition(logicalPlan, partitionName, grpcEndPoint, httpEndPoint, queryContext, timeRangeOverride)
case PartitionAssignmentV2(proportionMap, _) =>
val plans = proportionMap.map(entry => {
val partitionDetails = entry._2
materializeForPartition(logicalPlan, partitionDetails.partitionName,
partitionDetails.grpcEndPoint, partitionDetails.httpEndPoint, queryContext, timeRangeOverride)
}).toSeq
if (plans.size > 1) {
val dispatcher = PlannerUtil.pickDispatcher(plans)
MultiPartitionDistConcatExec(queryContext, dispatcher, plans)
} else {
plans.head
}
}
}
/**
* If the argument partition is local, materialize the LogicalPlan with the local planner.
* Otherwise, create a PromQlRemoteExec.
* @param timeRangeOverride: if given, the plan will be materialized to this range. Otherwise, the
* range is computed from the PromQlQueryParams.
*/
private def materializeForPartition(logicalPlan: LogicalPlan,
partition: PartitionAssignment,
partitionName: String,
grpcEndpoint: Option[String],
httpEndPoint: String,
queryContext: QueryContext,
timeRangeOverride: Option[TimeRange] = None): ExecPlan = {
timeRangeOverride: Option[TimeRange]): ExecPlan = {
val qContextWithOverride = timeRangeOverride.map{ r =>
val oldParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams]
val newParams = oldParams.copy(startSecs = r.startMs / 1000, endSecs = r.endMs / 1000)
queryContext.copy(origQueryParams = newParams)
}.getOrElse(queryContext)
val queryParams = qContextWithOverride.origQueryParams.asInstanceOf[PromQlQueryParams]
val timeRange = timeRangeOverride.getOrElse(TimeRange(1000 * queryParams.startSecs, 1000 * queryParams.endSecs))
val (partitionName, grpcEndpoint) = (partition.partitionName, partition.grpcEndPoint)
if (partitionName.equals(localPartitionName)) {
// FIXME: the below check is needed because subquery tests fail when their
// time-ranges are updated even with the original query params.
Expand All @@ -423,7 +464,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
PromQLGrpcRemoteExec(channel, remoteHttpTimeoutMs, ctx, inProcessPlanDispatcher,
dataset.ref, plannerSelector)
} else {
val httpEndpoint = partition.httpEndPoint + queryParams.remoteQueryPath.getOrElse("")
val httpEndpoint = httpEndPoint + queryParams.remoteQueryPath.getOrElse("")
PromQlRemoteExec(httpEndpoint, remoteHttpTimeoutMs,
ctx, inProcessPlanDispatcher, dataset.ref, remoteExecHttpClient)
}
Expand All @@ -442,9 +483,9 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
* @param stepMsOpt occupied iff the returned ranges should describe periodic steps
* (i.e. all range start times (except the first) should be snapped to a step)
*/
private def getAssignmentQueryRanges(assignments: Seq[PartitionAssignment], queryRange: TimeRange,
private def getAssignmentQueryRanges(assignments: Seq[PartitionAssignmentTrait], queryRange: TimeRange,
lookbackMs: Long = 0L, offsetMs: Seq[Long] = Seq(0L),
stepMsOpt: Option[Long] = None): Seq[(PartitionAssignment, TimeRange)] = {
stepMsOpt: Option[Long] = None): Seq[(PartitionAssignmentTrait, TimeRange)] = {
// Construct a sequence of Option[TimeRange]; the ith range is None iff the ith partition has no range to query.
// First partition doesn't need its start snapped to a periodic step, so deal with it separately.
val filteredAssignments = assignments
Expand Down Expand Up @@ -612,7 +653,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
} else {
// materialize a plan for each range/assignment pair
val (_, execPlans) = assignmentRanges.foldLeft(
(None: Option[(PartitionAssignment, TimeRange)], ListBuffer.empty[ExecPlan])) {
(None: Option[(PartitionAssignmentTrait, TimeRange)], ListBuffer.empty[ExecPlan])) {
case (acc, next) => acc match {
case (Some((_, prevTimeRange)), ep: ListBuffer[ExecPlan]) =>
val (currentAssignment, currentTimeRange) = next
Expand Down Expand Up @@ -803,7 +844,8 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
localPartitionPlanner.materialize(lp, qContext)
}
else {
val execPlans = partitions.map { p =>
val execPlans = partitions.flatMap(ps => ps.proportionMap.values.map(pd =>
PartitionAssignment(pd.partitionName, pd.httpEndPoint, ps.timeRange, pd.grpcEndPoint))).map { p =>
logger.debug(s"partitionInfo=$p; queryParams=$queryParams")
if (p.partitionName.equals(localPartitionName))
localPartitionPlanner.materialize(
Expand Down Expand Up @@ -848,7 +890,8 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
logger.warn(s"no partitions found for $lp; defaulting to local planner")
localPartitionPlanner.materialize(lp, qContext)
} else {
val execPlans = partitions.map { p =>
val execPlans = partitions.flatMap(ps => ps.proportionMap.values.map(pd =>
PartitionAssignment(pd.partitionName, pd.httpEndPoint, ps.timeRange, pd.grpcEndPoint))).map { p =>
logger.debug(s"partition=$p; plan=$lp")
if (p.partitionName.equals(localPartitionName))
localPartitionPlanner.materialize(lp, qContext)
Expand All @@ -867,9 +910,10 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
PlanResult(execPlan::Nil)
}

private def getMetadataPartitions(filters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = {
private def getMetadataPartitions(filters: Seq[ColumnFilter],
timeRange: TimeRange): List[PartitionAssignmentTrait] = {
val nonMetricShardKeyFilters = filters.filter(f => dataset.options.nonMetricShardColumns.contains(f.column))
partitionLocationProvider.getMetadataPartitions(nonMetricShardKeyFilters, timeRange)
partitionLocationProvider.getMetadataPartitionsTrait(nonMetricShardKeyFilters, timeRange)
}

private def createMetadataRemoteExec(qContext: QueryContext, partitionAssignment: PartitionAssignment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ abstract class PartitionLocationPlanner(dataset: Dataset,
*/
protected def getPartitions(logicalPlan: LogicalPlan,
queryParams: PromQlQueryParams,
infiniteTimeRange: Boolean = false) : Seq[PartitionAssignment] = {
infiniteTimeRange: Boolean = false) : Seq[PartitionAssignmentTrait] = {

//1. Get a Seq of all Leaf node filters
val leafFilters = LogicalPlan.getColumnFilterGroup(logicalPlan)
Expand Down Expand Up @@ -71,19 +71,19 @@ abstract class PartitionLocationPlanner(dataset: Dataset,

//4. Based on the map in 2 and time range in 5, get the partitions to query
routingKeyMap.flatMap(metricMap =>
partitionLocationProvider.getPartitions(metricMap, queryTimeRange))
partitionLocationProvider.getPartitionsTrait(metricMap, queryTimeRange))
}
// scalastyle:on method.length

/**
* Checks if all the PartitionAssignments belong to same partition
*/
protected def isSinglePartition(partitions: Seq[PartitionAssignment]) : Boolean = {
protected def isSinglePartition(partitions: Seq[PartitionAssignmentTrait]) : Boolean = {
if (partitions.isEmpty)
true
else {
val partName = partitions.head.partitionName
partitions.forall(_.partitionName.equals(partName))
val pSet = partitions.flatMap(p => p.proportionMap.keys)
pSet.forall(p => p.equals(pSet.head))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class ShardKeyRegexPlanner(val dataset: Dataset,
val shardKeys = getShardKeys(logicalPlan)
val partitions = shardKeys
.flatMap(filters => getPartitions(logicalPlan.replaceFilters(filters), qParams))
.map(_.partitionName)
.flatMap(_.proportionMap.keys)
.distinct
// NOTE: don't use partitions.size < 2. When partitions == 0, generateExec will not
// materialize any plans because there are no partitions against which it should materialize.
Expand Down Expand Up @@ -240,7 +240,7 @@ class ShardKeyRegexPlanner(val dataset: Dataset,
val newLogicalPlan = logicalPlan.replaceFilters(key)
val newQueryParams = queryParams.copy(promQl = LogicalPlanParser.convertToQuery(newLogicalPlan))
val partitions = getPartitions(newLogicalPlan, newQueryParams)
.map(_.partitionName)
.flatMap(_.proportionMap.keys)
.distinct
if (partitions.size > 1) {
partitionSplitKeys.append(key)
Expand Down
Loading

0 comments on commit 8fb4fa3

Please sign in to comment.