Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(query): reduce regex query fanout #1704

Merged
merged 84 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
cc94eac
perf(query): reduce regex query fanout
alextheimer Aug 13, 2023
225c1a0
fix test
alextheimer Aug 14, 2023
651ba32
account for bug and add feature flag
alextheimer Aug 29, 2023
c7989f1
add default index
alextheimer Aug 29, 2023
93eac50
update tests
alextheimer Aug 29, 2023
85f5f7b
add tests
alextheimer Aug 29, 2023
6bddde9
function name change for clarity
alextheimer Sep 13, 2023
0e6feb8
rename method for accuracy
alextheimer Sep 13, 2023
6561d21
cleanup
alextheimer Sep 13, 2023
ccc4e2c
add splitOnPipes
alextheimer Sep 13, 2023
220d43d
add comment
alextheimer Sep 13, 2023
a23ae67
add regex tests
alextheimer Sep 13, 2023
052ac1b
stashed updates
alextheimer Sep 27, 2023
3dd515e
update routingKeys method
alextheimer Jan 23, 2024
1a767d1
restore old materializeSplitLeafPlan
alextheimer Jan 23, 2024
5b76f2c
cleanup
alextheimer Jan 23, 2024
52f993a
minor changes / cleanup
alextheimer Jan 23, 2024
ec0fa8f
test updates
alextheimer Jan 23, 2024
be6cbc8
Merge branch 'develop' into fanout5
alextheimer Jan 23, 2024
a96c4d4
resolve build errors
alextheimer Jan 23, 2024
f1a8318
udpate test
alextheimer Jan 24, 2024
40b61b3
propose changes
alextheimer Jan 24, 2024
86e9547
readd wlpt prefix
alextheimer Jan 24, 2024
56687c8
rename
alextheimer Jan 24, 2024
503e508
recover/merge old commits
alextheimer Jan 26, 2024
e476181
add grouping and handle split leaves
alextheimer Jan 30, 2024
9ec1290
LogicalPlanUtils comments
alextheimer Jan 31, 2024
77b2033
update MPP
alextheimer Jan 31, 2024
a5f0f12
cleanup PLP
alextheimer Jan 31, 2024
8ae2387
update skrp
alextheimer Jan 31, 2024
93d6166
ckpt
alextheimer Feb 1, 2024
e738dde
ckpt
alextheimer Feb 1, 2024
d4954c3
scp update complete
alextheimer Feb 1, 2024
96c61b5
mild cleanup
alextheimer Feb 1, 2024
dc9aae7
fix javadoc error
alextheimer Feb 1, 2024
4d35d4e
remove unnecessary method
alextheimer Feb 1, 2024
9cefbae
skrp fix
alextheimer Feb 1, 2024
951cbd7
scalastyle
alextheimer Feb 1, 2024
b9f7190
update validation error msg
alextheimer Feb 1, 2024
94c891f
update mpp spec
alextheimer Feb 2, 2024
53104b8
impl update: skrp scalar fix
alextheimer Feb 2, 2024
50f6720
all skrp tests pass
alextheimer Feb 2, 2024
be54048
phs ckpt
alextheimer Feb 2, 2024
2ed5e8d
Merge develop into fanout5
alextheimer Feb 2, 2024
e5f0514
resolve skrp merge conflicts
alextheimer Feb 2, 2024
8a5d775
resolve phs merge conflicts
alextheimer Feb 2, 2024
7becba7
phs ckpt
alextheimer Feb 2, 2024
44e11c8
phs ckpt
alextheimer Feb 2, 2024
e3f3c79
phs ckpt
alextheimer Feb 2, 2024
434de0d
phs ckpt
alextheimer Feb 2, 2024
426c15e
phs ckpt
alextheimer Feb 2, 2024
217cd7a
phs complte
alextheimer Feb 2, 2024
f5377ce
add test
alextheimer Feb 2, 2024
1d655f5
add test comment
alextheimer Feb 2, 2024
0b0a9b9
remove duplicate test
alextheimer Feb 2, 2024
0a619d1
remove unnecessary TODO
alextheimer Feb 2, 2024
31a24b1
readd missing metadata promql params
alextheimer Feb 2, 2024
70f4fec
bug fix: split-key stitching
alextheimer Feb 2, 2024
2b24b76
fix tests
alextheimer Feb 2, 2024
2c37f8b
bug fix: update addl LP time params
alextheimer Feb 2, 2024
c6a4fac
update test
alextheimer Feb 2, 2024
4163bd1
addl test update
alextheimer Feb 2, 2024
d2eaae7
remove println
alextheimer Feb 2, 2024
7ecec59
re-add old skrp logic
alextheimer Feb 2, 2024
7204f4d
fix phs tests
alextheimer Feb 2, 2024
bd3c3a5
bug fixes -- see comments
alextheimer Feb 3, 2024
a6542db
remove empty WLPT result check
alextheimer Feb 6, 2024
f1c50ab
distinct shard keys
alextheimer Feb 6, 2024
2148dda
add test
alextheimer Feb 8, 2024
8bc25f2
remove MPPDCE check
alextheimer Feb 8, 2024
2d9932f
update test
alextheimer Feb 8, 2024
c8a0235
add routing-key test
alextheimer Feb 8, 2024
3853235
update filter name
alextheimer Feb 8, 2024
ae334db
update docs
alextheimer Feb 8, 2024
91f5235
comparison op
alextheimer Feb 8, 2024
8471a58
remove outdated check
alextheimer Feb 8, 2024
091c152
add tschema-changes test
alextheimer Feb 8, 2024
8d4ea28
update test
alextheimer Feb 8, 2024
1d0f027
update tests
alextheimer Feb 8, 2024
6d3f4fd
add test
alextheimer Feb 8, 2024
d145d38
update copyLogicalPlanWithUpdatedTimeRange
alextheimer Feb 8, 2024
d1ee280
second-aligned rewriteRawExport timestamps
alextheimer Feb 8, 2024
47e97c1
Revert "second-aligned rewriteRawExport timestamps"
alextheimer Feb 8, 2024
297d2fb
add comment
alextheimer Feb 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import scala.collection.mutable.ArrayBuffer
import com.typesafe.scalalogging.StrictLogging

