
Commit 8e5888a

WIP

amolnayak311 committed Dec 13, 2023
2 parents 5c6226c + 0a91fd8
Showing 77 changed files with 3,186 additions and 1,449 deletions.
@@ -33,8 +33,8 @@ class ShardHealthStats(ref: DatasetRef,
   def update(mapper: ShardMapper, skipUnassigned: Boolean = false): Unit = {
     numActive.update(mapper.statuses.count(_ == ShardStatusActive))
     numRecovering.update(mapper.statuses.count(_.isInstanceOf[ShardStatusRecovery]))
-    numUnassigned.update(mapper.statuses.count(_ == ShardStatusUnassigned))
-    if (!skipUnassigned) numAssigned.update(mapper.statuses.count(_ == ShardStatusAssigned))
+    numAssigned.update(mapper.statuses.count(_ == ShardStatusAssigned))
+    if (!skipUnassigned) numUnassigned.update(mapper.statuses.count(_ == ShardStatusUnassigned))
     numError.update(mapper.statuses.count(_ == ShardStatusError))
     numStopped.update(mapper.statuses.count(_ == ShardStatusStopped))
     numDown.update(mapper.statuses.count(_ == ShardStatusDown))
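The swap above means the assigned-shard gauge is always refreshed, while the unassigned-shard gauge is refreshed only when skipUnassigned is false. A minimal sketch of that gating, using plain Scala stand-ins (stub statuses and return values instead of ShardMapper and the Kamon gauges; nothing below is FiloDB API):

  object SkipUnassignedSketch extends App {
    sealed trait Status
    case object Active extends Status
    case object Assigned extends Status
    case object Unassigned extends Status

    val statuses: Seq[Status] = Seq(Active, Assigned, Unassigned, Unassigned)

    // Mirrors the corrected update(): assigned is always counted,
    // unassigned is counted only when skipUnassigned is false.
    def update(skipUnassigned: Boolean): (Int, Option[Int]) = {
      val assigned = statuses.count(_ == Assigned)
      val unassigned = if (!skipUnassigned) Some(statuses.count(_ == Unassigned)) else None
      (assigned, unassigned)
    }

    println(update(skipUnassigned = false)) // (1,Some(2))
    println(update(skipUnassigned = true))  // (1,None) -- unassigned gauge left untouched
  }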
@@ -37,7 +37,6 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
private val _coordinators = new mutable.LinkedHashMap[Address, ActorRef]
private val _errorShardReassignedAt = new mutable.HashMap[DatasetRef, mutable.HashMap[Int, Long]]

// TODO move to startup-v2
private val _tenantIngestionMeteringOpt =
if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) {
val inst = TenantIngestionMetering(
@@ -65,7 +65,7 @@ case class TenantIngestionMetering(settings: FilodbSettings,
     dsIterProducer().foreach { dsRef =>
       val fut = Client.asyncAsk(
         coordActorProducer(),
-        LogicalPlan2Query(dsRef, TsCardinalities(prefix, numGroupByFields)),
+        LogicalPlan2Query(dsRef, TsCardinalities(prefix, numGroupByFields, 2, overrideClusterName = CLUSTER_TYPE)),
         ASK_TIMEOUT)
       fut.onComplete {
         case Success(QueryResult(_, _, rv, _, _, _, _)) =>
@@ -429,7 +429,7 @@ trait DefaultPlanner {
       val paramsExec = materializeFunctionArgs(sqww.functionArgs, qContext)
       val rangeVectorTransformer =
         PeriodicSamplesMapper(
-          sqww.startMs, sqww.stepMs, sqww.endMs,
+          sqww.atMs.getOrElse(sqww.startMs), sqww.stepMs, sqww.atMs.getOrElse(sqww.endMs),
           window,
           Some(rangeFn),
           qContext,
@@ -439,7 +439,11 @@
           rawSource = false,
           leftInclusiveWindow = true
         )
-      innerExecPlan.plans.foreach { p => p.addRangeVectorTransformer(rangeVectorTransformer)}
+      innerExecPlan.plans.foreach { p => {
+        p.addRangeVectorTransformer(rangeVectorTransformer)
+        sqww.atMs.map(_ => p.addRangeVectorTransformer(RepeatTransformer(sqww.startMs, sqww.stepMs, sqww.endMs
+          , p.queryWithPlanName(qContext))))
+      }}
       innerExecPlan
     } else {
       val innerPlan = sqww.innerPeriodicSeries
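The @-modifier handling above, and in the absent-over-time branch below, comes down to: scan once at the single @ instant instead of once per step, then let RepeatTransformer replay that value across the original startMs/stepMs/endMs range. A minimal standalone sketch of that semantic, in plain Scala with made-up names (evalAt, evalRangeAt) rather than FiloDB types:

  object AtModifierSketch extends App {
    // Pretend evaluation: the value of some expression at a single instant.
    def evalAt(tsMs: Long): Double = tsMs / 1000.0

    // Without @: one evaluation per step over [startMs, endMs].
    def evalRange(startMs: Long, stepMs: Long, endMs: Long): Seq[(Long, Double)] =
      (startMs to endMs by stepMs).map(t => t -> evalAt(t))

    // With @: evaluate once at atMs, then repeat that value at every step --
    // the repetition is what RepeatTransformer does in the plans above.
    def evalRangeAt(atMs: Long, startMs: Long, stepMs: Long, endMs: Long): Seq[(Long, Double)] = {
      val fixed = evalAt(atMs)
      (startMs to endMs by stepMs).map(t => t -> fixed)
    }

    println(evalRange(1000L, 1000L, 4000L))          // values change per step
    println(evalRangeAt(2000L, 1000L, 1000L, 4000L)) // the @ value repeated at every step
  }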
@@ -455,17 +459,24 @@
                              sqww: SubqueryWithWindowing
                            ) : PlanResult = {
     // absent over time is essentially sum(last(series)) sent through AbsentFunctionMapper
-    innerExecPlan.plans.foreach(
-      _.addRangeVectorTransformer(PeriodicSamplesMapper(
-        sqww.startMs, sqww.stepMs, sqww.endMs,
-        window,
-        Some(InternalRangeFunction.lpToInternalFunc(RangeFunctionId.Last)),
-        qContext,
-        stepMultipleNotationUsed = false,
-        Seq(),
-        offsetMs,
-        rawSource = false
-      ))
+    val realScanStartMs = sqww.atMs.getOrElse(sqww.startMs)
+    val realScanEndMs = sqww.atMs.getOrElse(sqww.endMs)
+    val realScanStep = sqww.atMs.map(_ => 0L).getOrElse(sqww.stepMs)
+
+    innerExecPlan.plans.foreach(plan => {
+      plan.addRangeVectorTransformer(PeriodicSamplesMapper(
+        realScanStartMs, realScanStep, realScanEndMs,
+        window,
+        Some(InternalRangeFunction.lpToInternalFunc(RangeFunctionId.Last)),
+        qContext,
+        stepMultipleNotationUsed = false,
+        Seq(),
+        offsetMs,
+        rawSource = false
+      ))
+      sqww.atMs.map(_ => plan.addRangeVectorTransformer(RepeatTransformer(sqww.startMs, sqww.stepMs, sqww.endMs,
+        plan.queryWithPlanName(qContext))))
+      }
     )
     val aggregate = Aggregate(AggregationOperator.Sum, innerPlan, Nil,
       AggregateClause.byOpt(Seq("job")))
@@ -477,14 +488,17 @@
             innerExecPlan)
         )
       )
-    addAbsentFunctionMapper(
+    val plans = addAbsentFunctionMapper(
       aggregatePlanResult,
       Seq(),
-      RangeParams(
-        sqww.startMs / 1000, sqww.stepMs / 1000, sqww.endMs / 1000
-      ),
+      RangeParams(realScanStartMs / 1000, realScanStep / 1000, realScanEndMs / 1000),
       qContext
-    )
+    ).plans
+
+    if (sqww.atMs.nonEmpty) {
+      plans.foreach(p => p.addRangeVectorTransformer(RepeatTransformer(sqww.startMs, sqww.stepMs, sqww.endMs,
+        p.queryWithPlanName(qContext))))
+    }
     aggregatePlanResult
   }

@@ -584,12 +598,13 @@
     val execPlan =
       if (logicalPlan.operator.isInstanceOf[SetOperator])
         SetOperatorExec(qContext, dispatcher, stitchedLhs, stitchedRhs, logicalPlan.operator,
-          LogicalPlanUtils.renameLabels(logicalPlan.on, dsOptions.metricColumn),
+          logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, dsOptions.metricColumn)),
           LogicalPlanUtils.renameLabels(logicalPlan.ignoring, dsOptions.metricColumn), dsOptions.metricColumn,
           rvRangeFromPlan(logicalPlan))
       else
         BinaryJoinExec(qContext, dispatcher, stitchedLhs, stitchedRhs, logicalPlan.operator,
-          logicalPlan.cardinality, LogicalPlanUtils.renameLabels(logicalPlan.on, dsOptions.metricColumn),
+          logicalPlan.cardinality,
+          logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, dsOptions.metricColumn)),
           LogicalPlanUtils.renameLabels(logicalPlan.ignoring, dsOptions.metricColumn), logicalPlan.include,
           dsOptions.metricColumn, rvRangeFromPlan(logicalPlan))

@@ -147,7 +147,7 @@ object LogicalPlanParser {
   private def binaryJoinToQuery(lp: BinaryJoin): String = {
     val lhs = convertToQuery(lp.lhs)
     val rhs = convertToQuery(lp.rhs)
-    val on = if (lp.on.isEmpty) "" else s"${Space}on$OpeningRoundBracket${lp.on.mkString(Comma)}$ClosingRoundBracket"
+    val on = lp.on.map(l => s"${Space}on$OpeningRoundBracket${l.mkString(Comma)}$ClosingRoundBracket").getOrElse("")
     val ignoring = if (lp.ignoring.isEmpty) "" else s"${Space}ignoring$OpeningRoundBracket" +
       s"${lp.ignoring.mkString(Comma)}$ClosingRoundBracket"

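With on now an Option[Seq[String]], the serializer can distinguish a query written without an on clause (None) from one written with an explicit empty on () (Some(Nil)); in PromQL the two are not interchangeable, since foo + on() bar matches on the empty label set while foo + bar matches on all labels. A small sketch of the rendering rule, with plain string literals standing in for the Space/Comma/bracket constants used above:

  object OnClauseRenderSketch extends App {
    // Mirrors the new rendering: None => no clause, Some(Nil) => " on()", Some(labels) => " on(l1,l2)"
    def renderOn(on: Option[Seq[String]]): String =
      on.map(labels => s" on(${labels.mkString(",")})").getOrElse("")

    println(s"foo +${renderOn(None)} bar")                         // foo + bar
    println(s"foo +${renderOn(Some(Nil))} bar")                    // foo + on() bar
    println(s"foo +${renderOn(Some(Seq("job", "instance")))} bar") // foo + on(job,instance) bar
  }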
@@ -460,7 +460,8 @@ object LogicalPlanUtils extends StrictLogging {

     val clauseLabels = plan match {
       case Aggregate(_, _, _, clauseOpt) => clauseOpt.get.labels
-      case BinaryJoin(_, _, _, _, on, _, _) => on
+      // TODO: Validate this is indeed the expected behavior, since an on clause with an empty () and no on clause are not the same
+      case BinaryJoin(_, _, _, _, on, _, _) => on.getOrElse(Nil)
     }

     // FIXME: in the ShardKeyRegexPlanner/MultiPartitionPlanner, we can pushdown even when a target-schema
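The TODO above flags exactly the case the parser change distinguishes: with on.getOrElse(Nil), a missing on clause and an explicit empty on () both collapse to Nil for the pushdown-label check. A tiny illustration of that collapse (plain Scala, no FiloDB types):

  object OnClauseCollapseSketch extends App {
    val noOnClause: Option[Seq[String]] = None         // query written without "on"
    val emptyOnClause: Option[Seq[String]] = Some(Nil) // query written with "on ()"

    // Both become Nil here, even though their PromQL matching semantics differ.
    println(noOnClause.getOrElse(Nil))    // List()
    println(emptyOnClause.getOrElse(Nil)) // List()
  }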
@@ -47,6 +47,25 @@ import filodb.query.exec._
     PlanResult(Seq(execPlan))
   }

+  private def getAtModifierTimestampsWithOffset(periodicSeriesPlan: PeriodicSeriesPlan): Seq[Long] = {
+    periodicSeriesPlan match {
+      case ps: PeriodicSeries => ps.atMs.map(at => at - ps.offsetMs.getOrElse(0L)).toSeq
+      case sww: SubqueryWithWindowing => sww.atMs.map(at => at - sww.offsetMs.getOrElse(0L)).toSeq
+      case psw: PeriodicSeriesWithWindowing => psw.atMs.map(at => at - psw.offsetMs.getOrElse(0L)).toSeq
+      case ts: TopLevelSubquery => ts.atMs.map(at => at - ts.originalOffsetMs.getOrElse(0L)).toSeq
+      case bj: BinaryJoin => getAtModifierTimestampsWithOffset(bj.lhs) ++ getAtModifierTimestampsWithOffset(bj.rhs)
+      case agg: Aggregate => getAtModifierTimestampsWithOffset(agg.vectors)
+      case aif: ApplyInstantFunction => getAtModifierTimestampsWithOffset(aif.vectors)
+      case amf: ApplyMiscellaneousFunction => getAtModifierTimestampsWithOffset(amf.vectors)
+      case asf: ApplySortFunction => getAtModifierTimestampsWithOffset(asf.vectors)
+      case aaf: ApplyAbsentFunction => getAtModifierTimestampsWithOffset(aaf.vectors)
+      case alf: ApplyLimitFunction => getAtModifierTimestampsWithOffset(alf.vectors)
+      case _: RawChunkMeta | _: ScalarBinaryOperation | _: ScalarFixedDoublePlan | _: ScalarTimeBasedPlan |
+           _: ScalarVaryingDoublePlan | _: ScalarVectorBinaryOperation | _: VectorPlan => Seq()
+    }
+  }
+
+
   // scalastyle:off method.length
   private def materializeRoutablePlan(qContext: QueryContext, periodicSeriesPlan: PeriodicSeriesPlan): ExecPlan = {
     import LogicalPlan._
@@ -55,9 +74,24 @@
     val (maxOffset, minOffset) = (offsetMillis.max, offsetMillis.min)

     val lookbackMs = LogicalPlanUtils.getLookBackMillis(periodicSeriesPlan).max
+
     val startWithOffsetMs = periodicSeriesPlan.startMs - maxOffset
     // For scalar binary operation queries like sum(rate(foo{job = "app"}[5m] offset 8d)) * 0.5
     val endWithOffsetMs = periodicSeriesPlan.endMs - minOffset
+    val atModifierTimestampsWithOffset = getAtModifierTimestampsWithOffset(periodicSeriesPlan)
+
+    val isAtModifierValid = if (startWithOffsetMs - lookbackMs >= earliestRawTime) {
+      // should be in raw cluster.
+      atModifierTimestampsWithOffset.forall(at => at - lookbackMs >= earliestRawTime)
+    } else if (endWithOffsetMs - lookbackMs < earliestRawTime) {
+      // should be in downsample cluster.
+      atModifierTimestampsWithOffset.forall(at => at - lookbackMs < earliestRawTime)
+    } else {
+      atModifierTimestampsWithOffset.isEmpty
+    }
+    require(isAtModifierValid, s"@modifier $atModifierTimestampsWithOffset is not supported because it queries " +
+      s"both downsampled and raw data. Please adjust the query parameters if you want to use @modifier.")
+
     if (maxOffset != minOffset
       && startWithOffsetMs - lookbackMs < earliestRawTime
       && endWithOffsetMs >= earliestRawTime) {
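The validity check above normalizes every @ timestamp by its offset (via getAtModifierTimestampsWithOffset) and then requires that all of them fall on the same side of the raw-retention boundary as the query range: a range served entirely by the raw cluster may only use @ instants whose lookback window still lies in raw data, a range served entirely by the downsample cluster may only use @ instants already past raw retention, and a range that straddles the boundary cannot use @ at all. A standalone sketch of that decision with hypothetical millisecond values (earliestRawTime and friends are plain parameters here, not the planner's fields):

  object AtModifierValiditySketch extends App {
    // True when all @ instants can be served by the same cluster as the query range itself.
    def isAtModifierValid(startWithOffsetMs: Long, endWithOffsetMs: Long, lookbackMs: Long,
                          earliestRawTime: Long, atTimestampsWithOffset: Seq[Long]): Boolean =
      if (startWithOffsetMs - lookbackMs >= earliestRawTime)    // whole range in the raw cluster
        atTimestampsWithOffset.forall(at => at - lookbackMs >= earliestRawTime)
      else if (endWithOffsetMs - lookbackMs < earliestRawTime)  // whole range in the downsample cluster
        atTimestampsWithOffset.forall(at => at - lookbackMs < earliestRawTime)
      else                                                      // range straddles the boundary
        atTimestampsWithOffset.isEmpty

    val earliestRaw = 1000000L
    println(isAtModifierValid(1200000L, 1500000L, 100000L, earliestRaw, Seq(1300000L))) // true: everything in raw
    println(isAtModifierValid(1200000L, 1500000L, 100000L, earliestRaw, Seq(900000L)))  // false: @ needs downsampled data
    println(isAtModifierValid(900000L, 1500000L, 100000L, earliestRaw, Seq.empty))      // true: straddles, but no @ used
  }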
@@ -680,12 +680,12 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider

     val execPlan = if (logicalPlan.operator.isInstanceOf[SetOperator])
       SetOperatorExec(qContext, InProcessPlanDispatcher(queryConfig), Seq(lhsExec), Seq(rhsExec), logicalPlan.operator,
-        LogicalPlanUtils.renameLabels(logicalPlan.on, datasetMetricColumn),
+        logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, datasetMetricColumn)),
         LogicalPlanUtils.renameLabels(logicalPlan.ignoring, datasetMetricColumn), datasetMetricColumn,
         rvRangeFromPlan(logicalPlan))
     else
       BinaryJoinExec(qContext, inProcessPlanDispatcher, Seq(lhsExec), Seq(rhsExec), logicalPlan.operator,
-        logicalPlan.cardinality, LogicalPlanUtils.renameLabels(logicalPlan.on, datasetMetricColumn),
+        logicalPlan.cardinality, logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, datasetMetricColumn)),
         LogicalPlanUtils.renameLabels(logicalPlan.ignoring, datasetMetricColumn),
         LogicalPlanUtils.renameLabels(logicalPlan.include, datasetMetricColumn), datasetMetricColumn,
         rvRangeFromPlan(logicalPlan))
@@ -767,7 +767,8 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
       if (p.partitionName.equals(localPartitionName))
         localPartitionPlanner.materialize(lp, qContext)
       else {
-        createMetadataRemoteExec(qContext, p, lp.queryParams())
+        val newQueryContext = qContext.copy(origQueryParams = queryParams.copy(verbose = true))
+        createMetadataRemoteExec(newQueryContext, p, lp.queryParams())
       }
     }
     if (execPlans.size == 1) {
@@ -223,12 +223,13 @@ class ShardKeyRegexPlanner(val dataset: Dataset,

     val execPlan = if (logicalPlan.operator.isInstanceOf[SetOperator])
       SetOperatorExec(qContext, inProcessPlanDispatcher, Seq(lhsExec), Seq(rhsExec), logicalPlan.operator,
-        LogicalPlanUtils.renameLabels(logicalPlan.on, datasetMetricColumn),
+        logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, datasetMetricColumn)),
         LogicalPlanUtils.renameLabels(logicalPlan.ignoring, datasetMetricColumn), datasetMetricColumn,
         rvRangeFromPlan(logicalPlan))
     else
       BinaryJoinExec(qContext, inProcessPlanDispatcher, Seq(lhsExec), Seq(rhsExec), logicalPlan.operator,
-        logicalPlan.cardinality, LogicalPlanUtils.renameLabels(logicalPlan.on, datasetMetricColumn),
+        logicalPlan.cardinality,
+        logicalPlan.on.map(LogicalPlanUtils.renameLabels(_, datasetMetricColumn)),
         LogicalPlanUtils.renameLabels(logicalPlan.ignoring, datasetMetricColumn),
         LogicalPlanUtils.renameLabels(logicalPlan.include, datasetMetricColumn), datasetMetricColumn,
         rvRangeFromPlan(logicalPlan))