fix(cardinality): Deprecating CardinalityV1 and simplifying the cardinality logical and exec plans #1703

Merged 1 commit on Feb 1, 2024
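This change removes the deprecated V1 cardinality path end to end: the version field disappears from the TsCardinalities logical plan and TsCardExec, the V1 result schema and its JSON sample type are deleted, and the V2 behavior (raw plus downsample fan-out with dataset-aware grouping) becomes the only code path. The call-site change, as seen in the spec updates below (variable names are illustrative):

// Before: an explicit cardinality-API version (here 2) was threaded through the plan
val oldLp = TsCardinalities(Seq("a", "b"), 2, 2, Seq("longtime-prometheus"))
// After: the version parameter is gone and V2 semantics always apply
val newLp = TsCardinalities(Seq("a", "b"), 2, Seq("longtime-prometheus"))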
@@ -65,7 +65,7 @@ case class TenantIngestionMetering(settings: FilodbSettings,
dsIterProducer().foreach { dsRef =>
val fut = Client.asyncAsk(
coordActorProducer(),
- LogicalPlan2Query(dsRef, TsCardinalities(prefix, numGroupByFields, 2, overrideClusterName = CLUSTER_TYPE)),
+ LogicalPlan2Query(dsRef, TsCardinalities(prefix, numGroupByFields, overrideClusterName = CLUSTER_TYPE)),
ASK_TIMEOUT)
fut.onComplete {
case Success(QueryResult(_, _, rv, _, _, _, _)) =>
@@ -233,17 +233,10 @@ import filodb.query.exec._
* @return
*/
private def materializeTSCardinalityPlan(queryContext: QueryContext, logicalPlan: TsCardinalities): PlanResult = {
-  logicalPlan.version match {
-    case 2 => {
-      val rawPlan = rawClusterPlanner.materialize(logicalPlan, queryContext)
-      val dsPlan = downsampleClusterPlanner.materialize(logicalPlan, queryContext)
-      val stitchedPlan = TsCardReduceExec(queryContext, stitchDispatcher, Seq(rawPlan, dsPlan))
-      PlanResult(Seq(stitchedPlan))
-    }
-    // version 1 defaults to raw as done before
-    case 1 => rawClusterMaterialize(queryContext, logicalPlan)
-    case _ => throw new UnsupportedOperationException(s"version ${logicalPlan.version} not supported!")
-  }
+  val rawPlan = rawClusterPlanner.materialize(logicalPlan, queryContext)
+  val dsPlan = downsampleClusterPlanner.materialize(logicalPlan, queryContext)
+  val stitchedPlan = TsCardReduceExec(queryContext, stitchDispatcher, Seq(rawPlan, dsPlan))
+  PlanResult(Seq(stitchedPlan))
}

// scalastyle:off cyclomatic.complexity
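With the version dispatch gone, every cardinality query is planned against both the raw and the downsample cluster, and the partial results are stitched by a single TsCardReduceExec. A rough sketch of the resulting plan tree; the exact shape of each child is up to the sub-planner, so this layout is illustrative:

// TsCardReduceExec (stitch)
//   +- plan from rawClusterPlanner.materialize(logicalPlan, queryContext)
//   +- plan from downsampleClusterPlanner.materialize(logicalPlan, queryContext)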
@@ -850,8 +850,7 @@ class SingleClusterPlanner(val dataset: Dataset,
}
val metaExec = shardMapperFunc.assignedShards.map{ shard =>
val dispatcher = dispatcherForShard(shard, forceInProcess, qContext)
- exec.TsCardExec(qContext, dispatcher, dsRef, shard, lp.shardKeyPrefix, lp.numGroupByFields, clusterNameToPass,
-   lp.version)
+ exec.TsCardExec(qContext, dispatcher, dsRef, shard, lp.shardKeyPrefix, lp.numGroupByFields, clusterNameToPass)
}
PlanResult(metaExec)
}
@@ -337,7 +337,7 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat
}

it("tsCardinality should span to both downsample and raw for version 2") {
val logicalPlan = TsCardinalities(Seq("a","b"), 2, 2, Seq("longtime-prometheus"))
val logicalPlan = TsCardinalities(Seq("a","b"), 2, Seq("longtime-prometheus"))

val cardExecPlan = longTermPlanner.materialize(
logicalPlan,
@@ -352,26 +352,6 @@
downsampleEp.name shouldEqual "downsample"
}

it("tsCardinality should throw exception for version > 2") {
val logicalPlan = TsCardinalities(Seq("a", "b"), 2, 3, Seq("longtime-prometheus"))
val ex = intercept[UnsupportedOperationException] {
val cardExecPlan = longTermPlanner.materialize(
logicalPlan,
QueryContext(origQueryParams = promQlQueryParams.copy(promQl = "")))
}
ex.getMessage.contains("version 3 not supported!") shouldEqual true
}

it("tsCardinality should span to raw ONLY for version 1") {
val logicalPlan = TsCardinalities(Seq("a", "b"), 2, 1, Seq("longtime-prometheus"))

val cardRawExecPlan = longTermPlanner.materialize(
logicalPlan,
QueryContext(origQueryParams = promQlQueryParams.copy(promQl = ""))).asInstanceOf[MockExecPlan]

cardRawExecPlan.name shouldEqual "raw"
}

it("should direct overlapping binary join offset queries with vector(0) " +
"to both raw & downsample planner and stitch") {

@@ -997,35 +997,6 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
(endSeconds * 1000)
}

it ("should generate correct ExecPlan for TsCardinalities query version 1") {
def partitions(timeRange: TimeRange): List[PartitionAssignment] =
List(PartitionAssignment("remote", "remote-url",
TimeRange(startSeconds * 1000, localPartitionStart * 1000 - 1)),
PartitionAssignment("local", "local-url", TimeRange(localPartitionStart * 1000, endSeconds * 1000)))

val partitionLocationProvider = new PartitionLocationProvider {
override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] =
partitions(timeRange)

override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] =
partitions(timeRange)
}

val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig)
val lp = TsCardinalities(Seq("a", "b"), 3)
val promQlQueryParams = PromQlQueryParams("", startSeconds, step, endSeconds, Some("/api/v1/metering/cardinality/timeseries"))
val expectedUrlParams = Map("match[]" -> """{_ws_="a",_ns_="b"}""", "numGroupByFields" -> "3", "verbose" -> "true",
"datasets" -> "")

val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams =
PlannerParams(processMultiPartition = true)))

execPlan.isInstanceOf[TsCardReduceExec] shouldEqual (true)
execPlan.children(0).isInstanceOf[TsCardReduceExec] shouldEqual(true)
execPlan.children(1).isInstanceOf[MetadataRemoteExec] shouldEqual(true)
execPlan.children(1).asInstanceOf[MetadataRemoteExec].urlParams shouldEqual(expectedUrlParams)
}

it("should generate correct ExecPlan for TsCardinalities query version 2") {
def partitions(timeRange: TimeRange): List[PartitionAssignment] =
List(PartitionAssignment("remote", "remote-url",
@@ -1043,7 +1014,7 @@

val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local",
dataset, queryConfig)
val lp = TsCardinalities(Seq("a", "b"), 3, 2, Seq("longtime-prometheus","recordingrules-prometheus_rules_1m")
val lp = TsCardinalities(Seq("a", "b"), 3, Seq("longtime-prometheus","recordingrules-prometheus_rules_1m")
, "raw,recordingrules")
val promQlQueryParams = PromQlQueryParams("", startSeconds, step, endSeconds,
Some("/api/v2/metering/cardinality/timeseries"))
@@ -162,30 +162,6 @@ class SinglePartitionPlannerSpec extends AnyFunSpec with Matchers {
execPlan.asInstanceOf[PartKeysDistConcatExec].children(2).asInstanceOf[MockExecPlan].name shouldEqual ("rules2")
}

it("should generate correct ExecPlan for TsCardinalities version 1") {

// Note: this test is expected to break when TsCardinalities.isRoutable = true
// Note: unrelated to the above, this test is setup to confirm that a hacky fix to
// SPP::materializeTsCardinalities is working. See there for additional details.

val localPlanner = new SingleClusterPlanner(
dataset, schemas, localMapper, earliestRetainedTimestampFn = 0, queryConfig, "raw-temp")
val planners = Map("raw-temp" -> localPlanner, "rules1" -> rrPlanner1, "rules2" -> rrPlanner2)
val engine = new SinglePartitionPlanner(planners, plannerSelector, dataset, queryConfig)
val lp = TsCardinalities(Seq("a", "b"), 2, 1, Seq("raw-temp"))

// Plan should just contain a single root TsCardReduceExec and its TsCardExec children.
// Currently, queries are routed only to the planner who's name equals the SPP's "defaultPlanner" member.
val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams.copy(promQl = "")))
execPlan.isInstanceOf[TsCardReduceExec] shouldEqual (true)

// UPDATE: We added another TsCardReduceExec to merge data across different datasets
execPlan.asInstanceOf[TsCardReduceExec].children.length shouldEqual(1)
execPlan.asInstanceOf[TsCardReduceExec].children(0).children.length shouldEqual(32)
execPlan.children.forall(_.isInstanceOf[TsCardReduceExec]) shouldEqual true
execPlan.children(0).children.forall(_.isInstanceOf[TsCardExec]) shouldEqual true
}

it("should generate correct ExecPlan for TsCardinalities version 2") {

// Note: this test is expected to break when TsCardinalities.isRoutable = true
@@ -204,7 +180,7 @@ class SinglePartitionPlannerSpec extends AnyFunSpec with Matchers {
val planners = Map("longtime" -> longTermPlanner, "rules1" -> rrPlanner1, "rules2" -> rrPlanner2)

val engine = new SinglePartitionPlanner(planners, plannerSelector, dataset, queryConfig)
val lp = TsCardinalities(Seq("a", "b"), 2, 2, Seq("longtime", "rules1", "rules2"))
val lp = TsCardinalities(Seq("a", "b"), 2, Seq("longtime", "rules1", "rules2"))

// Plan should just contain a single root TsCardReduceExec and its TsCardExec children.
// Currently, queries are routed only to the planner who's name equals the SPP's "defaultPlanner" member.
1 change: 0 additions & 1 deletion query/src/main/scala/filodb/query/LogicalPlan.scala
@@ -185,7 +185,6 @@ object TsCardinalities {
*/
case class TsCardinalities(shardKeyPrefix: Seq[String],
numGroupByFields: Int,
- version: Int = 1,
datasets: Seq[String] = Seq(),
userDatasets: String = "",
overrideClusterName: String = "") extends LogicalPlan {
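For reference, a construction that exercises every remaining field, with named arguments; the values are illustrative, drawn from the updated specs:

val lp = TsCardinalities(
  shardKeyPrefix = Seq("a", "b"),          // e.g. _ws_ and _ns_ values
  numGroupByFields = 2,
  datasets = Seq("longtime-prometheus"),
  userDatasets = "raw,recordingrules",
  overrideClusterName = "")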
25 changes: 7 additions & 18 deletions query/src/main/scala/filodb/query/PromCirceSupport.scala
@@ -19,8 +19,6 @@ object PromCirceSupport {
// Where are these used? added to make compiler happy
case l @ LabelCardinalitySampl(group, cardinality) =>
Json.fromValues(Seq(group.asJson, cardinality.asJson))
- case t @ TsCardinalitiesSampl(group, cardinality) =>
-   Json.fromValues(Seq(group.asJson, cardinality.asJson))
case a @ TsCardinalitiesSamplV2(group, cardinality, dataset, _type) =>
Json.fromValues(Seq(group.asJson, cardinality.asJson, dataset.asJson, _type.asJson))
}
@@ -59,22 +57,13 @@ object PromCirceSupport {
card <- c.get[Seq[Map[String, String]]]("cardinality")
} yield LabelCardinalitySampl(metric, card)
} else if (c.downField("group").focus.nonEmpty) {
-   // V2 Cardinality API also has a dataset field. So we are using it to distinguish
-   // between the TsCardinalitiesSamplV2 vs TsCardinalitiesSampl response
-   if (c.downField("dataset").focus.nonEmpty) {
-     for {
-       group <- c.get[Map[String, String]]("group")
-       card <- c.get[Map[String, Int]]("cardinality")
-       dataset <- c.get[String]("dataset")
-       _type <- c.get[String]("_type")
-     } yield TsCardinalitiesSamplV2(group, card, dataset, _type)
-   }
-   else {
-     for {
-       group <- c.get[Map[String, String]]("group")
-       card <- c.get[Map[String, Int]]("cardinality")
-     } yield TsCardinalitiesSampl(group, card)
-   }
+   // This is for TsCardinalities response
+   for {
+     group <- c.get[Map[String, String]]("group")
+     card <- c.get[Map[String, Int]]("cardinality")
+     dataset <- c.get[String]("dataset")
+     _type <- c.get[String]("_type")
+   } yield TsCardinalitiesSamplV2(group, card, dataset, _type)
} else {
throw new IllegalArgumentException("could not decode any expected cardinality-related field")
}
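For illustration, a metadata sample in the only shape the simplified decoder now accepts; the field values are hypothetical, with group keys following the shard-key labels and cardinality keys following the counts documented on TsCardinalitiesSamplV2:

// A payload with "group" but without "dataset"/"_type" (the removed TsCardinalitiesSampl
// shape) no longer has a fallback: its dataset/_type lookups simply fail to decode.
val sampleJson =
  """{
    |  "group": { "_ws_": "demo-ws", "_ns_": "demo-ns" },
    |  "cardinality": { "active": 10, "shortTerm": 20, "longTerm": 30 },
    |  "dataset": "prometheus",
    |  "_type": "raw"
    |}""".stripMargin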
3 changes: 0 additions & 3 deletions query/src/main/scala/filodb/query/PromQueryResponse.scala
@@ -67,9 +67,6 @@ final case class StdValSampl(timestamp: Long, stddev: Double, mean: Double, coun
final case class LabelCardinalitySampl(metric: Map[String, String],
cardinality: Seq[Map[String, String]]) extends MetadataSampl

- final case class TsCardinalitiesSampl(group: Map[String, String],
-   cardinality: Map[String, Int]) extends MetadataSampl

/**
* @param group map of shardKeyPrefix and its values. Key includes - _ws_, _ns_, __name__
* @param cardinality map of string and int (cardinality count). Key includes - active, shortTerm, longTerm
62 changes: 11 additions & 51 deletions query/src/main/scala/filodb/query/exec/MetadataExecPlan.scala
@@ -85,7 +85,7 @@ final case class TsCardReduceExec(queryContext: QueryContext,

override protected def args: String = ""

- private def mapFold(acc: mutable.HashMap[ZeroCopyUTF8String, CardCounts], rv: RangeVector, rs: ResultSchema):
+ private def mapFold(acc: mutable.HashMap[ZeroCopyUTF8String, CardCounts], rv: RangeVector):
mutable.HashMap[ZeroCopyUTF8String, CardCounts] = {
rv.rows().foreach{ r =>
val data = RowData.fromRowReader(r)
@@ -95,17 +95,11 @@ final case class TsCardReduceExec(queryContext: QueryContext,
val (groupKey, accCounts) = if (accCountsOpt.nonEmpty || acc.contains(data.group) || acc.size < resultSize) {
(data.group, accCountsOpt.getOrElse(CardCounts(0, 0)))
} else {
-       // handle overflow group based on result schema given
-       if (rs.hasSameColumnsAs(TsCardExec.RESULT_SCHEMA)) {
-         // aggregate by dataset as well
-         val groupArray = data.group.toString.split(TsCardExec.PREFIX_DELIM)
-         val dataset = groupArray(groupArray.size - 1)
-         val dataGroup = prefixToGroupWithDataset(CardinalityStore.OVERFLOW_PREFIX, dataset)
-         (dataGroup, acc.getOrElseUpdate(dataGroup, CardCounts(0, 0)))
-       }
-       else {
-         (OVERFLOW_GROUP, acc.getOrElseUpdate(OVERFLOW_GROUP, CardCounts(0, 0)))
-       }
+       // aggregate by dataset as well
+       val groupArray = data.group.toString.split(TsCardExec.PREFIX_DELIM)
+       val dataset = groupArray(groupArray.size - 1)
+       val dataGroup = prefixToGroupWithDataset(CardinalityStore.OVERFLOW_PREFIX, dataset)
+       (dataGroup, acc.getOrElseUpdate(dataGroup, CardCounts(0, 0)))
}
acc.update(groupKey, accCounts.add(data.counts))
}
@@ -121,15 +115,11 @@ final case class TsCardReduceExec(queryContext: QueryContext,
querySession: QuerySession): Observable[RangeVector] = {

// capture the result schema of responses
- var rs = ResultSchema.empty
val flatMapData = childResponses.flatMap(res => {
-   if (rs == ResultSchema.empty) {
-     rs = res._1.resultSchema
-   }
Observable.fromIterable(res._1.result)
})
val taskOfResults = flatMapData
- .foldLeftL(new mutable.HashMap[ZeroCopyUTF8String, CardCounts])((x, y) => mapFold(x, y, rs))
+ .foldLeftL(new mutable.HashMap[ZeroCopyUTF8String, CardCounts])((x, y) => mapFold(x, y))
.map{ aggMap =>
val it = aggMap.toSeq.sortBy(-_._2.shortTerm).map{ case (group, counts) =>
CardRowReader(group, counts)
@@ -479,18 +469,8 @@ final case object TsCardExec {
// results from all TsCardinality derivatives are clipped to this size
val MAX_RESULT_SIZE = CardinalityStore.MAX_RESULT_SIZE

- // row name assigned to overflow counts
- val OVERFLOW_GROUP = prefixToGroup(CardinalityStore.OVERFLOW_PREFIX)

val PREFIX_DELIM = ","

- /**
-  * This is the V1 schema version of QueryResult for TSCardinalities query
-  */
- val RESULT_SCHEMA_V1 = ResultSchema(Seq(ColumnInfo("group", ColumnType.StringColumn),
-   ColumnInfo("active", ColumnType.IntColumn),
-   ColumnInfo("total", ColumnType.IntColumn)), 1)

/**
* V2 schema version of QueryResult for TSCardinalities query. One more additional column `longterm` is added
* to represent the cardinality count of downsample clusters
@@ -500,14 +480,6 @@ final case object TsCardExec {
ColumnInfo("shortTerm", ColumnType.IntColumn),
ColumnInfo("longTerm", ColumnType.IntColumn)), 1)

- /**
-  * Convert a shard key prefix to a row's group name.
-  */
- def prefixToGroup(prefix: Seq[String]): ZeroCopyUTF8String = {
-   // just concat the prefix together with a single char delimiter
-   prefix.mkString(PREFIX_DELIM).utf8
- }

/**
* @param prefix ShardKeyPrefix from the Cardinality Record
* @param datasetName FiloDB dataset name
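With prefixToGroup removed, prefixToGroupWithDataset is the only way row group keys are built. Its body is collapsed in this diff, so the following is an assumption: the dataset is appended to the shard-key prefix with the same PREFIX_DELIM comma that TsCardReduceExec.mapFold later splits on.

// Assumed behavior (method body not shown in this diff):
// prefixToGroupWithDataset(Seq("demo-ws", "demo-ns"), "prometheus")
//   yields "demo-ws,demo-ns,prometheus".utf8
// mapFold depends on this layout: it splits the group on PREFIX_DELIM and treats the
// last element as the dataset when building the per-dataset overflow group.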
@@ -580,8 +552,7 @@ final case class TsCardExec(queryContext: QueryContext,
shard: Int,
shardKeyPrefix: Seq[String],
numGroupByFields: Int,
- clusterName: String,
-   version: Int) extends LeafExecPlan with StrictLogging {
+ clusterName: String) extends LeafExecPlan with StrictLogging {
require(numGroupByFields >= 1,
"numGroupByFields must be positive")
require(numGroupByFields >= shardKeyPrefix.size,
@@ -607,14 +578,7 @@ final case class TsCardExec(queryContext: QueryContext,
val cards = tsMemStore.scanTsCardinalities(
dataset, Seq(shard), shardKeyPrefix, numGroupByFields)
val it = cards.map { card =>

-      // v1 and v2 cardinality have different schemas and required group key. Hence we are segregating
-      // w.r.t to the version
-      val groupKey =
-        version match {
-          case 1 => prefixToGroup(card.prefix)
-          case _ => prefixToGroupWithDataset(card.prefix, dataset.dataset)
-        }
+      val groupKey = prefixToGroupWithDataset(card.prefix, dataset.dataset)
// NOTE: cardinality data from downsample cluster is stored as total count in CardinalityStore. But for the
// user perspective, the cardinality data in downsample is a longterm data. Hence, we are forking the
// data path based on the cluster the data is being served from
@@ -631,14 +595,10 @@ final case class TsCardExec(queryContext: QueryContext,
}.iterator
IteratorBackedRangeVector(new CustomRangeVectorKey(Map.empty), NoCloseCursor(it), None)
}
-    case other =>
+    case _ =>
Observable.empty
}
-    // Sending V1 SCHEMA for v1 queries
-    version match {
-      case 1 => ExecResult(rvs, Task.eval(RESULT_SCHEMA_V1))
-      case _ => ExecResult(rvs, Task.eval(RESULT_SCHEMA))
-    }
+    ExecResult(rvs, Task.eval(RESULT_SCHEMA))
}
// scalastyle:on method.length

23 changes: 0 additions & 23 deletions query/src/main/scala/filodb/query/exec/MetadataRemoteExec.scala
@@ -67,34 +67,11 @@ case class MetadataRemoteExec(queryEndpoint: String,
else response.data.head match {
case _: MetadataMapSampl => mapTypeQueryResponse(response, id)
case _: LabelCardinalitySampl => mapLabelCardinalityResponse(response, id)
- case _: TsCardinalitiesSampl => mapTsCardinalitiesResponse(response, id)
case _: TsCardinalitiesSamplV2 => mapTsCardinalitiesResponseV2(response, id)
case _ => labelsQueryResponse(response, id)
}
}

-  private def mapTsCardinalitiesResponse(response: MetadataSuccessResponse, id: String): QueryResponse = {
-    import NoCloseCursor._
-    import TsCardinalities._
-    import TsCardExec._
-
-    val RECORD_SCHEMA = SerializedRangeVector.toSchema(RESULT_SCHEMA.columns)
-
-    val rows = response.data.asInstanceOf[Seq[TsCardinalitiesSampl]]
-      .map { ts =>
-        val prefix = SHARD_KEY_LABELS.take(ts.group.size).map(l => ts.group(l))
-        val counts = CardCounts(ts.cardinality("active"), ts.cardinality("total"))
-        CardRowReader(prefixToGroup(prefix), counts)
-      }
-    val rv = IteratorBackedRangeVector(CustomRangeVectorKey.empty, NoCloseCursor(rows.iterator), None)
-    // dont add this size to queryStats since it was already added by callee use dummy QueryStats()
-    val srv = SerializedRangeVector(rv, builder, RECORD_SCHEMA, queryWithPlanName(queryContext), dummyQueryStats)
-
-    // NOTE: We are using the RESULT_SCHEMA definitions to determine the iteration of shardKeyPrefix in v1 result.
-    // Hence, we are sending the older result schema which was used for V1 Cardinality API
-    QueryResult(id, RESULT_SCHEMA_V1, Seq(srv))
-  }

/**
* @param response Metadata Response from the remote query server API call.
* @param id QueryId