import filodb.core.TargetSchemaProvider
import filodb.core.query.{ColumnFilter, QueryContext, RangeParams}
import filodb.core.query.{ColumnFilter, QueryContext, QueryUtils, RangeParams}
import filodb.core.query.Filter.{Equals, EqualsRegex}
import filodb.prometheus.ast.SubqueryUtils
import filodb.prometheus.ast.Vectors.PromMetricLabel
import filodb.prometheus.ast.WindowConstants
Expand All @@ -28,17 +29,17 @@ object LogicalPlanUtils extends StrictLogging {
}

/**
* Given a LogicalPlan check if any descendent of plan is an aggregate operation
* Given a LogicalPlan check if any descendent of plan is either an aggregate or join operation
* @param lp the LogicalPlan instance
* @return true if a descendent is an aggregate else false
* @return true if a descendent is an aggregate or join else false
*/
def hasDescendantAggregate(lp: LogicalPlan): Boolean = lp match {
def hasDescendantAggregateOrJoin(lp: LogicalPlan): Boolean = lp match {
case _: Aggregate => true
// consider this BinaryJoin example foo + on(h) + bar.
// partition1 has foo{h=1}, bar{h1=2} and partition2 has foo{h=2}, bar{h1=1}
// the binary join cannot happen on a partition locally. InProcessPlanDispatcher is required.
case _: BinaryJoin => true
case nonLeaf: NonLeafLogicalPlan => nonLeaf.children.exists(hasDescendantAggregate(_))
case nonLeaf: NonLeafLogicalPlan => nonLeaf.children.exists(hasDescendantAggregateOrJoin(_))
case _ => false
}
/**
Expand Down Expand Up @@ -420,9 +421,22 @@ object LogicalPlanUtils extends StrictLogging {
val rsTschemaOpts = LogicalPlan.findLeafLogicalPlans(plan)
.filter(_.isInstanceOf[RawSeries])
.map(_.asInstanceOf[RawSeries]).flatMap{ rs =>
val shardKeyFilters = getShardKeyFilters(rs)
val interval = LogicalPlanUtils.getSpanningIntervalSelector(rs)
shardKeyFilters.map{ shardKey =>
val interval = LogicalPlanUtils.getSpanningIntervalSelector(rs)
alextheimer marked this conversation as resolved.
Show resolved Hide resolved
val rawShardKeyFilters = getShardKeyFilters(rs)
// The filters might contain pipe-concatenated EqualsRegex values.
// Convert these into sets of single-valued Equals filters.
val resolvedShardKeyFilters = rawShardKeyFilters.flatMap{ filters =>
val equalsFilters: Seq[Seq[ColumnFilter]] = filters.map { filter =>
filter.filter match {
case EqualsRegex(values: String) if QueryUtils.containsPipeOnlyRegex(values) =>
QueryUtils.splitAtUnescapedPipes(values).map(value => ColumnFilter(filter.column, Equals(value)))
alextheimer marked this conversation as resolved.
Show resolved Hide resolved
case _ => Seq(filter)
}}
// E.g. foo{key1=~"baz|bat",key2=~"bar|bak"} would give the following combos:
// [[baz,bar], [baz,bak], [bat,bar], [bat,bak]]
QueryUtils.combinations(equalsFilters)
}
resolvedShardKeyFilters.map{ shardKey =>
val filters = LogicalPlanUtils.upsertFilters(rs.filters, shardKey)
LogicalPlanUtils.getTargetSchemaIfUnchanging(targetSchemaProvider, filters, interval)
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package filodb.coordinator.queryplanner
alextheimer marked this conversation as resolved.
Show resolved Hide resolved

import filodb.coordinator.queryplanner.LogicalPlanUtils.getLookBackMillis
import filodb.core.metadata.Dataset
import filodb.core.query.{PromQlQueryParams, QueryUtils}
import filodb.core.query.Filter.{Equals, EqualsRegex}
import filodb.query.LogicalPlan
import filodb.query.exec.{ExecPlan, PromQlRemoteExec}

/**
* Abstract class for planners that need getPartitions functionality.
*
* FIXME: the ShardKeyRegexPlanner and MultiPartitionPlanner share purpose/responsibility
* and should eventually be merged. Currently, the SKRP needs getPartitions to group
* resolved shard-keys by partition before it individually materializes each of these
* groups with the MPP. The MPP will again find the corresponding partition for each group
* and materialize accordingly.
*/
abstract class PartitionLocationPlanner(dataset: Dataset,
partitionLocationProvider: PartitionLocationProvider)
extends QueryPlanner with DefaultPlanner {

// scalastyle:off method.length
/**
* Gets the partition Assignment for the given plan
*/
protected def getPartitions(logicalPlan: LogicalPlan,
queryParams: PromQlQueryParams,
infiniteTimeRange: Boolean = false) : Seq[PartitionAssignment] = {

//1. Get a Seq of all Leaf node filters
val leafFilters = LogicalPlan.getColumnFilterGroup(logicalPlan)
val nonMetricColumnSet = dataset.options.nonMetricShardColumns.toSet
//2. Filter from each leaf node filters to keep only nonMetricShardKeyColumns and convert them to key value map
val routingKeyMap: Seq[Map[String, String]] = leafFilters
.filter(_.nonEmpty)
.map(_.filter(col => nonMetricColumnSet.contains(col.column)))
.map{ filters =>
filters.map { filter =>
val values = filter.filter match {
case Equals(value) => Seq(value.toString)
case EqualsRegex(value: String) if QueryUtils.containsPipeOnlyRegex(value) =>
QueryUtils.splitAtUnescapedPipes(value)
case _ => throw new IllegalArgumentException(
s"""shard keys must be filtered by equality or "|"-only regex. filter=${filter}""")
}
(filter.column, values)
}
}
.flatMap(keyToVals => QueryUtils.makeAllKeyValueCombos(keyToVals.toMap))

// 3. Determine the query time range
val queryTimeRange = if (infiniteTimeRange) {
TimeRange(0, Long.MaxValue)
} else {
// 3a. Get the start and end time is ms based on the lookback, offset and the user provided start and end time
val (maxOffsetMs, minOffsetMs) = LogicalPlanUtils.getOffsetMillis(logicalPlan)
.foldLeft((Long.MinValue, Long.MaxValue)) {
case ((accMax, accMin), currValue) => (accMax.max(currValue), accMin.min(currValue))
}

val periodicSeriesTimeWithOffset = TimeRange((queryParams.startSecs * 1000) - maxOffsetMs,
(queryParams.endSecs * 1000) - minOffsetMs)
val lookBackMs = getLookBackMillis(logicalPlan).max

//3b Get the Query time range based on user provided range, offsets in previous steps and lookback
TimeRange(periodicSeriesTimeWithOffset.startMs - lookBackMs,
periodicSeriesTimeWithOffset.endMs)
}

//4. Based on the map in 2 and time range in 5, get the partitions to query
routingKeyMap.flatMap(metricMap =>
partitionLocationProvider.getPartitions(metricMap, queryTimeRange))
}
// scalastyle:on method.length

/**
* Checks if all the PartitionAssignments belong to same partition
*/
protected def isSinglePartition(partitions: Seq[PartitionAssignment]) : Boolean = {
if (partitions.isEmpty)
true
else {
val partName = partitions.head.partitionName
partitions.forall(_.partitionName.equals(partName))
}
}

protected def canSupportMultiPartitionCalls(execPlans: Seq[ExecPlan]): Boolean =
execPlans.forall {
case _: PromQlRemoteExec => false
case _ => true
}
}
Loading
Loading