Skip to content

Commit

Permalink
feat(query): Cardinality V2 API Query Plan changes (#1637)
Browse files Browse the repository at this point in the history
feat(query): Cardinality V2 API Query Plan changes
  • Loading branch information
sandeep6189 authored Aug 2, 2023
1 parent f5018ae commit 84a185f
Show file tree
Hide file tree
Showing 18 changed files with 488 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ case class TenantIngestionMetering(settings: FilodbSettings,
"cluster_type" -> CLUSTER_TYPE)

if (CLUSTER_TYPE == "downsample") {
Kamon.gauge(METRIC_LONGTERM).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
Kamon.gauge(METRIC_LONGTERM).withTags(TagSet.from(tags)).update(data.counts.longTerm.toDouble)
}
else {
Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.shortTerm.toDouble)
}
})
case Success(QueryError(_, _, t)) => logger.warn("QueryError: " + t.getMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,28 @@ import filodb.query.exec._
PlanResult(Seq(execPlan))
}

/**
* Materialize Ts cardinality plan. For v1 version, we only go to raw cluster for back compatibility. For v2 versions,
* we would go to both downsample and raw cluster
*
* @param logicalPlan The TsCardinalities logical plan to materialize
* @param queryContext The QueryContext object
* @return
*/
private def materializeTSCardinalityPlan(queryContext: QueryContext, logicalPlan: TsCardinalities): PlanResult = {
logicalPlan.version match {
case 2 => {
val rawPlan = rawClusterPlanner.materialize(logicalPlan, queryContext)
val dsPlan = downsampleClusterPlanner.materialize(logicalPlan, queryContext)
val stitchedPlan = TsCardReduceExec(queryContext, stitchDispatcher, Seq(rawPlan, dsPlan))
PlanResult(Seq(stitchedPlan))
}
// version 1 defaults to raw as done before
case 1 => rawClusterMaterialize(queryContext, logicalPlan)
case _ => throw new UnsupportedOperationException(s"version ${logicalPlan.version} not supported!")
}
}

