feat(query) Support split partition raw queries #1677

Merged

Changes from all commits (24 commits)
0f9f696
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Aug 14, 2023
67b3a0e
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Aug 16, 2023
435b9db
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Aug 18, 2023
2d183d6
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Aug 22, 2023
288ea40
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Aug 22, 2023
f198be8
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Aug 26, 2023
207078a
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Sep 19, 2023
93c8db6
feat(query) Support no downtime query capabilities in case of time sp…
amolnayak311 Sep 21, 2023
e9a996d
WIP
amolnayak311 Nov 17, 2023
3c119aa
WIP
amolnayak311 Dec 1, 2023
5c6226c
WIP
amolnayak311 Dec 13, 2023
8e5888a
WIP
amolnayak311 Dec 13, 2023
04f8c1e
WIP
amolnayak311 Dec 13, 2023
aff7ac4
WIP
amolnayak311 Dec 13, 2023
f5595f6
WIP
amolnayak311 Dec 13, 2023
f6ac82f
WIP
amolnayak311 Dec 13, 2023
3f2f281
WIP
amolnayak311 Dec 14, 2023
ffd7c5c
WIP
amolnayak311 Dec 15, 2023
65fa999
PR Review changes
amolnayak311 Jan 3, 2024
27a8e7b
PR Review changes
amolnayak311 Jan 3, 2024
8c33a2d
Fix conflicts
amolnayak311 Jan 3, 2024
63ea481
Fix style
amolnayak311 Jan 3, 2024
89c0008
Approximately equal check only enabled in remote raw data export path
amolnayak311 Jan 4, 2024
c526c72
PR Comments
amolnayak311 Jan 5, 2024
@@ -6,9 +6,15 @@ import com.typesafe.scalalogging.StrictLogging

import filodb.core.metadata.{Dataset, DatasetOptions, Schemas}
import filodb.core.query._
import filodb.core.query.Filter.Equals
import filodb.core.store.{AllChunkScan, ChunkScanMethod, InMemoryChunkScan, TimeRangeChunkScan, WriteBufferChunkScan}
import filodb.prometheus.ast.Vectors.PromMetricLabel
import filodb.query._
import filodb.query.InstantFunctionId.HistogramBucket
import filodb.query.LogicalPlan._
import filodb.query.exec._
import filodb.query.exec.InternalRangeFunction.Last


/**
* Intermediate Plan Result includes the exec plan(s) along with any state to be passed up the
@@ -32,7 +38,17 @@ trait DefaultPlanner {
vectors
}

def toChunkScanMethod(rangeSelector: RangeSelector): ChunkScanMethod = {
rangeSelector match {
case IntervalSelector(from, to) => TimeRangeChunkScan(from, to)
case AllChunksSelector => AllChunkScan
case WriteBufferSelector => WriteBufferChunkScan
case InMemoryChunksSelector => InMemoryChunkScan
case x@_ => throw new IllegalArgumentException(s"Unsupported range selector '$x' found")
}
}
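// For intuition, a couple of mappings this helper performs (timestamps hypothetical):
//   toChunkScanMethod(IntervalSelector(1000L, 5000L)) == TimeRangeChunkScan(1000L, 5000L)
//   toChunkScanMethod(AllChunksSelector)              == AllChunkScan
// Any other RangeSelector falls through to the IllegalArgumentException above.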

def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan


def materializeFunctionArgs(functionParams: Seq[FunctionArgsPlan],
@@ -85,14 +101,116 @@ trait DefaultPlanner {
case lp: ScalarBinaryOperation => this.materializeScalarBinaryOperation(qContext, lp, forceInProcess)
case lp: SubqueryWithWindowing => this.materializeSubqueryWithWindowing(qContext, lp, forceInProcess)
case lp: TopLevelSubquery => this.materializeTopLevelSubquery(qContext, lp, forceInProcess)
case lp: PeriodicSeries => this.materializePeriodicSeries(qContext, lp, forceInProcess)
case lp: PeriodicSeriesWithWindowing =>
this.materializePeriodicSeriesWithWindowing(qContext, lp, forceInProcess)
case _: RawSeries |
_: RawChunkMeta |
_: MetadataQueryPlan |
_: TsCardinalities => throw new IllegalArgumentException("Unsupported operation")
}


private[queryplanner] def materializePeriodicSeriesWithWindowing(qContext: QueryContext,
lp: PeriodicSeriesWithWindowing,
forceInProcess: Boolean): PlanResult = {
val logicalPlanWithoutBucket = if (queryConfig.translatePromToFilodbHistogram) {
removeBucket(Right(lp))._3.right.get
} else lp

val series = walkLogicalPlanTree(logicalPlanWithoutBucket.series, qContext, forceInProcess)
val rawSource = logicalPlanWithoutBucket.series.isRaw && (logicalPlanWithoutBucket.series match {
case r: RawSeries => !r.supportsRemoteDataCall
case _ => true
}) // the series is raw and will be served locally (not via remote raw export), so it's going to yield an iterator

/* The Last function is used to get the latest value in the window for absent_over_time.
   If no data is present, AbsentFunctionMapper will return a range vector with value 1 */

val execRangeFn = if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) Last
else InternalRangeFunction.lpToInternalFunc(logicalPlanWithoutBucket.function)

val paramsExec = materializeFunctionArgs(logicalPlanWithoutBucket.functionArgs, qContext)
val window = if (execRangeFn == InternalRangeFunction.Timestamp) None else Some(logicalPlanWithoutBucket.window)
series.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(logicalPlanWithoutBucket.startMs,
logicalPlanWithoutBucket.stepMs, logicalPlanWithoutBucket.endMs, window, Some(execRangeFn), qContext,
logicalPlanWithoutBucket.stepMultipleNotationUsed,
paramsExec, logicalPlanWithoutBucket.offsetMs, rawSource = rawSource)))
if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) {
val aggregate = Aggregate(AggregationOperator.Sum, logicalPlanWithoutBucket, Nil,
AggregateClause.byOpt(Seq("job")))
// Add sum to aggregate all child responses.
// If all children have NaN values, sum will yield NaN and AbsentFunctionMapper will yield 1
val aggregatePlanResult = PlanResult(Seq(addAggregator(aggregate, qContext.copy(plannerParams =
qContext.plannerParams.copy(skipAggregatePresent = true)), series)))
addAbsentFunctionMapper(aggregatePlanResult, logicalPlanWithoutBucket.columnFilters,
RangeParams(logicalPlanWithoutBucket.startMs / 1000, logicalPlanWithoutBucket.stepMs / 1000,
logicalPlanWithoutBucket.endMs / 1000), qContext)
} else series
}
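// To make the absent_over_time flow above concrete (query text hypothetical): for
// absent_over_time(foo[5m]), each child plan gets a PeriodicSamplesMapper with the Last
// function, the children are summed by "job" with skipAggregatePresent = true, and
// AbsentFunctionMapper then emits 1 wherever the summed series has no data (NaN).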

private[queryplanner] def removeBucket(lp: Either[PeriodicSeries, PeriodicSeriesWithWindowing]):
(Option[String], Option[String], Either[PeriodicSeries, PeriodicSeriesWithWindowing]) = {
val rawSeries = lp match {
case Right(value) => value.series
case Left(value) => value.rawSeries
}

rawSeries match {
case rawSeriesLp: RawSeries =>

val nameFilter = rawSeriesLp.filters.find(_.column.equals(PromMetricLabel)).
map(_.filter.valuesStrings.head.toString)
val leFilter = rawSeriesLp.filters.find(_.column == "le").map(_.filter.valuesStrings.head.toString)

if (nameFilter.isEmpty) (nameFilter, leFilter, lp)
else {
val filtersWithoutBucket = rawSeriesLp.filters.filterNot(_.column.equals(PromMetricLabel)).
filterNot(_.column == "le") :+ ColumnFilter(PromMetricLabel,
Equals(nameFilter.get.replace("_bucket", "")))
val newLp =
if (lp.isLeft)
Left(lp.left.get.copy(rawSeries = rawSeriesLp.copy(filters = filtersWithoutBucket)))
else
Right(lp.right.get.copy(series = rawSeriesLp.copy(filters = filtersWithoutBucket)))
(nameFilter, leFilter, newLp)
}
case _ => (None, None, lp)
}
}
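// Worked example (metric name and values hypothetical): given the filters
//   {__name__="http_latency_bucket", le="0.5", job="api"}
// removeBucket returns (Some("http_latency_bucket"), Some("0.5"), lp') where lp' selects
// {__name__="http_latency", job="api"}: the "le" filter is dropped and the "_bucket"
// suffix is stripped so the query targets the native FiloDB histogram column.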

private[queryplanner] def materializePeriodicSeries(qContext: QueryContext,
lp: PeriodicSeries,
forceInProcess: Boolean): PlanResult = {

// Convert to FiloDB histogram by removing the le label and the _bucket suffix
// _sum and _count are removed in MultiSchemaPartitionsExec since we need to check whether there is a metric name
// with _sum/_count as suffix
val (nameFilter: Option[String], leFilter: Option[String], lpWithoutBucket: PeriodicSeries) =
if (queryConfig.translatePromToFilodbHistogram) {
val result = removeBucket(Left(lp))
(result._1, result._2, result._3.left.get)

} else (None, None, lp)

val rawSeries = walkLogicalPlanTree(lpWithoutBucket.rawSeries, qContext, forceInProcess)
val rawSource = lpWithoutBucket.rawSeries.isRaw && (lpWithoutBucket.rawSeries match {
case r: RawSeries => !r.supportsRemoteDataCall
case _ => true
})
rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(lp.startMs, lp.stepMs, lp.endMs,
window = None, functionId = None, qContext, stepMultipleNotationUsed = false, funcParams = Nil,
lp.offsetMs, rawSource = rawSource)))

if (nameFilter.isDefined && nameFilter.head.endsWith("_bucket") && leFilter.isDefined) {
val paramsExec = StaticFuncArgs(leFilter.head.toDouble, RangeParams(lp.startMs / 1000, lp.stepMs / 1000,
lp.endMs / 1000))
rawSeries.plans.foreach(_.addRangeVectorTransformer(InstantVectorFunctionMapper(HistogramBucket,
Seq(paramsExec))))
}
rawSeries
}
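// Putting the pieces together (query hypothetical): the Prom-style selector
//   http_latency_bucket{le="0.5"}
// becomes a PeriodicSeries over http_latency plus an InstantVectorFunctionMapper applying
// HistogramBucket with static argument 0.5, which extracts that bucket from the
// FiloDB histogram at query time.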

def materializeApplyInstantFunction(qContext: QueryContext,
lp: ApplyInstantFunction,
forceInProcess: Boolean = false): PlanResult = {
@@ -525,4 +643,89 @@ object PlannerUtil extends StrictLogging {
childTargets.iterator.drop(rnd.nextInt(childTargets.size)).next
}
}

//scalastyle:off method.length
def rewritePlanWithRemoteRawExport(lp: LogicalPlan,
rangeSelector: IntervalSelector,
additionalLookback: Long = 0): LogicalPlan =
lp match {
case lp: ApplyInstantFunction =>
lp.copy(vectors = rewritePlanWithRemoteRawExport(lp.vectors, rangeSelector, additionalLookback)
.asInstanceOf[PeriodicSeriesPlan],
functionArgs = lp.functionArgs.map(
rewritePlanWithRemoteRawExport(_, rangeSelector, additionalLookback).asInstanceOf[FunctionArgsPlan]))
case lp: ApplyInstantFunctionRaw =>
lp.copy(vectors = rewritePlanWithRemoteRawExport(lp.vectors, rangeSelector, additionalLookback)
.asInstanceOf[RawSeries],
functionArgs = lp.functionArgs.map(
rewritePlanWithRemoteRawExport(_, rangeSelector, additionalLookback).asInstanceOf[FunctionArgsPlan]))
case lp: Aggregate =>
lp.copy(vectors = rewritePlanWithRemoteRawExport(lp.vectors, rangeSelector, additionalLookback)
.asInstanceOf[PeriodicSeriesPlan])
case lp: BinaryJoin =>
lp.copy(lhs = rewritePlanWithRemoteRawExport(lp.lhs, rangeSelector, additionalLookback)
.asInstanceOf[PeriodicSeriesPlan],
rhs = rewritePlanWithRemoteRawExport(lp.rhs, rangeSelector, additionalLookback)
.asInstanceOf[PeriodicSeriesPlan])
case lp: ScalarVectorBinaryOperation =>
lp.copy(scalarArg = rewritePlanWithRemoteRawExport(lp.scalarArg, rangeSelector, additionalLookback)
.asInstanceOf[ScalarPlan],
vector = rewritePlanWithRemoteRawExport(lp.vector, rangeSelector, additionalLookback)
.asInstanceOf[PeriodicSeriesPlan])
case lp: ApplyMiscellaneousFunction =>
lp.copy(vectors = rewritePlanWithRemoteRawExport(lp.vectors, rangeSelector, additionalLookback)
.asInstanceOf[PeriodicSeriesPlan])
case lp: ApplySortFunction =>
lp.copy(vectors = rewritePlanWithRemoteRawExport(lp.vectors, rangeSelector, additionalLookback)
.asInstanceOf[PeriodicSeriesPlan])
case lp: ScalarVaryingDoublePlan =>
lp.copy(vectors = rewritePlanWithRemoteRawExport(lp.vectors, rangeSelector, additionalLookback)
.asInstanceOf[PeriodicSeriesPlan],
functionArgs = lp.functionArgs.map(
rewritePlanWithRemoteRawExport(_, rangeSelector, additionalLookback).asInstanceOf[FunctionArgsPlan]))
case lp: ScalarTimeBasedPlan => lp
case lp: VectorPlan =>
lp.copy(scalars = rewritePlanWithRemoteRawExport(lp.scalars, rangeSelector, additionalLookback)
.asInstanceOf[ScalarPlan])
case lp: ScalarFixedDoublePlan => lp
case lp: ApplyAbsentFunction =>
lp.copy(vectors = rewritePlanWithRemoteRawExport(lp.vectors, rangeSelector, additionalLookback)
.asInstanceOf[PeriodicSeriesPlan])
case lp: ApplyLimitFunction =>
lp.copy(vectors = rewritePlanWithRemoteRawExport(lp.vectors, rangeSelector, additionalLookback)
.asInstanceOf[PeriodicSeriesPlan])
case lp: ScalarBinaryOperation => lp
case lp: SubqueryWithWindowing =>
lp.copy(innerPeriodicSeries =
rewritePlanWithRemoteRawExport(lp.innerPeriodicSeries, rangeSelector, additionalLookback)
.asInstanceOf[PeriodicSeriesPlan],
functionArgs = lp.functionArgs.map(
rewritePlanWithRemoteRawExport(_, rangeSelector, additionalLookback).asInstanceOf[FunctionArgsPlan]))
case lp: TopLevelSubquery =>
lp.copy(innerPeriodicSeries =
rewritePlanWithRemoteRawExport(lp.innerPeriodicSeries, rangeSelector, additionalLookback = additionalLookback)
.asInstanceOf[PeriodicSeriesPlan])
case lp: RawSeries =>
// IMPORTANT: when raw data is exported over a remote data call, the offset has no meaning;
// instead, look back by the original lookback + offset and set the offset to 0
val newLookback = lp.lookbackMs.getOrElse(0L) + lp.offsetMs.getOrElse(0L) + additionalLookback
lp.copy(supportsRemoteDataCall = true, rangeSelector = rangeSelector,
lookbackMs = if (newLookback == 0) None else Some(newLookback), offsetMs = None)
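// Worked example (durations hypothetical): foo[5m] offset 10m has
// lookbackMs = Some(300000) and offsetMs = Some(600000); with additionalLookback = 0
// the rewrite yields lookbackMs = Some(900000) and offsetMs = None, so the remote
// export fetches the full window.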
case lp: RawChunkMeta => lp
case lp: PeriodicSeries =>
lp.copy(rawSeries = rewritePlanWithRemoteRawExport(lp.rawSeries, rangeSelector, additionalLookback)
.asInstanceOf[RawSeriesLikePlan], startMs = rangeSelector.from, endMs = rangeSelector.to)
case lp: PeriodicSeriesWithWindowing =>
val rs = rangeSelector.asInstanceOf[IntervalSelector]
lp.copy(
startMs = rs.from,
endMs = rs.to,
functionArgs = lp.functionArgs.map(
rewritePlanWithRemoteRawExport(_, rangeSelector, additionalLookback).asInstanceOf[FunctionArgsPlan]),
series = rewritePlanWithRemoteRawExport(lp.series, rangeSelector, additionalLookback)
.asInstanceOf[RawSeriesLikePlan])
case lp: MetadataQueryPlan => lp
case lp: TsCardinalities => lp
}
//scalastyle:on method.length
}
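A minimal sketch of how a caller might use the rewrite above (the plan value raw and the timestamps t1/t2 are hypothetical; the field names match the RawSeries case in the diff):

// val rewritten = PlannerUtil.rewritePlanWithRemoteRawExport(raw, IntervalSelector(t1, t2))
// rewritten is a copy of raw with supportsRemoteDataCall = true, its offset folded into
// lookbackMs, offsetMs cleared, and the range selector replaced by IntervalSelector(t1, t2).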