Skip to content

Commit

Permalink
TEMP
Browse files Browse the repository at this point in the history
  • Loading branch information
alextheimer committed Sep 17, 2023
1 parent a23ae67 commit adb4ce2
Show file tree
Hide file tree
Showing 15 changed files with 611 additions and 644 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import scala.collection.mutable.ArrayBuffer
import com.typesafe.scalalogging.StrictLogging

import filodb.core.TargetSchemaProvider
import filodb.core.query.{ColumnFilter, QueryContext, QueryUtils, RangeParams}
import filodb.core.query.Filter.{Equals, EqualsRegex}
import filodb.core.query.{ColumnFilter, QueryContext, RangeParams}
import filodb.prometheus.ast.SubqueryUtils
import filodb.prometheus.ast.Vectors.PromMetricLabel
import filodb.prometheus.ast.WindowConstants
Expand Down Expand Up @@ -414,6 +413,7 @@ object LogicalPlanUtils extends StrictLogging {
*/
private def sameRawSeriesTargetSchemaColumns(plan: LogicalPlan,
targetSchemaProvider: TargetSchemaProvider,
shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]],
getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]])
: Option[Seq[String]] = {
// compose a stream of Options for each RawSeries--
Expand All @@ -422,22 +422,16 @@ object LogicalPlanUtils extends StrictLogging {
.filter(_.isInstanceOf[RawSeries])
.map(_.asInstanceOf[RawSeries]).flatMap{ rs =>
val interval = LogicalPlanUtils.getSpanningIntervalSelector(rs)
val rawShardKeyFilters = getShardKeyFilters(rs)
val shardKeyFilters = rawShardKeyFilters.flatMap{ filters =>
val resolvedFilters: Seq[Seq[ColumnFilter]] = filters.map { filter =>
filter.filter match {
// Take care of pipe-joined values here -- create one Equals filter per value.
case EqualsRegex(values: String) if QueryUtils.isPipeOnlyRegex(values) =>
QueryUtils.splitAtUnescapedPipes(values).map(value => ColumnFilter(filter.column, Equals(value)))
case _ => Seq(filter)
}}
QueryUtils.combinations(resolvedFilters)
}
shardKeyFilters.map{ shardKey =>
val filters = LogicalPlanUtils.upsertFilters(rs.filters, shardKey)
LogicalPlanUtils.getTargetSchemaIfUnchanging(targetSchemaProvider, filters, interval)
val shardKeyFilters = getShardKeyFilters(rs)
.flatMap { filters =>
val hasNonEqualShardKeyFilter = filters.exists(!_.filter.isInstanceOf[Equals])
if (hasNonEqualShardKeyFilter) shardKeyMatcher(filters) else Seq(filters)
}
shardKeyFilters.map{ shardKey =>
val filters = LogicalPlanUtils.upsertFilters(rs.filters, shardKey)
LogicalPlanUtils.getTargetSchemaIfUnchanging(targetSchemaProvider, filters, interval)
}
}
}
if (rsTschemaOpts.isEmpty) {
return None
}
Expand All @@ -457,6 +451,7 @@ object LogicalPlanUtils extends StrictLogging {
*/
private def canPushdown(plan: CandidatePushdownPlan,
targetSchemaProvider: TargetSchemaProvider,
shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]],
nonMetricShardKeyCols: Seq[String],
getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]]): Boolean = {
val hasPushdownableClause = plan match {
Expand All @@ -482,7 +477,7 @@ object LogicalPlanUtils extends StrictLogging {
// return true
// }

val tschema = sameRawSeriesTargetSchemaColumns(plan, targetSchemaProvider, getShardKeyFilters)
val tschema = sameRawSeriesTargetSchemaColumns(plan, targetSchemaProvider, shardKeyMatcher, getShardKeyFilters)
if (tschema.isEmpty) {
return false
}
Expand All @@ -504,6 +499,7 @@ object LogicalPlanUtils extends StrictLogging {
*/
def getPushdownKeys[T](lp: LogicalPlan,
targetSchemaProvider: TargetSchemaProvider,
shardKeyMatcher: Seq[ColumnFilter] => Seq[Seq[ColumnFilter]],
nonMetricShardKeyCols: Seq[String],
getRawPushdownKeys: RawSeries => Set[T],
getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]]): Option[Set[T]] = {
Expand Down Expand Up @@ -535,12 +531,13 @@ object LogicalPlanUtils extends StrictLogging {
lhsKeys.isDefined && rhsKeys.isDefined &&
// either the lhs/rhs keys are equal, or at least one of lhs/rhs includes only scalars.
(lhsKeys.get.isEmpty || rhsKeys.get.isEmpty || lhsKeys == rhsKeys) &&
canPushdown(bj, targetSchemaProvider, nonMetricShardKeyCols, getShardKeyFilters)
canPushdown(bj, targetSchemaProvider, shardKeyMatcher, nonMetricShardKeyCols, getShardKeyFilters)
// union lhs/rhs keys, since one might be empty (if it's a scalar)
if (canPushdownBj) Some(lhsKeys.get.union(rhsKeys.get)) else None
case agg: Aggregate =>
val keys = helper(agg.vectors)
val canPushdownAgg = canPushdown(agg, targetSchemaProvider, nonMetricShardKeyCols, getShardKeyFilters)
val canPushdownAgg =
canPushdown(agg, targetSchemaProvider, shardKeyMatcher, nonMetricShardKeyCols, getShardKeyFilters)
if (keys.isDefined && canPushdownAgg) keys else None
case nl: NonLeafLogicalPlan =>
// return the set of all child keys iff all child plans can be pushdown-optimized
Expand Down
Loading

0 comments on commit adb4ce2

Please sign in to comment.