// scalastyle:off cyclomatic.complexity
override def walkLogicalPlanTree(logicalPlan: LogicalPlan,
qContext: QueryContext,
Expand All @@ -199,13 +221,13 @@ import filodb.query.exec._
logicalPlan match {
case p: PeriodicSeriesPlan => materializePeriodicSeriesPlan(qContext, p)
case lc: LabelCardinality => materializeLabelCardinalityPlan(lc, qContext)
case ts: TsCardinalities => materializeTSCardinalityPlan(qContext, ts)
case _: LabelValues |
_: ApplyLimitFunction |
_: SeriesKeysByFilters |
_: ApplyInstantFunctionRaw |
_: RawSeries |
_: LabelNames |
_: TsCardinalities => rawClusterMaterialize(qContext, logicalPlan)
_: LabelNames => rawClusterMaterialize(qContext, logicalPlan)
}
}
else logicalPlan match {
Expand All @@ -219,7 +241,7 @@ import filodb.query.exec._
case lp: BinaryJoin => materializePeriodicSeriesPlan(qContext, lp)
case lp: ScalarVectorBinaryOperation => super.materializeScalarVectorBinOp(qContext, lp)
case lp: LabelValues => rawClusterMaterialize(qContext, lp)
case lp: TsCardinalities => rawClusterMaterialize(qContext, lp)
case lp: TsCardinalities => materializeTSCardinalityPlan(qContext, lp)
case lp: SeriesKeysByFilters => rawClusterMaterialize(qContext, lp)
case lp: ApplyMiscellaneousFunction => super.materializeApplyMiscellaneousFunction(qContext, lp)
case lp: ApplySortFunction => super.materializeApplySortFunction(qContext, lp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,6 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider

def materializeTsCardinalities(lp: TsCardinalities, qContext: QueryContext): PlanResult = {

import TsCardinalities._

val queryParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
val partitions = if (lp.shardKeyPrefix.size >= 2) {
// At least a ws/ns pair is required to select specific partitions.
Expand All @@ -621,12 +619,7 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
if (p.partitionName.equals(localPartitionName))
localPartitionPlanner.materialize(lp, qContext)
else {
val params = Map(
"match[]" -> ("{" + SHARD_KEY_LABELS.zip(lp.shardKeyPrefix)
.map{ case (label, value) => s"""$label="$value""""}
.mkString(",") + "}"),
"numGroupByFields" -> lp.numGroupByFields.toString)
createMetadataRemoteExec(qContext, p, params)
createMetadataRemoteExec(qContext, p, lp.queryParams())
}
}
if (execPlans.size == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ class SingleClusterPlanner(val dataset: Dataset,
forceInProcess: Boolean): PlanResult = {
val metaExec = shardMapperFunc.assignedShards.map{ shard =>
val dispatcher = dispatcherForShard(shard, forceInProcess, qContext)
exec.TsCardExec(qContext, dispatcher, dsRef, shard, lp.shardKeyPrefix, lp.numGroupByFields)
exec.TsCardExec(qContext, dispatcher, dsRef, shard, lp.shardKeyPrefix, lp.numGroupByFields, clusterName)
}
PlanResult(metaExec)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,10 @@ import filodb.query.exec._
* distributed across multiple clusters.
*
* @param planners map of clusters names in the local partition to their Planner objects
* @param defaultPlanner TsCardinalities queries are routed here.
* Note: this is a temporary fix only to support TsCardinalities queries.
* These must be routed to planners according to the data they govern, and
* this information isn't accessible without this parameter.
* @param plannerSelector a function that selects the planner name given the metric name
* @param dataset a function that selects the planner name given the metric name
*/
class SinglePartitionPlanner(planners: Map[String, QueryPlanner],
defaultPlanner: String, // TODO: remove this-- see above.
plannerSelector: String => String,
val dataset: Dataset,
val queryConfig: QueryConfig)
Expand Down Expand Up @@ -74,9 +69,9 @@ class SinglePartitionPlanner(planners: Map[String, QueryPlanner],
}

private def materializeTsCardinalities(logicalPlan: TsCardinalities, qContext: QueryContext): PlanResult = {
// Delegate to defaultPlanner
planners.get(defaultPlanner).map(p => PlanResult(Seq(p.materialize(logicalPlan, qContext))))
.getOrElse(PlanResult(Seq()))
val execPlans = logicalPlan.datasets.map(d => planners.get(d))
.map(x => x.get.materialize(logicalPlan, qContext))
PlanResult(Seq(TsCardReduceExec(qContext, inProcessPlanDispatcher, execPlans)))
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,41 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat
binaryJoinExec.rhs.head.isInstanceOf[StitchRvsExec] shouldEqual (true)
}

it("tsCardinality should span to both downsample and raw for version 2") {
val logicalPlan = TsCardinalities(Seq("a","b"), 2, 2, Seq("longtime-prometheus"))

val cardExecPlan = longTermPlanner.materialize(
logicalPlan,
QueryContext(origQueryParams = promQlQueryParams.copy(promQl = ""))).asInstanceOf[TsCardReduceExec]

cardExecPlan.dispatcher.isInstanceOf[InProcessPlanDispatcher] shouldEqual true
cardExecPlan.children.size shouldEqual 2
val rawEp = cardExecPlan.children.head.asInstanceOf[MockExecPlan]
val downsampleEp = cardExecPlan.children.last.asInstanceOf[MockExecPlan]

rawEp.name shouldEqual "raw"
downsampleEp.name shouldEqual "downsample"
}

it("tsCardinality should throw exception for version > 2") {
val logicalPlan = TsCardinalities(Seq("a", "b"), 2, 3, Seq("longtime-prometheus"))
val ex = intercept[UnsupportedOperationException] {
val cardExecPlan = longTermPlanner.materialize(
logicalPlan,
QueryContext(origQueryParams = promQlQueryParams.copy(promQl = "")))
}
ex.getMessage.contains("version 3 not supported!") shouldEqual true
}

it("tsCardinality should span to raw ONLY for version 1") {
val logicalPlan = TsCardinalities(Seq("a", "b"), 2, 1, Seq("longtime-prometheus"))

val cardRawExecPlan = longTermPlanner.materialize(
logicalPlan,
QueryContext(origQueryParams = promQlQueryParams.copy(promQl = ""))).asInstanceOf[MockExecPlan]

cardRawExecPlan.name shouldEqual "raw"
}

it("should direct overlapping binary join offset queries with vector(0) " +
"to both raw & downsample planner and stitch") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
(endSeconds * 1000)
}

it ("should generate correct ExecPlan for TsCardinalities query") {
it ("should generate correct ExecPlan for TsCardinalities query version 1") {
def partitions(timeRange: TimeRange): List[PartitionAssignment] =
List(PartitionAssignment("remote", "remote-url",
TimeRange(startSeconds * 1000, localPartitionStart * 1000 - 1)),
Expand All @@ -1066,7 +1066,8 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig)
val lp = TsCardinalities(Seq("a", "b"), 3)
val promQlQueryParams = PromQlQueryParams("", startSeconds, step, endSeconds, Some("/api/v1/metering/cardinality/timeseries"))
val expectedUrlParams = Map("match[]" -> """{_ws_="a",_ns_="b"}""", "numGroupByFields" -> "3")
val expectedUrlParams = Map("match[]" -> """{_ws_="a",_ns_="b"}""", "numGroupByFields" -> "3", "verbose" -> "true",
"datasets" -> "")

val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams =
PlannerParams(processMultiPartition = true)))
Expand All @@ -1077,6 +1078,38 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
execPlan.children(1).asInstanceOf[MetadataRemoteExec].urlParams shouldEqual(expectedUrlParams)
}

it("should generate correct ExecPlan for TsCardinalities query version 2") {
def partitions(timeRange: TimeRange): List[PartitionAssignment] =
List(PartitionAssignment("remote", "remote-url",
TimeRange(startSeconds * 1000, localPartitionStart * 1000 - 1)),
PartitionAssignment("local", "local-url", TimeRange(localPartitionStart * 1000, endSeconds * 1000)))

val partitionLocationProvider = new PartitionLocationProvider {
override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] =
partitions(timeRange)

override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter],
timeRange: TimeRange):List[PartitionAssignment] =
partitions(timeRange)
}

val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local",
dataset, queryConfig)
val lp = TsCardinalities(Seq("a", "b"), 3, 2, Seq("longtime-prometheus","recordingrules-prometheus_rules_1m"))
val promQlQueryParams = PromQlQueryParams("", startSeconds, step, endSeconds,
Some("/api/v2/metering/cardinality/timeseries"))
val expectedUrlParams = Map("match[]" -> """{_ws_="a",_ns_="b"}""", "numGroupByFields" -> "3","verbose" -> "true",
"datasets" -> "longtime-prometheus,recordingrules-prometheus_rules_1m")

val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams =
PlannerParams(processMultiPartition = true)))

