
Commit 11d683e ("TEMP")
alextheimer committed Sep 16, 2023
1 parent a23ae67
Showing 14 changed files with 493 additions and 632 deletions.
LogicalPlanUtils.scala
@@ -6,8 +6,8 @@ import scala.collection.mutable.ArrayBuffer
 import com.typesafe.scalalogging.StrictLogging
 
 import filodb.core.TargetSchemaProvider
-import filodb.core.query.{ColumnFilter, QueryContext, QueryUtils, RangeParams}
-import filodb.core.query.Filter.{Equals, EqualsRegex}
+import filodb.core.query.{ColumnFilter, QueryContext, RangeParams}
+import filodb.core.query.Filter.Equals
 import filodb.prometheus.ast.SubqueryUtils
 import filodb.prometheus.ast.Vectors.PromMetricLabel
 import filodb.prometheus.ast.WindowConstants
@@ -414,6 +413,7 @@ object LogicalPlanUtils extends StrictLogging {
    */
   private def sameRawSeriesTargetSchemaColumns(plan: LogicalPlan,
                                                targetSchemaProvider: TargetSchemaProvider,
+                                               shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]],
                                                getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]])
   : Option[Seq[String]] = {
     // compose a stream of Options for each RawSeries--
@@ -422,22 +422,16 @@ object LogicalPlanUtils extends StrictLogging {
       .filter(_.isInstanceOf[RawSeries])
       .map(_.asInstanceOf[RawSeries]).flatMap{ rs =>
         val interval = LogicalPlanUtils.getSpanningIntervalSelector(rs)
-        val rawShardKeyFilters = getShardKeyFilters(rs)
-        val shardKeyFilters = rawShardKeyFilters.flatMap{ filters =>
-          val resolvedFilters: Seq[Seq[ColumnFilter]] = filters.map { filter =>
-            filter.filter match {
-              // Take care of pipe-joined values here -- create one Equals filter per value.
-              case EqualsRegex(values: String) if QueryUtils.isPipeOnlyRegex(values) =>
-                QueryUtils.splitAtUnescapedPipes(values).map(value => ColumnFilter(filter.column, Equals(value)))
-              case _ => Seq(filter)
-            }}
-          QueryUtils.combinations(resolvedFilters)
-        }
-        shardKeyFilters.map{ shardKey =>
-          val filters = LogicalPlanUtils.upsertFilters(rs.filters, shardKey)
-          LogicalPlanUtils.getTargetSchemaIfUnchanging(targetSchemaProvider, filters, interval)
+        val shardKeyFilters = getShardKeyFilters(rs)
+          .flatMap { filters =>
+            val hasNonEqualShardKeyFilter = filters.exists(!_.filter.isInstanceOf[Equals])
+            if (hasNonEqualShardKeyFilter) shardKeyMatcher(filters) else Seq(filters)
+          }
+        shardKeyFilters.map{ shardKey =>
+          val filters = LogicalPlanUtils.upsertFilters(rs.filters, shardKey)
+          LogicalPlanUtils.getTargetSchemaIfUnchanging(targetSchemaProvider, filters, interval)
         }
       }
     if (rsTschemaOpts.isEmpty) {
       return None
     }
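Note: the `shardKeyMatcher` threaded through these helpers resolves a shard-key filter set that may contain regex values into the concrete Equals-only key sets it matches; it generalizes the inline pipe-splitting logic removed above. Below is a minimal sketch of the expected contract, not the production matcher (which is injected by the planner; the real pipe-splitting helper, QueryUtils.splitAtUnescapedPipes, also honors escaped pipes):

import filodb.core.query.ColumnFilter
import filodb.core.query.Filter.{Equals, EqualsRegex}

// Illustrative matcher: expand pipe-joined regex values ("a|b") into one
// Equals filter per value, then cross-product across columns so each
// returned Seq[ColumnFilter] is one fully-concrete shard key.
def sketchShardKeyMatcher(filters: Seq[ColumnFilter]): Seq[Seq[ColumnFilter]] = {
  val perColumn: Seq[Seq[ColumnFilter]] = filters.map {
    case ColumnFilter(col, EqualsRegex(values: String)) =>
      // naive split: ignores escaped pipes
      values.split('|').toSeq.map(v => ColumnFilter(col, Equals(v)))
    case other => Seq(other)
  }
  // cross-product: Seq(Seq(a1, a2), Seq(b1)) => Seq(Seq(a1, b1), Seq(a2, b1))
  perColumn.foldLeft(Seq(Seq.empty[ColumnFilter])) { (acc, next) =>
    for { combo <- acc; cf <- next } yield combo :+ cf
  }
}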
@@ -457,6 +451,7 @@ object LogicalPlanUtils extends StrictLogging {
    */
   private def canPushdown(plan: CandidatePushdownPlan,
                           targetSchemaProvider: TargetSchemaProvider,
+                          shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]],
                           nonMetricShardKeyCols: Seq[String],
                           getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]]): Boolean = {
     val hasPushdownableClause = plan match {
@@ -482,7 +477,7 @@ object LogicalPlanUtils extends StrictLogging {
     //   return true
     // }
 
-    val tschema = sameRawSeriesTargetSchemaColumns(plan, targetSchemaProvider, getShardKeyFilters)
+    val tschema = sameRawSeriesTargetSchemaColumns(plan, targetSchemaProvider, shardKeyMatcher, getShardKeyFilters)
     if (tschema.isEmpty) {
       return false
     }
@@ -504,6 +499,7 @@ object LogicalPlanUtils extends StrictLogging {
    */
   def getPushdownKeys[T](lp: LogicalPlan,
                          targetSchemaProvider: TargetSchemaProvider,
+                         shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]],
                          nonMetricShardKeyCols: Seq[String],
                          getRawPushdownKeys: RawSeries => Set[T],
                          getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]]): Option[Set[T]] = {
@@ -535,12 +531,13 @@ object LogicalPlanUtils extends StrictLogging {
         lhsKeys.isDefined && rhsKeys.isDefined &&
           // either the lhs/rhs keys are equal, or at least one of lhs/rhs includes only scalars.
           (lhsKeys.get.isEmpty || rhsKeys.get.isEmpty || lhsKeys == rhsKeys) &&
-          canPushdown(bj, targetSchemaProvider, nonMetricShardKeyCols, getShardKeyFilters)
+          canPushdown(bj, targetSchemaProvider, shardKeyMatcher, nonMetricShardKeyCols, getShardKeyFilters)
         // union lhs/rhs keys, since one might be empty (if it's a scalar)
         if (canPushdownBj) Some(lhsKeys.get.union(rhsKeys.get)) else None
       case agg: Aggregate =>
         val keys = helper(agg.vectors)
-        val canPushdownAgg = canPushdown(agg, targetSchemaProvider, nonMetricShardKeyCols, getShardKeyFilters)
+        val canPushdownAgg =
+          canPushdown(agg, targetSchemaProvider, shardKeyMatcher, nonMetricShardKeyCols, getShardKeyFilters)
         if (keys.isDefined && canPushdownAgg) keys else None
       case nl: NonLeafLogicalPlan =>
         // return the set of all child keys iff all child plans can be pushdown-optimized
MultiPartitionPlanner.scala
@@ -1,16 +1,13 @@
 package filodb.coordinator.queryplanner
 
-import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.concurrent.{Map => ConcurrentMap}
-import scala.jdk.CollectionConverters._
 
 import com.typesafe.scalalogging.StrictLogging
 import io.grpc.ManagedChannel
 
 import filodb.coordinator.queryplanner.LogicalPlanUtils._
 import filodb.core.metadata.{Dataset, DatasetOptions, Schemas}
-import filodb.core.query.{ColumnFilter, PromQlQueryParams, QueryConfig, QueryContext, RvRange}
+import filodb.core.query.{ColumnFilter, PromQlQueryParams, QueryConfig, QueryContext, QueryUtils, RangeParams, RvRange}
 import filodb.grpc.GrpcCommonUtils
 import filodb.query._
 import filodb.query.LogicalPlan._
@@ -48,10 +45,11 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
                             localPartitionName: String,
                             val dataset: Dataset,
                             val queryConfig: QueryConfig,
+                            shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]] = PartitionLocationPlanner.equalsOnlyShardKeyMatcher,
                             remoteExecHttpClient: RemoteExecHttpClient = RemoteHttpClient.defaultClient,
                             channels: ConcurrentMap[String, ManagedChannel] =
                               new ConcurrentHashMap[String, ManagedChannel]().asScala)
-  extends PartitionLocationPlanner(dataset, partitionLocationProvider) with StrictLogging {
+  extends PartitionLocationPlanner(dataset, partitionLocationProvider, shardKeyMatcher) with StrictLogging {
 
   override val schemas: Schemas = Schemas(dataset.schema)
   override val dsOptions: DatasetOptions = schemas.part.options
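Note: the `equalsOnlyShardKeyMatcher` default keeps existing callers compiling without supplying a real matcher. Its behavior is presumed from the name: pass through already-concrete (Equals-only) shard keys unchanged. A sketch of that presumption:

import filodb.core.query.ColumnFilter
import filodb.core.query.Filter.Equals

// Presumed default: no resolution capability, so accept Equals-only keys
// as a single already-resolved combination and fail fast otherwise.
def equalsOnlyShardKeyMatcher(filters: Seq[ColumnFilter]): Seq[Seq[ColumnFilter]] = {
  require(filters.forall(_.filter.isInstanceOf[Equals]),
    "equals-only matcher cannot resolve regex shard-key filters")
  Seq(filters)
}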
@@ -95,7 +93,12 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
     } else logicalPlan match {
       case mqp: MetadataQueryPlan => materializeMetadataQueryPlan(mqp, qContext).plans.head
       case lp: TsCardinalities => materializeTsCardinalities(lp, qContext).plans.head
-      case _ => walkLogicalPlanTree(logicalPlan, qContext).plans.head
+      case _ =>
+        val result = walkLogicalPlanTree(logicalPlan, qContext)
+        if (result.plans.size > 1) {
+          val dispatcher = PlannerUtil.pickDispatcher(result.plans)
+          MultiPartitionDistConcatExec(qContext, dispatcher, result.plans)
+        } else result.plans.head
     }
   }

@@ -126,18 +129,19 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
     val execPlan = if (partitionName.equals(localPartitionName)) {
       localPartitionPlanner.materialize(logicalPlan, qContext)
     } else {
+      val promQl = LogicalPlanParser.convertToQuery(logicalPlan)
       val remoteContext = logicalPlan match {
         case tls: TopLevelSubquery =>
           val instantTime = qContext.origQueryParams.asInstanceOf[PromQlQueryParams].startSecs
           val stepSecs = tls.stepMs / 1000
-          generateRemoteExecParamsWithStep(qContext, instantTime, stepSecs, instantTime)
+          generateRemoteExecParamsWithStep(qContext, promQl, instantTime, stepSecs, instantTime)
         case psp: PeriodicSeriesPlan =>
           val startSecs = psp.startMs / 1000
           val stepSecs = psp.stepMs / 1000
           val endSecs = psp.endMs / 1000
-          generateRemoteExecParamsWithStep(qContext, startSecs, stepSecs, endSecs)
+          generateRemoteExecParamsWithStep(qContext, promQl, startSecs, stepSecs, endSecs)
         case _ =>
-          generateRemoteExecParams(qContext, startMs, endMs)
+          generateRemoteExecParams(qContext, promQl, startMs, endMs)
       }
       // Single partition but remote, send the entire plan remotely
       if (grpcEndpoint.isDefined && !(queryConfig.grpcPartitionsDenyList.contains("*") ||
@@ -198,24 +202,33 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
   // scalastyle:on cyclomatic.complexity
 
   private def getRoutingKeys(logicalPlan: LogicalPlan) = {
-    val columnFilterGroup = LogicalPlan.getColumnFilterGroup(logicalPlan)
+    val columnFilterGroups = LogicalPlan.getColumnFilterGroup(logicalPlan).flatMap { filters =>
+      // Check if we need to use the shardKeyMatcher.
+      val hasNonEqualsShardKeyFilter = filters.exists { filter =>
+        dataset.options.nonMetricShardColumns.contains(filter.column) &&
+          !filter.filter.isInstanceOf[filodb.core.query.Filter.Equals]
+      }
+      if (hasNonEqualsShardKeyFilter) shardKeyMatcher(filters.toSeq) else Seq(filters.toSeq)
+    }
     val routingKeys = dataset.options.nonMetricShardColumns
-      .map(x => (x, LogicalPlan.getColumnValues(columnFilterGroup, x)))
+      .map(x => (x, LogicalPlan.getColumnValues(columnFilterGroups.map(_.toSet), x)))
     if (routingKeys.flatMap(_._2).isEmpty) Seq.empty else routingKeys.filter(x => x._2.nonEmpty)
   }
 
-  private def generateRemoteExecParams(queryContext: QueryContext, startMs: Long, endMs: Long) = {
+  private def generateRemoteExecParams(queryContext: QueryContext, promQl: String, startMs: Long, endMs: Long) = {
     val queryParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams]
-    queryContext.copy(origQueryParams = queryParams.copy(startSecs = startMs/1000, endSecs = endMs / 1000),
+    queryContext.copy(
+      origQueryParams = queryParams.copy(promQl = promQl, startSecs = startMs/1000, endSecs = endMs / 1000),
       plannerParams = queryContext.plannerParams.copy(processMultiPartition = false))
   }
 
   private def generateRemoteExecParamsWithStep(
-    queryContext: QueryContext, startSecs: Long, stepSecs: Long, endSecs: Long
+    queryContext: QueryContext, promQl: String, startSecs: Long, stepSecs: Long, endSecs: Long
   ) = {
     val queryParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams]
     queryContext.copy(
-      origQueryParams = queryParams.copy(startSecs = startSecs, stepSecs = stepSecs, endSecs = endSecs),
+      origQueryParams =
+        queryParams.copy(promQl = promQl, startSecs = startSecs, stepSecs = stepSecs, endSecs = endSecs),
       plannerParams = queryContext.plannerParams.copy(processMultiPartition = false)
     )
   }
@@ -241,12 +254,13 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
     val queryTimeRange = TimeRange(periodicSeriesTimeWithOffset.startMs - lookBackMs,
       periodicSeriesTimeWithOffset.endMs)
 
-    val partitions = if (routingKeys.isEmpty) List.empty
-    else {
-      val routingKeyMap = routingKeys.map(x => (x._1, x._2.head)).toMap
-      partitionLocationProvider.getPartitions(routingKeyMap, queryTimeRange).
-        sortBy(_.timeRange.startMs)
-    }
+    val keys = routingKeys.map(_._1)
+    val values = routingKeys.map(_._2.toSeq)
+    val partitions = QueryUtils.combinations(values)
+      .map(valueCombo => keys.zip(valueCombo))
+      .flatMap(shardKey => partitionLocationProvider.getPartitions(shardKey.toMap, queryTimeRange))
+      .distinct
+      .sortBy(_.timeRange.startMs)
     if (partitions.isEmpty && routingKeys.nonEmpty)
      logger.warn(s"No partitions found for routing keys: $routingKeys")

@@ -291,8 +305,8 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
       if (execPlans.size == 1) execPlans.head
       else {
         // TODO: Do we pass in QueryContext in LogicalPlan's helper rvRangeForPlan?
-        StitchRvsExec(qContext, inProcessPlanDispatcher, rvRangeFromPlan(logicalPlan),
-          execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]))
+        MultiPartitionDistConcatExec(
+          qContext, inProcessPlanDispatcher, execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]))
       }
       // ^^ Stitch RemoteExec plan results with local using InProcessPlanDispatcher
       // Sort to move RemoteExec in end as it does not have schema
@@ -314,10 +328,14 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
     val timeRange = timeRangeOverride.getOrElse(TimeRange(1000 * queryParams.startSecs, 1000 * queryParams.endSecs))
     val (partitionName, grpcEndpoint) = (partition.partitionName, partition.grpcEndPoint)
     if (partitionName.equals(localPartitionName)) {
-      val lpWithUpdatedTime = copyLogicalPlanWithUpdatedTimeRange(logicalPlan, timeRange)
+      // FIXME: subquery tests fail when their time-ranges are updated
+      val lpWithUpdatedTime = if (timeRangeOverride.isDefined) {
+        copyLogicalPlanWithUpdatedTimeRange(logicalPlan, timeRange)
+      } else logicalPlan
       localPartitionPlanner.materialize(lpWithUpdatedTime, queryContext)
     } else {
-      val ctx = generateRemoteExecParams(queryContext, timeRange.startMs, timeRange.endMs)
+      val promQL = LogicalPlanParser.convertToQuery(logicalPlan)
+      val ctx = generateRemoteExecParams(queryContext, promQL, timeRange.startMs, timeRange.endMs)
       if (grpcEndpoint.isDefined &&
         !(queryConfig.grpcPartitionsDenyList.contains("*") ||
           queryConfig.grpcPartitionsDenyList.contains(partitionName.toLowerCase))) {
@@ -374,19 +392,64 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
     }
   }
 
+  private def canSupportMultiPartitionCalls(execPlans: Seq[ExecPlan]): Boolean =
+    execPlans.forall {
+      case _: PromQlRemoteExec => false
+      case _ => true
+    }
+
+  private def materializeAggregate(aggregate: Aggregate, queryContext: QueryContext): PlanResult = {
+    val plan = if (LogicalPlanUtils.hasDescendantAggregateOrJoin(aggregate.vectors)) {
+      val childPlan = materialize(aggregate.vectors, queryContext)
+      addAggregator(aggregate, queryContext, PlanResult(Seq(childPlan)))
+    } else {
+      val queryParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams]
+      val (partitions, _, _, _) = resolvePartitionsAndRoutingKeys(aggregate, queryParams)
+      val childQueryContext = queryContext.copy(
+        plannerParams = queryContext.plannerParams.copy(skipAggregatePresent = true))
+      val execPlans = partitions.map(p => materializeForPartition(aggregate, p, childQueryContext))
+      val exec = if (execPlans.size == 1) execPlans.head
+      else {
+        if ((aggregate.operator.equals(AggregationOperator.TopK)
+          || aggregate.operator.equals(AggregationOperator.BottomK)
+          || aggregate.operator.equals(AggregationOperator.CountValues)
+        ) && !canSupportMultiPartitionCalls(execPlans))
+          throw new UnsupportedOperationException(s"Shard Key regex not supported for ${aggregate.operator}")
+        else {
+          val reducer = MultiPartitionReduceAggregateExec(queryContext, inProcessPlanDispatcher,
+            execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]), aggregate.operator, aggregate.params)
+          reducer.addRangeVectorTransformer(AggregatePresenter(aggregate.operator, aggregate.params,
+            RangeParams(queryParams.startSecs, queryParams.stepSecs, queryParams.endSecs)))
+          reducer
+        }
+      }
+      exec
+    }
+    PlanResult(Seq(plan))
+  }
+
   /**
    * Materialize any plan whose materialization strategy is governed by whether or not it
    * contains leaves that individually span partitions.
    */
   private def materializePlanHandleSplitLeaf(logicalPlan: LogicalPlan,
                                              qContext: QueryContext): PlanResult = {
     val qParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
-    val hasMultiPartitionLeaves = LogicalPlan.findLeafLogicalPlans(logicalPlan)
-      .exists(getPartitions(_, qParams).size > 1)
+    // Create one plan per RawSeries/shard-key pair, then resolve its partitions.
+    // If any resides on more than one partition, the leaf is "split".
+    val hasMultiPartitionLeaves =
+      LogicalPlan.findLeafLogicalPlans(logicalPlan)
+        .filter(_.isInstanceOf[RawSeries])
+        .flatMap { rs =>
+          val rawFilters = LogicalPlan.getNonMetricShardKeyFilters(rs, dataset.options.nonMetricShardColumns)
+          val filters = rawFilters.flatMap(shardKeyMatcher(_))
+          filters.map(rs.replaceFilters)
+        }
+        .exists(getPartitions(_, qParams).size > 1)
     if (hasMultiPartitionLeaves) {
       materializeSplitLeafPlan(logicalPlan, qContext)
     } else { logicalPlan match {
-      case agg: Aggregate => super.materializeAggregate(qContext, agg)
+      case agg: Aggregate => materializeAggregate(agg, qContext)
       case psw: PeriodicSeriesWithWindowing => materializePeriodicAndRawSeries(psw, qContext)
       case sqw: SubqueryWithWindowing => super.materializeSubqueryWithWindowing(qContext, sqw)
       case bj: BinaryJoin => materializeMultiPartitionBinaryJoinNoSplitLeaf(bj, qContext)
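Note: the leaf expansion above matters because a regex shard key can hide a multi-partition fan-out. For example, if `_ns_=~"App-1|App-2"` resolves to keys living on two different partitions, each expanded Equals-only leaf maps to exactly one partition, so the plan is handled by the ordinary per-partition fan-out (e.g. the new materializeAggregate above) rather than the split-leaf path; only an expanded leaf that itself still spans more than one partition (for instance, split across partitions over time) forces materializeSplitLeafPlan.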
@@ -588,7 +651,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
   private def createMetadataRemoteExec(qContext: QueryContext, partitionAssignment: PartitionAssignment,
                                        urlParams: Map[String, String]) = {
     val finalQueryContext = generateRemoteExecParams(
-      qContext, partitionAssignment.timeRange.startMs, partitionAssignment.timeRange.endMs)
+      qContext, "", partitionAssignment.timeRange.startMs, partitionAssignment.timeRange.endMs)
     val httpEndpoint = partitionAssignment.httpEndPoint +
       finalQueryContext.origQueryParams.asInstanceOf[PromQlQueryParams].remoteQueryPath.getOrElse("")
     MetadataRemoteExec(httpEndpoint, remoteHttpTimeoutMs,