feat(query): arbitrary target-schema columns #1801

Open · wants to merge 2 commits into base: develop
@@ -7,9 +7,10 @@ import akka.serialization.SerializationExtension
import com.typesafe.scalalogging.StrictLogging

import filodb.coordinator.{ActorPlanDispatcher, ActorSystemHolder, GrpcPlanDispatcher, RemoteActorPlanDispatcher}
import filodb.core.TargetSchemaProvider
import filodb.core.metadata.{Dataset, DatasetOptions, Schemas}
import filodb.core.query._
- import filodb.core.query.Filter.Equals
import filodb.core.query.Filter.{Equals, EqualsRegex}
import filodb.core.store.{AllChunkScan, ChunkScanMethod, InMemoryChunkScan, TimeRangeChunkScan, WriteBufferChunkScan}
import filodb.prometheus.ast.Vectors.PromMetricLabel
import filodb.query._
@@ -18,6 +19,7 @@ import filodb.query.LogicalPlan._
import filodb.query.exec._
import filodb.query.exec.InternalRangeFunction.Last

// scalastyle:off file.size.limit

/**
* Intermediate Plan Result includes the exec plan(s) along with any state to be passed up the
@@ -841,4 +843,78 @@ object PlannerUtil extends StrictLogging {
def replaceLastBucketOccurenceStringFromMetricName(metricName: String): String = {
metricName.replaceAll("_bucket$", "")
}
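
For context, the helper above just strips a trailing "_bucket" suffix; an illustrative call (input value hypothetical):

  PlannerUtil.replaceLastBucketOccurenceStringFromMetricName("http_latency_bucket")  // "http_latency"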

/**
* Returns true iff every target-schema column is matched by at least one
* argument filter with either pure equality or pipe-only regex equality (e.g. "a|b").
*/
def hasAllTargetSchemaFilters(targetSchemaCols: Seq[String],
colFilters: Seq[ColumnFilter]): Boolean = {
targetSchemaCols.forall { tschemaCol =>
colFilters.filter(_.column == tschemaCol).exists {
case ColumnFilter(_, regex: EqualsRegex) if QueryUtils.containsPipeOnlyRegex(regex.value.toString) => true
case ColumnFilter(_, equals: Equals) => true
case _ => false
}
}
}
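
For intuition, a minimal sketch of how hasAllTargetSchemaFilters treats the two equality forms (column names and values are illustrative, not from the diff):

  import filodb.core.query.ColumnFilter
  import filodb.core.query.Filter.{Equals, EqualsRegex}

  val tschemaCols = Seq("app", "instance")
  // Pure equality on "app"; a pipe-only regex on "instance" counts as equality:
  val ok = Seq(ColumnFilter("app", Equals("foo")),
               ColumnFilter("instance", EqualsRegex("host1|host2")))
  PlannerUtil.hasAllTargetSchemaFilters(tschemaCols, ok)   // true
  // An arbitrary regex does not satisfy the "instance" column:
  val bad = Seq(ColumnFilter("app", Equals("foo")),
                ColumnFilter("instance", EqualsRegex("host.*")))
  PlannerUtil.hasAllTargetSchemaFilters(tschemaCols, bad)  // false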

// scalastyle:off method.length

/**
* Returns the target-schema columns iff all of the following hold:
*   - When the argument filters are resolved into sets of pure-equality filters,
*     a target-schema is defined for every set.
*   - All of those target-schemas are defined against the same columns.
*   - No target-schema changes during the query range.
*/
def getTargetSchemaColumns(colFilters: Seq[ColumnFilter],
targetSchemaProvider: TargetSchemaProvider,
startMs: Long, endMs: Long): Option[Seq[String]] = {
val keyToValues = colFilters.filter {
case ColumnFilter(col, regex: EqualsRegex) if QueryUtils.containsPipeOnlyRegex(regex.value.toString) => true
case _ => false
}.map { colFilter =>
val eqRegex = colFilter.filter.asInstanceOf[EqualsRegex]
val values = QueryUtils.splitAtUnescapedPipes(eqRegex.value.toString).distinct
(colFilter.column, values)
}.filter(_._2.nonEmpty)
.toMap

val targetSchemaChanges = QueryUtils.makeAllKeyValueCombos(keyToValues).map { keyToValue =>
// Replace pipe-concatenated EqualsRegex filters with Equals filters.
val equalsFilters = keyToValue.map(entry => ColumnFilter(entry._1, Equals(entry._2))).toSeq
val newFilters = LogicalPlanUtils.upsertFilters(colFilters, equalsFilters)
targetSchemaProvider.targetSchemaFunc(newFilters)
}

val isChanging = targetSchemaChanges.exists { changes =>
changes.nonEmpty && changes.exists(c => c.time >= startMs && c.time <= endMs)
}
if (isChanging) {
return None
}

val targetSchemaOpts = targetSchemaChanges.map { changes =>
val tsIndex = changes.lastIndexWhere(t => t.time <= startMs)
if (tsIndex > -1) Some(changes(tsIndex)) else None
}
if (targetSchemaOpts.exists(_.isEmpty)) {
return None
}

val targetSchemas = targetSchemaOpts.map(_.get.schema)
val hasAllSameTargetSchemas = {
val headCols = targetSchemas.head.toSet
targetSchemas.tail.forall(_.toSet == headCols)
}
if (!hasAllSameTargetSchemas) {
return None
}

Some(targetSchemas.head)
}
// scalastyle:on method.length
}

// scalastyle:on file.size.limit
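
Taken together, a sketch of getTargetSchemaColumns at a call site (the _ws_/_ns_ shard-key labels, provider, and time range are assumptions for illustration):

  import filodb.core.query.ColumnFilter
  import filodb.core.query.Filter.{Equals, EqualsRegex}

  val filters = Seq(ColumnFilter("_ws_", Equals("demo")),
                    ColumnFilter("_ns_", EqualsRegex("app1|app2")))
  // The pipe-only regex resolves into two pure-equality filter sets,
  //   [_ws_=demo, _ns_=app1] and [_ws_=demo, _ns_=app2],
  // and the provider is consulted once per set.
  // provider: TargetSchemaProvider; startMs/endMs: query range (assumed in scope):
  val cols = PlannerUtil.getTargetSchemaColumns(filters, provider, startMs, endMs)
  // Some(columns) only if both sets yield target-schemas over the same columns
  // and no definition changes inside [startMs, endMs]; otherwise None.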
@@ -421,43 +421,32 @@ object LogicalPlanUtils extends StrictLogging {
}

/**
- * Returns a set of target-schema columns iff all of:
- *   - all plan RawSeries share the same target-schema columns.
- *   - no target-schema definition changes during the query.
* Returns a non-empty set of target-schema columns iff a single
* target-schema is applicable to the entire plan.
* @param getShardKeyFilterGroups returns sets of pure-equals or pipe-concatenated
* regex-equals shard-key filters
*/
def sameRawSeriesTargetSchemaColumns(plan: LogicalPlan,
targetSchemaProvider: TargetSchemaProvider,
- getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]])
getShardKeyFilterGroups: RawSeries => Seq[Seq[ColumnFilter]])
: Option[Seq[String]] = {
// compose a stream of Options for each RawSeries--
// the options contain a target-schema iff it is defined and unchanging.
val rawSeries = LogicalPlan.findLeafLogicalPlans(plan)
.filter(_.isInstanceOf[RawSeries])
.map(_.asInstanceOf[RawSeries])
if (rawSeries.exists(!_.rangeSelector.isInstanceOf[IntervalSelector])) {
// Cannot handle RawSeries without IntervalSelector.
return None
}
- val rsTschemaOpts = rawSeries.flatMap{ rs =>
-   val interval = LogicalPlanUtils.getSpanningIntervalSelector(rs)
-   val rawShardKeyFilters = getShardKeyFilters(rs)
-   // The filters might contain pipe-concatenated EqualsRegex values.
-   // Convert these into sets of single-valued Equals filters.
-   val resolvedShardKeyFilters = rawShardKeyFilters.flatMap { filters =>
-     val equalsFilters: Seq[Seq[ColumnFilter]] = filters.map { filter =>
-       filter.filter match {
-         case EqualsRegex(values: String) if QueryUtils.containsPipeOnlyRegex(values) =>
-           QueryUtils.splitAtUnescapedPipes(values).map(value => ColumnFilter(filter.column, Equals(value)))
-         case _ => Seq(filter)
-       }
-     }
-     // E.g. foo{key1=~"baz|bat",key2=~"bar|bak"} would give the following combos:
-     //   [[baz,bar], [baz,bak], [bat,bar], [bat,bak]]
-     QueryUtils.combinations(equalsFilters)
-   }.map(_.toSet).distinct.map(_.toSeq) // make sure keys are distinct
-   resolvedShardKeyFilters.map{ shardKey =>
-     val filters = LogicalPlanUtils.upsertFilters(rs.filters, shardKey)
-     LogicalPlanUtils.getTargetSchemaIfUnchanging(targetSchemaProvider, filters, interval)
val rsTschemaOpts = rawSeries.flatMap { rs =>
val interval = LogicalPlanUtils.getSpanningIntervalSelector(rs)
val rawFilters = LogicalPlan.getColumnFilterGroup(rs)
assert(rawFilters.size == 1, s"expected single RawSeries filter group; rawSeries=$rs; filters=$rawFilters")
val shardKeyFilterGroups = getShardKeyFilterGroups(rs)
shardKeyFilterGroups.map { shardKeyFilterGroup =>
// Upsert shard-key equality filters into the full set of filters;
// use this new set to find the target-schema columns.
val newFilters = LogicalPlanUtils.upsertFilters(rawFilters.head.toSeq, shardKeyFilterGroup)
PlannerUtil.getTargetSchemaColumns(newFilters, targetSchemaProvider, interval.from, interval.to)
}
}
if (rsTschemaOpts.isEmpty) {
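The combination expansion that the removed block above performed inline now happens behind getTargetSchemaColumns. Per the removed example comment, the assumed behavior of QueryUtils.makeAllKeyValueCombos:

  // foo{key1=~"baz|bat", key2=~"bar|bak"} produces the value map below, which
  // expands into all four key->value assignments (result order not assumed):
  val combos = QueryUtils.makeAllKeyValueCombos(
    Map("key1" -> Seq("baz", "bat"), "key2" -> Seq("bar", "bak")))
  // -> Seq(Map("key1" -> "baz", "key2" -> "bar"), Map("key1" -> "baz", "key2" -> "bak"),
  //        Map("key1" -> "bat", "key2" -> "bar"), Map("key1" -> "bat", "key2" -> "bak"))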
@@ -9,34 +9,24 @@ import kamon.Kamon

import filodb.coordinator.{ActorPlanDispatcher, GrpcPlanDispatcher, RemoteActorPlanDispatcher, ShardMapper}
import filodb.coordinator.client.QueryCommands.StaticSpreadProvider
- import filodb.coordinator.queryplanner.SingleClusterPlanner.findTargetSchema
- import filodb.core.{SpreadProvider, StaticTargetSchemaProvider, TargetSchemaChange, TargetSchemaProvider}
import filodb.coordinator.queryplanner.PlannerUtil.{getTargetSchemaColumns, hasAllTargetSchemaFilters}
import filodb.core.{SpreadProvider, StaticTargetSchemaProvider, TargetSchemaProvider}
import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.metadata.{Dataset, DatasetOptions, Schemas}
- import filodb.core.query.{Filter, _}
- import filodb.core.query.Filter.{Equals, EqualsRegex}
import filodb.core.query._
import filodb.core.query.Filter.Equals
import filodb.prometheus.ast.Vectors.{PromMetricLabel, TypeLabel}
import filodb.prometheus.ast.WindowConstants
- import filodb.query.{exec, _}
import filodb.query._
import filodb.query.InstantFunctionId.HistogramBucket
import filodb.query.LogicalPlan._
- import filodb.query.exec.{LocalPartitionDistConcatExec, _}
import filodb.query.exec._
import filodb.query.exec.InternalRangeFunction.Last

// scalastyle:off file.size.limit

object SingleClusterPlanner {
private val mdNoShardKeyFilterRequests = Kamon.counter("queryengine-metadata-no-shardkey-requests").withoutTags

- // Find the TargetSchema that is applicable i.e effective for the current query window
- private def findTargetSchema(targetSchemaChanges: Seq[TargetSchemaChange],
-                              startMs: Long, endMs: Long): Option[TargetSchemaChange] = {
-   val tsIndex = targetSchemaChanges.lastIndexWhere(t => t.time <= startMs)
-   if(tsIndex > -1)
-     Some(targetSchemaChanges(tsIndex))
-   else
-     None
- }
}

/**
@@ -74,65 +64,6 @@ class SingleClusterPlanner(val dataset: Dataset,
qContext.plannerParams.targetSchemaProviderOverride.getOrElse(_targetSchemaProvider)
}

- /**
-  * Returns true iff a target-schema:
-  *   (1) matches any shard-key matched by the argument filters, and
-  *   (2) changes between the argument timestamps.
-  */
- def isTargetSchemaChanging(shardKeyFilters: Seq[ColumnFilter],
-                            startMs: Long, endMs: Long,
-                            qContext: QueryContext): Boolean = {
-   val keyToValues = shardKeyFilters.map { filter =>
-     val values = filter match {
-       case ColumnFilter(col, regex: EqualsRegex) if QueryUtils.containsPipeOnlyRegex(regex.value.toString) =>
-         QueryUtils.splitAtUnescapedPipes(regex.value.toString).distinct
-       case ColumnFilter(col, equals: Equals) =>
-         Seq(equals.value.toString)
-     }
-     (filter.column, values)
-   }.toMap
-   QueryUtils.makeAllKeyValueCombos(keyToValues).exists { shardKeys =>
-     // Replace any EqualsRegex shard-key filters with Equals.
-     val equalsFilters = shardKeys.map(entry => ColumnFilter(entry._1, Equals(entry._2))).toSeq
-     val newFilters = LogicalPlanUtils.upsertFilters(shardKeyFilters, equalsFilters)
-     val targetSchemaChanges = targetSchemaProvider(qContext).targetSchemaFunc(newFilters)
-     targetSchemaChanges.nonEmpty && targetSchemaChanges.exists(c => c.time >= startMs && c.time <= endMs)
-   }
- }
-
- /**
-  * Returns true iff a target-schema should be used to identify query shards.
-  * A target-schema should be used iff all of:
-  *   (1) A target-schema is defined for the argument filters.
-  *   (2) The target-schema does not change between startMs and endMs.
-  *   (3) All required target-schema columns are present in the argument filters.
-  *
-  * @param filters Query Column Filters
-  * @param targetSchema TargetSchema
-  * @return useTargetSchema - use target-schema to calculate query shards
-  */
- def useTargetSchemaForShards(filters: Seq[ColumnFilter],
-                              startMs: Long, endMs: Long,
-                              qContext: QueryContext): Boolean = {
-   val targetSchemaChanges = targetSchemaProvider(qContext).targetSchemaFunc(filters)
-   val targetSchemaOpt = findTargetSchema(targetSchemaChanges, startMs, endMs)
-   if (targetSchemaOpt.isEmpty) {
-     return false
-   }
-
-   val shardKeyFilters = {
-     val filterOpts = dataset.options.nonMetricShardColumns.map(col => filters.find(_.column == col))
-     assert(filterOpts.forall(_.isDefined), "expected all shard-key filters present but found: " + filters)
-     filterOpts.map(_.get)
-   }
-   val tsChangeExists = isTargetSchemaChanging(shardKeyFilters, startMs, endMs, qContext)
-   val allTSColsPresent = targetSchemaOpt.get.schema
-     .forall(tschemaCol => filters.exists(cf =>
-       cf.column == tschemaCol && cf.filter.isInstanceOf[Equals]))
-
-   !tsChangeExists && allTSColsPresent
- }

import SingleClusterPlanner._

private def dispatcherForShard(shard: Int, forceInProcess: Boolean, queryContext: QueryContext): PlanDispatcher = {
@@ -341,19 +272,15 @@ class SingleClusterPlanner(val dataset: Dataset,
val shardValues = shardPairs.filterNot(_._1 == dsOptions.metricColumn).map(_._2)

logger.debug(s"For shardColumns $shardColumns, extracted metric $metric and shard values $shardValues")
- val targetSchemaChange = targetSchemaProvider(qContext).targetSchemaFunc(filters)
- val targetSchema = {
-   if (targetSchemaChange.nonEmpty) {
-     findTargetSchema(targetSchemaChange, startMs, endMs).map(tsc => tsc.schema).getOrElse(Seq.empty)
-   } else Seq.empty
- }
- val shardHash = RecordBuilder.shardKeyHash(shardValues, dsOptions.metricColumn, metric, targetSchema)
- if(useTargetSchemaForShards(filters, startMs, endMs, qContext)) {
val targetSchema = getTargetSchemaColumns(filters, targetSchemaProvider(qContext), startMs, endMs)
val shardHash = RecordBuilder.shardKeyHash(shardValues, dsOptions.metricColumn, metric,
targetSchema.getOrElse(Seq.empty))
if(targetSchema.isDefined && hasAllTargetSchemaFilters(targetSchema.get, filters)) {
val nonShardKeyLabelPairs = filters.filter(f => !shardColumns.contains(f.column)
&& f.filter.isInstanceOf[Filter.Equals])
.map(cf => cf.column ->
cf.filter.asInstanceOf[Filter.Equals].value.toString).toMap
- val partitionHash = RecordBuilder.partitionKeyHash(nonShardKeyLabelPairs, shardPairs.toMap, targetSchema,
val partitionHash = RecordBuilder.partitionKeyHash(nonShardKeyLabelPairs, shardPairs.toMap, targetSchema.get,
dsOptions.metricColumn, metric)
// since target-schema filter is provided in the query, ingestionShard can be used to find the single shard
// that can answer the query.
@@ -392,16 +319,35 @@
(shardCol, trimmedValues)
}

- // Find the union of all shards for each shard-key.
- val shardKeys = QueryUtils.makeAllKeyValueCombos(shardColToValues.toMap)
- shardKeys.flatMap{ shardKey =>
// Find all *non*-shard-key filters matched by equality (pure or pipe-concatenated regex).
// Unlike above, this doesn't trim or throw exceptions.
val nonShardColToValues: Seq[(String, Seq[String])] = filters
.filterNot {f => shardColumns.contains(f.column)}
.map {
case ColumnFilter(col, Filter.Equals(filtVal: String)) =>
(col, Seq(filtVal))
case ColumnFilter(col, Filter.EqualsRegex(filtVal: String))
if QueryUtils.containsPipeOnlyRegex(filtVal) =>
val values = QueryUtils.splitAtUnescapedPipes(filtVal).distinct
(col, values)
case filter => (filter.column, Seq())
}
.filter(_._2.nonEmpty)

// Resolve (col -> seq[values]) pairs into all possible combinations of col->value sets.
// Create Equals filters for each entry and upsert into the argument filters; each
// new set of filters can then be used to identify a set of shards.
val colToValues = (shardColToValues ++ nonShardColToValues).toMap
val colToValueMaps = QueryUtils.makeAllKeyValueCombos(colToValues)
colToValueMaps.flatMap{ colValueMap =>
// Replace any EqualsRegex shard-key filters with Equals.
val newFilters = filters.map{ filt =>
- shardKey.get(filt.column)
colValueMap.get(filt.column)
.map(value => ColumnFilter(filt.column, Filter.Equals(value)))
.getOrElse(filt)
}
- shardsFromValues(shardKey.toSeq, newFilters, qContext, startMs, endMs)
val newShardKeyFilters = colValueMap.filter(label => shardColumns.contains(label._1))
shardsFromValues(newShardKeyFilters.toSeq, newFilters, qContext, startMs, endMs)
}.distinct
}
}
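
A sketch of the new shard resolution end to end (labels, values, and the target-schema are illustrative): for foo{_ws_="demo", _ns_=~"app1|app2", instance=~"h1|h2"} with a target-schema over "instance":

  // colToValues merges shard-key and non-shard-key equality values:
  //   Map("_ns_" -> Seq("app1", "app2"), "instance" -> Seq("h1", "h2"))
  // makeAllKeyValueCombos yields four pure-equality combos; each combo's Equals
  // filters are upserted into the query filters, so shardsFromValues can pin
  // each combo to a single ingestion shard. The result is the distinct union
  // of shards across all four combos.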
@@ -893,10 +839,6 @@ class SingleClusterPlanner(val dataset: Dataset,
case _ => (0, Long.MaxValue)
}

- val shardKeyFilters = LogicalPlan.getNonMetricShardKeyFilters(lp, dataset.options.nonMetricShardColumns)
- assert(shardKeyFilters.size == 1, "RawSeries with more than one shard-key group: " + lp)
- val targetSchemaChangesExist = isTargetSchemaChanging(shardKeyFilters.head, startMs, endMs, qContext)

val execPlans = shardsFromFilters(renamedFilters, qContext, startMs, endMs).map { shard =>
val dispatcher = dispatcherForShard(shard, forceInProcess, qContext)
val ep = MultiSchemaPartitionsExec(
@@ -913,9 +855,7 @@
}

// Stitch only if spread changes during the query-window.
- // When a target-schema changes during query window, data might be ingested in
- // different shards after the change.
- PlanResult(execPlans, needsStitch || targetSchemaChangesExist)
PlanResult(execPlans, needsStitch)
}
// scalastyle:on method.length