execPlan.isInstanceOf[TsCardReduceExec] shouldEqual (true)
execPlan.children(0).isInstanceOf[TsCardReduceExec] shouldEqual (true)
execPlan.children(1).isInstanceOf[MetadataRemoteExec] shouldEqual (true)
execPlan.children(1).asInstanceOf[MetadataRemoteExec].urlParams shouldEqual (expectedUrlParams)
}

it ("should generate multipartition BinaryJoin") {
def partitions(timeRange: TimeRange): List[PartitionAssignment] = List(PartitionAssignment("remote", "remote-url",
TimeRange(timeRange.startMs, timeRange.endMs)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
if (metricName.contains(":1m")) "recordingRules" else "longTerm"
}
val planners = Map("longTerm" -> longTermPlanner, "recordingRules" -> recordingRulesPlanner)
val singlePartitionPlanner = new SinglePartitionPlanner(planners, "longTerm", plannerSelector,
val singlePartitionPlanner = new SinglePartitionPlanner(planners, plannerSelector,
dataset, queryConfig)

private val oneRemotePartitionLocationProvider = new PartitionLocationProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import akka.actor.ActorSystem
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import monix.execution.Scheduler

import filodb.coordinator.{ActorPlanDispatcher, ShardMapper}
import filodb.core.{DatasetRef, MetricsTestData}
import filodb.core.metadata.Schemas
Expand Down Expand Up @@ -81,10 +80,13 @@ class SinglePartitionPlannerSpec extends AnyFunSpec with Matchers {
}

val planners = Map("local" -> highAvailabilityPlanner, "rules1" -> rrPlanner1, "rules2" -> rrPlanner2)
val plannerSelector = (metricName: String) => { if (metricName.equals("rr1")) "rules1"
else if (metricName.equals("rr2")) "rules2" else "local" }

val engine = new SinglePartitionPlanner(planners, "local", plannerSelector, dataset, queryConfig)
val plannerSelector = (metricName: String) => {
if (metricName.equals("rr1")) "rules1"
else if (metricName.equals("rr2")) "rules2" else "local"
}

val engine = new SinglePartitionPlanner(planners, plannerSelector, dataset, queryConfig)

it("should generate Exec plan for simple query") {
val lp = Parser.queryToLogicalPlan("test{job = \"app\"}", 1000, 1000)
Expand Down Expand Up @@ -160,7 +162,7 @@ class SinglePartitionPlannerSpec extends AnyFunSpec with Matchers {
execPlan.asInstanceOf[PartKeysDistConcatExec].children(2).asInstanceOf[MockExecPlan].name shouldEqual ("rules2")
}

it("should generate correct ExecPlan for TsCardinalities") {
it("should generate correct ExecPlan for TsCardinalities version 1") {

// Note: this test is expected to break when TsCardinalities.isRoutable = true
// Note: unrelated to the above, this test is setup to confirm that a hacky fix to
Expand All @@ -169,15 +171,63 @@ class SinglePartitionPlannerSpec extends AnyFunSpec with Matchers {
val localPlanner = new SingleClusterPlanner(
dataset, schemas, localMapper, earliestRetainedTimestampFn = 0, queryConfig, "raw-temp")
val planners = Map("raw-temp" -> localPlanner, "rules1" -> rrPlanner1, "rules2" -> rrPlanner2)
val engine = new SinglePartitionPlanner(planners, "raw-temp", plannerSelector, dataset, queryConfig)
val lp = TsCardinalities(Seq("a", "b"), 2)
val engine = new SinglePartitionPlanner(planners, plannerSelector, dataset, queryConfig)
val lp = TsCardinalities(Seq("a", "b"), 2, 1, Seq("raw-temp"))

// Plan should just contain a single root TsCardReduceExec and its TsCardExec children.
// Currently, queries are routed only to the planner who's name equals the SPP's "defaultPlanner" member.
val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams.copy(promQl = "")))
execPlan.isInstanceOf[TsCardReduceExec] shouldEqual (true)

// UPDATE: We added another TsCardReduceExec to merge data across different datasets
execPlan.asInstanceOf[TsCardReduceExec].children.length shouldEqual(1)
execPlan.asInstanceOf[TsCardReduceExec].children(0).children.length shouldEqual(32)
execPlan.children.forall(_.isInstanceOf[TsCardReduceExec]) shouldEqual true
execPlan.children(0).children.forall(_.isInstanceOf[TsCardExec]) shouldEqual true
}

it("should generate correct ExecPlan for TsCardinalities version 2") {

// Note: this test is expected to break when TsCardinalities.isRoutable = true
// Note: unrelated to the above, this test is setup to confirm that a hacky fix to
// SPP::materializeTsCardinalities is working. See there for additional details.

val rawPlanner = new SingleClusterPlanner(
dataset, schemas, localMapper, earliestRetainedTimestampFn = 0, queryConfig, "raw")

val downsamplePlanner = new SingleClusterPlanner(
dataset, schemas, localMapper, earliestRetainedTimestampFn = 0, queryConfig, "downsample")

val longTermPlanner = new LongTimeRangePlanner(rawPlanner, downsamplePlanner, 0, 0,
InProcessPlanDispatcher(QueryConfig.unitTestingQueryConfig), queryConfig, dataset)

val planners = Map("longtime" -> longTermPlanner, "rules1" -> rrPlanner1, "rules2" -> rrPlanner2)

val engine = new SinglePartitionPlanner(planners, plannerSelector, dataset, queryConfig)
val lp = TsCardinalities(Seq("a", "b"), 2, 2, Seq("longtime", "rules1", "rules2"))

// Plan should just contain a single root TsCardReduceExec and its TsCardExec children.
// Currently, queries are routed only to the planner who's name equals the SPP's "defaultPlanner" member.
val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams.copy(promQl = "")))
execPlan.isInstanceOf[TsCardReduceExec] shouldEqual (true)
execPlan.asInstanceOf[TsCardReduceExec].children.length shouldEqual(32)
execPlan.children.forall(_.isInstanceOf[TsCardExec]) shouldEqual true

// UPDATE: should have 3 children, since we passed 3 datasets
execPlan.asInstanceOf[TsCardReduceExec].children.length shouldEqual (3)

// the longtime should have 2 children for downsample and raw
val longTermExecPlan = execPlan.asInstanceOf[TsCardReduceExec].children(0)
longTermExecPlan.children.length shouldEqual (2)
longTermExecPlan.children(0).dispatcher.clusterName shouldEqual "raw"
longTermExecPlan.children(1).dispatcher.clusterName shouldEqual "downsample"

longTermExecPlan.children(0).children.forall(_.isInstanceOf[TsCardExec]) shouldEqual true
longTermExecPlan.children(1).children.forall(_.isInstanceOf[TsCardExec]) shouldEqual true

// rules plan
val rules1ExecPlan = execPlan.asInstanceOf[TsCardReduceExec].children(1).asInstanceOf[MockExecPlan]
rules1ExecPlan.name shouldEqual "rules1"
val rules2ExecPlan = execPlan.asInstanceOf[TsCardReduceExec].children(2).asInstanceOf[MockExecPlan]
rules2ExecPlan.name shouldEqual "rules2"
}

it("should generate Exec plan for Scalar query which does not have any metric") {
Expand Down
Loading

0 comments on commit 84a185f

Please sign in to comment.