diff --git a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
index 50c7a3ca1d..ede1cd808b 100644
--- a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
@@ -65,7 +65,7 @@ case class TenantIngestionMetering(settings: FilodbSettings,
     dsIterProducer().foreach { dsRef =>
       val fut = Client.asyncAsk(
         coordActorProducer(),
-        LogicalPlan2Query(dsRef, TsCardinalities(prefix, numGroupByFields, 2, overrideClusterName = CLUSTER_TYPE)),
+        LogicalPlan2Query(dsRef, TsCardinalities(prefix, numGroupByFields, overrideClusterName = CLUSTER_TYPE)),
         ASK_TIMEOUT)
       fut.onComplete {
         case Success(QueryResult(_, _, rv, _, _, _, _)) =>
diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala
index f7503e6608..21d1cdcbf0 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala
@@ -233,17 +233,10 @@ import filodb.query.exec._
    * @return
    */
   private def materializeTSCardinalityPlan(queryContext: QueryContext, logicalPlan: TsCardinalities): PlanResult = {
-    logicalPlan.version match {
-      case 2 => {
-        val rawPlan = rawClusterPlanner.materialize(logicalPlan, queryContext)
-        val dsPlan = downsampleClusterPlanner.materialize(logicalPlan, queryContext)
-        val stitchedPlan = TsCardReduceExec(queryContext, stitchDispatcher, Seq(rawPlan, dsPlan))
-        PlanResult(Seq(stitchedPlan))
-      }
-      // version 1 defaults to raw as done before
-      case 1 => rawClusterMaterialize(queryContext, logicalPlan)
-      case _ => throw new UnsupportedOperationException(s"version ${logicalPlan.version} not supported!")
-    }
+    val rawPlan = rawClusterPlanner.materialize(logicalPlan, queryContext)
+    val dsPlan = downsampleClusterPlanner.materialize(logicalPlan, queryContext)
+    val stitchedPlan = TsCardReduceExec(queryContext, stitchDispatcher, Seq(rawPlan, dsPlan))
+    PlanResult(Seq(stitchedPlan))
   }

   // scalastyle:off cyclomatic.complexity
diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
index 6ea2fa1ec6..b43385d1c0 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
@@ -850,8 +850,7 @@ class SingleClusterPlanner(val dataset: Dataset,
     }
     val metaExec = shardMapperFunc.assignedShards.map{ shard =>
       val dispatcher = dispatcherForShard(shard, forceInProcess, qContext)
-      exec.TsCardExec(qContext, dispatcher, dsRef, shard, lp.shardKeyPrefix, lp.numGroupByFields, clusterNameToPass,
-        lp.version)
+      exec.TsCardExec(qContext, dispatcher, dsRef, shard, lp.shardKeyPrefix, lp.numGroupByFields, clusterNameToPass)
     }
     PlanResult(metaExec)
   }
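Net effect of the planner hunks above: the cardinality path no longer branches on a version field. LongTimeRangePlanner always materializes both clusters and stitches them, and SingleClusterPlanner fans out one TsCardExec per assigned shard without forwarding a version. Roughly, the materialized plan now has this shape (a sketch inferred from the hunks above; the shard count is hypothetical, and this is not actual planner output):

    // TsCardReduceExec(stitchDispatcher)     <- LongTimeRangePlanner stitch node
    //   +- <raw cluster plan>                <- rawClusterPlanner.materialize(...)
    //   |    +- TsCardExec(shard = 0..31, clusterName = "raw")
    //   +- <downsample cluster plan>         <- downsampleClusterPlanner.materialize(...)
    //        +- TsCardExec(shard = 0..31, clusterName = "downsample")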
diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LongTimeRangePlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LongTimeRangePlannerSpec.scala
index 54300a65b1..3f9449cd9a 100644
--- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LongTimeRangePlannerSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LongTimeRangePlannerSpec.scala
@@ -337,7 +337,7 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat
   }

   it("tsCardinality should span to both downsample and raw for version 2") {
-    val logicalPlan = TsCardinalities(Seq("a","b"), 2, 2, Seq("longtime-prometheus"))
+    val logicalPlan = TsCardinalities(Seq("a","b"), 2, Seq("longtime-prometheus"))

     val cardExecPlan = longTermPlanner.materialize(
       logicalPlan,
@@ -352,26 +352,6 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat
     downsampleEp.name shouldEqual "downsample"
   }

-  it("tsCardinality should throw exception for version > 2") {
-    val logicalPlan = TsCardinalities(Seq("a", "b"), 2, 3, Seq("longtime-prometheus"))
-    val ex = intercept[UnsupportedOperationException] {
-      val cardExecPlan = longTermPlanner.materialize(
-        logicalPlan,
-        QueryContext(origQueryParams = promQlQueryParams.copy(promQl = "")))
-    }
-    ex.getMessage.contains("version 3 not supported!") shouldEqual true
-  }
-
-  it("tsCardinality should span to raw ONLY for version 1") {
-    val logicalPlan = TsCardinalities(Seq("a", "b"), 2, 1, Seq("longtime-prometheus"))
-
-    val cardRawExecPlan = longTermPlanner.materialize(
-      logicalPlan,
-      QueryContext(origQueryParams = promQlQueryParams.copy(promQl = ""))).asInstanceOf[MockExecPlan]
-
-    cardRawExecPlan.name shouldEqual "raw"
-  }
-
   it("should direct overlapping binary join offset queries with vector(0) " +
     "to both raw & downsample planner and stitch") {
diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala
index 48bdb020c8..8049a86d4b 100644
--- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala
@@ -997,35 +997,6 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
     (endSeconds * 1000)
   }

-  it ("should generate correct ExecPlan for TsCardinalities query version 1") {
-    def partitions(timeRange: TimeRange): List[PartitionAssignment] =
-      List(PartitionAssignment("remote", "remote-url",
-        TimeRange(startSeconds * 1000, localPartitionStart * 1000 - 1)),
-        PartitionAssignment("local", "local-url", TimeRange(localPartitionStart * 1000, endSeconds * 1000)))
-
-    val partitionLocationProvider = new PartitionLocationProvider {
-      override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] =
-        partitions(timeRange)
-
-      override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] =
-        partitions(timeRange)
-    }
-
-    val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig)
-    val lp = TsCardinalities(Seq("a", "b"), 3)
-    val promQlQueryParams = PromQlQueryParams("", startSeconds, step, endSeconds, Some("/api/v1/metering/cardinality/timeseries"))
-    val expectedUrlParams = Map("match[]" -> """{_ws_="a",_ns_="b"}""", "numGroupByFields" -> "3", "verbose" -> "true",
-      "datasets" -> "")
-
-    val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams =
-      PlannerParams(processMultiPartition = true)))
-
-    execPlan.isInstanceOf[TsCardReduceExec] shouldEqual (true)
-    execPlan.children(0).isInstanceOf[TsCardReduceExec] shouldEqual(true)
-    execPlan.children(1).isInstanceOf[MetadataRemoteExec] shouldEqual(true)
-    execPlan.children(1).asInstanceOf[MetadataRemoteExec].urlParams shouldEqual(expectedUrlParams)
-  }
-
   it("should generate correct ExecPlan for TsCardinalities query version 2") {
     def partitions(timeRange: TimeRange): List[PartitionAssignment] =
       List(PartitionAssignment("remote", "remote-url",
@@ -1043,7 +1014,7 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida

     val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig)

-    val lp = TsCardinalities(Seq("a", "b"), 3, 2, Seq("longtime-prometheus","recordingrules-prometheus_rules_1m")
+    val lp = TsCardinalities(Seq("a", "b"), 3, Seq("longtime-prometheus","recordingrules-prometheus_rules_1m")
       , "raw,recordingrules")

     val promQlQueryParams = PromQlQueryParams("", startSeconds, step, endSeconds, Some("/api/v2/metering/cardinality/timeseries"))
diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/SinglePartitionPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/SinglePartitionPlannerSpec.scala
index 79098932e6..2709a28948 100644
--- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/SinglePartitionPlannerSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/SinglePartitionPlannerSpec.scala
@@ -162,30 +162,6 @@ class SinglePartitionPlannerSpec extends AnyFunSpec with Matchers {
     execPlan.asInstanceOf[PartKeysDistConcatExec].children(2).asInstanceOf[MockExecPlan].name shouldEqual ("rules2")
   }

-  it("should generate correct ExecPlan for TsCardinalities version 1") {
-
-    // Note: this test is expected to break when TsCardinalities.isRoutable = true
-    // Note: unrelated to the above, this test is setup to confirm that a hacky fix to
-    //   SPP::materializeTsCardinalities is working. See there for additional details.
-
-    val localPlanner = new SingleClusterPlanner(
-      dataset, schemas, localMapper, earliestRetainedTimestampFn = 0, queryConfig, "raw-temp")
-    val planners = Map("raw-temp" -> localPlanner, "rules1" -> rrPlanner1, "rules2" -> rrPlanner2)
-    val engine = new SinglePartitionPlanner(planners, plannerSelector, dataset, queryConfig)
-    val lp = TsCardinalities(Seq("a", "b"), 2, 1, Seq("raw-temp"))
-
-    // Plan should just contain a single root TsCardReduceExec and its TsCardExec children.
-    // Currently, queries are routed only to the planner who's name equals the SPP's "defaultPlanner" member.
-    val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams.copy(promQl = "")))
-    execPlan.isInstanceOf[TsCardReduceExec] shouldEqual (true)
-
-    // UPDATE: We added another TsCardReduceExec to merge data across different datasets
-    execPlan.asInstanceOf[TsCardReduceExec].children.length shouldEqual(1)
-    execPlan.asInstanceOf[TsCardReduceExec].children(0).children.length shouldEqual(32)
-    execPlan.children.forall(_.isInstanceOf[TsCardReduceExec]) shouldEqual true
-    execPlan.children(0).children.forall(_.isInstanceOf[TsCardExec]) shouldEqual true
-  }
-
   it("should generate correct ExecPlan for TsCardinalities version 2") {

     // Note: this test is expected to break when TsCardinalities.isRoutable = true
@@ -204,7 +180,7 @@ class SinglePartitionPlannerSpec extends AnyFunSpec with Matchers {
     val planners = Map("longtime" -> longTermPlanner, "rules1" -> rrPlanner1, "rules2" -> rrPlanner2)
     val engine = new SinglePartitionPlanner(planners, plannerSelector, dataset, queryConfig)

-    val lp = TsCardinalities(Seq("a", "b"), 2, 2, Seq("longtime", "rules1", "rules2"))
+    val lp = TsCardinalities(Seq("a", "b"), 2, Seq("longtime", "rules1", "rules2"))

     // Plan should just contain a single root TsCardReduceExec and its TsCardExec children.
     // Currently, queries are routed only to the planner who's name equals the SPP's "defaultPlanner" member.
diff --git a/query/src/main/scala/filodb/query/LogicalPlan.scala b/query/src/main/scala/filodb/query/LogicalPlan.scala
index 57f8194adc..046d0d3dd5 100644
--- a/query/src/main/scala/filodb/query/LogicalPlan.scala
+++ b/query/src/main/scala/filodb/query/LogicalPlan.scala
@@ -185,7 +185,6 @@ object TsCardinalities {
  */
 case class TsCardinalities(shardKeyPrefix: Seq[String],
                            numGroupByFields: Int,
-                           version: Int = 1,
                            datasets: Seq[String] = Seq(),
                            userDatasets: String = "",
                            overrideClusterName: String = "") extends LogicalPlan {
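With `version` removed from the case class, `datasets` moves up to the third positional parameter, which is why every call site in this change simply drops the version literal. A hypothetical call site, written with named arguments to make the new shape explicit (values illustrative only):

    // Sketch of the post-change constructor; defaults shown for clarity.
    val lp = TsCardinalities(
      shardKeyPrefix      = Seq("demo", "App-0"),
      numGroupByFields    = 2,
      datasets            = Seq("longtime-prometheus"),  // now the third positional param
      userDatasets        = "",                          // default
      overrideClusterName = "")                          // default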
diff --git a/query/src/main/scala/filodb/query/PromCirceSupport.scala b/query/src/main/scala/filodb/query/PromCirceSupport.scala
index df002ad5b4..99a640c050 100644
--- a/query/src/main/scala/filodb/query/PromCirceSupport.scala
+++ b/query/src/main/scala/filodb/query/PromCirceSupport.scala
@@ -19,8 +19,6 @@ object PromCirceSupport {
     // Where are these used? added to make compiler happy
     case l @ LabelCardinalitySampl(group, cardinality) =>
       Json.fromValues(Seq(group.asJson, cardinality.asJson))
-    case t @ TsCardinalitiesSampl(group, cardinality) =>
-      Json.fromValues(Seq(group.asJson, cardinality.asJson))
     case a @ TsCardinalitiesSamplV2(group, cardinality, dataset, _type) =>
       Json.fromValues(Seq(group.asJson, cardinality.asJson, dataset.asJson, _type.asJson))
   }
@@ -59,22 +57,13 @@ object PromCirceSupport {
         card <- c.get[Seq[Map[String, String]]]("cardinality")
       } yield LabelCardinalitySampl(metric, card)
     } else if (c.downField("group").focus.nonEmpty) {
-      // V2 Cardinality API also has a dataset field. So we are using it to distinguish
-      // between the TsCardinalitiesSamplV2 vs TsCardinalitiesSampl response
-      if (c.downField("dataset").focus.nonEmpty) {
-        for {
-          group <- c.get[Map[String, String]]("group")
-          card <- c.get[Map[String, Int]]("cardinality")
-          dataset <- c.get[String]("dataset")
-          _type <- c.get[String]("_type")
-        } yield TsCardinalitiesSamplV2(group, card, dataset, _type)
-      }
-      else {
-        for {
-          group <- c.get[Map[String, String]]("group")
-          card <- c.get[Map[String, Int]]("cardinality")
-        } yield TsCardinalitiesSampl(group, card)
-      }
+      // Only the V2 TsCardinalities response shape is supported now
+      for {
+        group <- c.get[Map[String, String]]("group")
+        card <- c.get[Map[String, Int]]("cardinality")
+        dataset <- c.get[String]("dataset")
+        _type <- c.get[String]("_type")
+      } yield TsCardinalitiesSamplV2(group, card, dataset, _type)
     } else {
       throw new IllegalArgumentException("could not decode any expected cardinality-related field")
     }
diff --git a/query/src/main/scala/filodb/query/PromQueryResponse.scala b/query/src/main/scala/filodb/query/PromQueryResponse.scala
index 1b2454f0e9..e75bc8ee9c 100644
--- a/query/src/main/scala/filodb/query/PromQueryResponse.scala
+++ b/query/src/main/scala/filodb/query/PromQueryResponse.scala
@@ -67,9 +67,6 @@ final case class StdValSampl(timestamp: Long, stddev: Double, mean: Double, coun
 final case class LabelCardinalitySampl(metric: Map[String, String],
                                        cardinality: Seq[Map[String, String]]) extends MetadataSampl

-final case class TsCardinalitiesSampl(group: Map[String, String],
-                                      cardinality: Map[String, Int]) extends MetadataSampl
-
 /**
  * @param group map of shardKeyPrefix and its values. Key includes - _ws_, _ns_, __name__
  * @param cardinality map of string and int (cardinality count). Key includes - active, shortTerm, longTerm
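Because the consolidated decoder unconditionally reads `dataset` and `_type`, only V2-shaped cardinality payloads decode; a legacy payload carrying just `group` and `cardinality` now surfaces a decoding failure instead of a TsCardinalitiesSampl. A minimal sketch of the accepted shape, modeled on the existing specs (field values hypothetical):

    import io.circe.parser
    val v2Response =
      """{
        |  "status": "success",
        |  "data": [
        |    {
        |      "group": { "_ws_": "demo", "_ns_": "App-0" },
        |      "cardinality": { "active": 2, "shortTerm": 3, "longTerm": 5 },
        |      "dataset": "prometheus",
        |      "_type": "raw"
        |    }
        |  ]
        |}""".stripMargin
    // Decodes to MetadataSuccessResponse(Seq(TsCardinalitiesSamplV2(...))); the same
    // payload without "dataset"/"_type" now fails rather than yielding a V1 sample.
    parser.decode[MetadataSuccessResponse](v2Response)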
diff --git a/query/src/main/scala/filodb/query/exec/MetadataExecPlan.scala b/query/src/main/scala/filodb/query/exec/MetadataExecPlan.scala
index c6ea4a8d14..dd3eb70e39 100644
--- a/query/src/main/scala/filodb/query/exec/MetadataExecPlan.scala
+++ b/query/src/main/scala/filodb/query/exec/MetadataExecPlan.scala
@@ -85,7 +85,7 @@ final case class TsCardReduceExec(queryContext: QueryContext,

   override protected def args: String = ""

-  private def mapFold(acc: mutable.HashMap[ZeroCopyUTF8String, CardCounts], rv: RangeVector, rs: ResultSchema):
+  private def mapFold(acc: mutable.HashMap[ZeroCopyUTF8String, CardCounts], rv: RangeVector):
       mutable.HashMap[ZeroCopyUTF8String, CardCounts] = {
     rv.rows().foreach{ r =>
       val data = RowData.fromRowReader(r)
@@ -95,17 +95,11 @@ final case class TsCardReduceExec(queryContext: QueryContext,
       val (groupKey, accCounts) = if (accCountsOpt.nonEmpty || acc.contains(data.group) || acc.size < resultSize) {
         (data.group, accCountsOpt.getOrElse(CardCounts(0, 0)))
       } else {
-        // handle overflow group based on result schema given
-        if (rs.hasSameColumnsAs(TsCardExec.RESULT_SCHEMA)) {
-          // aggregate by dataset as well
-          val groupArray = data.group.toString.split(TsCardExec.PREFIX_DELIM)
-          val dataset = groupArray(groupArray.size - 1)
-          val dataGroup = prefixToGroupWithDataset(CardinalityStore.OVERFLOW_PREFIX, dataset)
-          (dataGroup, acc.getOrElseUpdate(dataGroup, CardCounts(0, 0)))
-        }
-        else {
-          (OVERFLOW_GROUP, acc.getOrElseUpdate(OVERFLOW_GROUP, CardCounts(0, 0)))
-        }
+        // aggregate by dataset as well
+        val groupArray = data.group.toString.split(TsCardExec.PREFIX_DELIM)
+        val dataset = groupArray(groupArray.size - 1)
+        val dataGroup = prefixToGroupWithDataset(CardinalityStore.OVERFLOW_PREFIX, dataset)
+        (dataGroup, acc.getOrElseUpdate(dataGroup, CardCounts(0, 0)))
       }
       acc.update(groupKey, accCounts.add(data.counts))
     }
@@ -121,15 +115,11 @@ final case class TsCardReduceExec(queryContext: QueryContext,
                           querySession: QuerySession): Observable[RangeVector] = {
     // capture the result schema of responses
-    var rs = ResultSchema.empty
     val flatMapData = childResponses.flatMap(res => {
-      if (rs == ResultSchema.empty) {
-        rs = res._1.resultSchema
-      }
       Observable.fromIterable(res._1.result)
     })

     val taskOfResults = flatMapData
-      .foldLeftL(new mutable.HashMap[ZeroCopyUTF8String, CardCounts])((x, y) => mapFold(x, y, rs))
+      .foldLeftL(new mutable.HashMap[ZeroCopyUTF8String, CardCounts])((x, y) => mapFold(x, y))
       .map{ aggMap =>
         val it = aggMap.toSeq.sortBy(-_._2.shortTerm).map{ case (group, counts) =>
          CardRowReader(group, counts)
@@ -479,18 +469,8 @@ final case object TsCardExec {
   // results from all TsCardinality derivatives are clipped to this size
   val MAX_RESULT_SIZE = CardinalityStore.MAX_RESULT_SIZE

-  // row name assigned to overflow counts
-  val OVERFLOW_GROUP = prefixToGroup(CardinalityStore.OVERFLOW_PREFIX)
-
   val PREFIX_DELIM = ","

-  /**
-   * This is the V1 schema version of QueryResult for TSCardinalities query
-   */
-  val RESULT_SCHEMA_V1 = ResultSchema(Seq(ColumnInfo("group", ColumnType.StringColumn),
-                                          ColumnInfo("active", ColumnType.IntColumn),
-                                          ColumnInfo("total", ColumnType.IntColumn)), 1)
-
   /**
    * V2 schema version of QueryResult for TSCardinalities query. One more additional column `longterm` is added
    * to represent the cardinality count of downsample clusters
@@ -500,14 +480,6 @@ final case object TsCardExec {
                                        ColumnInfo("shortTerm", ColumnType.IntColumn),
                                        ColumnInfo("longTerm", ColumnType.IntColumn)), 1)

-  /**
-   * Convert a shard key prefix to a row's group name.
-   */
-  def prefixToGroup(prefix: Seq[String]): ZeroCopyUTF8String = {
-    // just concat the prefix together with a single char delimiter
-    prefix.mkString(PREFIX_DELIM).utf8
-  }
-
   /**
    * @param prefix ShardKeyPrefix from the Cardinality Record
    * @param datasetName FiloDB dataset name
@@ -580,8 +552,7 @@ final case class TsCardExec(queryContext: QueryContext,
                             shard: Int,
                             shardKeyPrefix: Seq[String],
                             numGroupByFields: Int,
-                            clusterName: String,
-                            version: Int) extends LeafExecPlan with StrictLogging {
+                            clusterName: String) extends LeafExecPlan with StrictLogging {

   require(numGroupByFields >= 1, "numGroupByFields must be positive")
   require(numGroupByFields >= shardKeyPrefix.size,
@@ -607,14 +578,7 @@ final case class TsCardExec(queryContext: QueryContext,
         val cards = tsMemStore.scanTsCardinalities(
           dataset, Seq(shard), shardKeyPrefix, numGroupByFields)
         val it = cards.map { card =>
-
-          // v1 and v2 cardinality have different schemas and required group key. Hence we are segregating
-          // w.r.t to the version
-          val groupKey =
-            version match {
-              case 1 => prefixToGroup(card.prefix)
-              case _ => prefixToGroupWithDataset(card.prefix, dataset.dataset)
-            }
+          val groupKey = prefixToGroupWithDataset(card.prefix, dataset.dataset)
           // NOTE: cardinality data from downsample cluster is stored as total count in CardinalityStore. But for the
           // user perspective, the cardinality data in downsample is a longterm data. Hence, we are forking the
           // data path based on the cluster the data is being served from
@@ -631,14 +595,10 @@ final case class TsCardExec(queryContext: QueryContext,
         }.iterator
         IteratorBackedRangeVector(new CustomRangeVectorKey(Map.empty), NoCloseCursor(it), None)
       }
-      case other =>
+      case _ =>
         Observable.empty
     }
-    // Sending V1 SCHEMA for v1 queries
-    version match {
-      case 1 => ExecResult(rvs, Task.eval(RESULT_SCHEMA_V1))
-      case _ => ExecResult(rvs, Task.eval(RESULT_SCHEMA))
-    }
+    ExecResult(rvs, Task.eval(RESULT_SCHEMA))
   }

   // scalastyle:on method.length
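Every group key now ends with the dataset as its last PREFIX_DELIM-separated component, which is what lets the overflow path in TsCardReduceExec.mapFold recover the dataset from a row and keep a separate overflow bucket per dataset. A sketch of the convention (values hypothetical; assumes prefixToGroupWithDataset concatenates the same way the removed prefixToGroup did):

    val group = prefixToGroupWithDataset(Seq("demo", "App-0"), "timeseries")
    // group.toString == "demo,App-0,timeseries"
    val dataset = group.toString.split(TsCardExec.PREFIX_DELIM).last  // "timeseries"
    // overflow rows land in a per-dataset bucket instead of one global OVERFLOW_GROUP:
    val overflowGroup = prefixToGroupWithDataset(CardinalityStore.OVERFLOW_PREFIX, dataset)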
diff --git a/query/src/main/scala/filodb/query/exec/MetadataRemoteExec.scala b/query/src/main/scala/filodb/query/exec/MetadataRemoteExec.scala
index 348ff7b8ac..17a4e0b342 100644
--- a/query/src/main/scala/filodb/query/exec/MetadataRemoteExec.scala
+++ b/query/src/main/scala/filodb/query/exec/MetadataRemoteExec.scala
@@ -67,34 +67,11 @@ case class MetadataRemoteExec(queryEndpoint: String,
     else response.data.head match {
       case _: MetadataMapSampl => mapTypeQueryResponse(response, id)
       case _: LabelCardinalitySampl => mapLabelCardinalityResponse(response, id)
-      case _: TsCardinalitiesSampl => mapTsCardinalitiesResponse(response, id)
       case _: TsCardinalitiesSamplV2 => mapTsCardinalitiesResponseV2(response, id)
       case _ => labelsQueryResponse(response, id)
     }
   }

-  private def mapTsCardinalitiesResponse(response: MetadataSuccessResponse, id: String): QueryResponse = {
-    import NoCloseCursor._
-    import TsCardinalities._
-    import TsCardExec._
-
-    val RECORD_SCHEMA = SerializedRangeVector.toSchema(RESULT_SCHEMA.columns)
-
-    val rows = response.data.asInstanceOf[Seq[TsCardinalitiesSampl]]
-      .map { ts =>
-        val prefix = SHARD_KEY_LABELS.take(ts.group.size).map(l => ts.group(l))
-        val counts = CardCounts(ts.cardinality("active"), ts.cardinality("total"))
-        CardRowReader(prefixToGroup(prefix), counts)
-      }
-    val rv = IteratorBackedRangeVector(CustomRangeVectorKey.empty, NoCloseCursor(rows.iterator), None)
-    // dont add this size to queryStats since it was already added by callee use dummy QueryStats()
-    val srv = SerializedRangeVector(rv, builder, RECORD_SCHEMA, queryWithPlanName(queryContext), dummyQueryStats)
-
-    // NOTE: We are using the RESULT_SCHEMA definitions to determine the iteration of shardKeyPrefix in v1 result.
-    // Hence, we are sending the older result schema which was used for V1 Cardinality API
-    QueryResult(id, RESULT_SCHEMA_V1, Seq(srv))
-  }
-
   /**
    * @param response Metadata Response from the remote query server API call.
    * @param id QueryId
diff --git a/query/src/test/scala/filodb/query/LogicalPlanSpec.scala b/query/src/test/scala/filodb/query/LogicalPlanSpec.scala
index 67decf5150..03110e1f6f 100644
--- a/query/src/test/scala/filodb/query/LogicalPlanSpec.scala
+++ b/query/src/test/scala/filodb/query/LogicalPlanSpec.scala
@@ -328,7 +328,7 @@ class LogicalPlanSpec extends AnyFunSpec with Matchers {

     val datasets = Seq("longtime-prometheus", "recordingrules-prometheus_rules_longterm")
     val userDatasets = "\"raw\",\"recordingrules\""
-    val plan = TsCardinalities(Seq("a","b","c"), 3, 2, datasets, userDatasets)
+    val plan = TsCardinalities(Seq("a","b","c"), 3, datasets, userDatasets)
     val queryParamsMap = plan.queryParams()

     queryParamsMap.get("numGroupByFields").get shouldEqual "3"
diff --git a/query/src/test/scala/filodb/query/PromCirceSupportSpec.scala b/query/src/test/scala/filodb/query/PromCirceSupportSpec.scala
index acbb5c3b48..e43bb9d9a2 100644
--- a/query/src/test/scala/filodb/query/PromCirceSupportSpec.scala
+++ b/query/src/test/scala/filodb/query/PromCirceSupportSpec.scala
@@ -150,61 +150,6 @@ class PromCirceSupportSpec extends AnyFunSpec with Matchers with ScalaFutures {
     }
   }

-  it("should parse TsCardinalitiesSampl") {
-    val expected = Seq(
-      TsCardinalitiesSampl(
-        Map("_ws_" -> "demo", "_ns_" -> "App-0", "_metric_" -> "heap_usage"),
-        Map("active" -> 2, "total" -> 3)),
-      TsCardinalitiesSampl(
-        Map("_ws_" -> "demo", "_ns_" -> "App-1"),
-        Map("active" -> 6, "total" -> 8)),
-      TsCardinalitiesSampl(
-        Map("_ws_" -> "demo"),
-        Map("active" -> 7, "total" -> 10))
-    )
-    val inputString =
-      """{
-        |  "status": "success",
-        |  "data": [
-        |    {
-        |      "group": {
-        |        "_ws_": "demo",
-        |        "_ns_": "App-0",
-        |        "_metric_": "heap_usage"
-        |      },
-        |      "cardinality": {
-        |        "active": 2,
-        |        "total": 3
-        |      }
-        |    },
-        |    {
-        |      "group": {
-        |        "_ws_": "demo",
-        |        "_ns_": "App-1"
-        |      },
-        |      "cardinality": {
-        |        "active": 6,
-        |        "total": 8
-        |      }
-        |    },
-        |    {
-        |      "group": {
-        |        "_ws_": "demo"
-        |      },
-        |      "cardinality": {
-        |        "active": 7,
-        |        "total": 10
-        |      }
-        |    }
-        |  ]
-        |}""".stripMargin
-
-    parser.decode[MetadataSuccessResponse](inputString) match {
-      case Right(response) => response shouldEqual MetadataSuccessResponse(expected)
-      case Left(ex) => throw ex
-    }
-  }
-
   it("should parse TsCardinalitiesSamplV2") {
     val expected = Seq(
       TsCardinalitiesSamplV2(
diff --git a/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala b/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala
index 55db936901..d8aced8cc5 100644
--- a/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala
+++ b/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala
@@ -368,51 +368,51 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B
     //   and converted to ZeroCopyUTF8Strings.
     Seq(
       TestSpec(Seq(), 1, Seq(
-        Seq("demo", "timeseries") -> CardCounts(4,4,4),
-        Seq("testws", "timeseries") -> CardCounts(1,1,1),
-        Seq("demo-A", "timeseries") -> CardCounts(1,1,1)
+        Seq("demo") -> CardCounts(4,4,4),
+        Seq("testws") -> CardCounts(1,1,1),
+        Seq("demo-A") -> CardCounts(1,1,1)
       )),
       TestSpec(Seq(), 2, Seq(
-        Seq("demo", "App-0", "timeseries") -> CardCounts(4,4,4),
-        Seq("testws", "testns", "timeseries") -> CardCounts(1,1,1),
-        Seq("demo-A", "App-A", "timeseries") -> CardCounts(1,1,1)
+        Seq("demo", "App-0") -> CardCounts(4,4,4),
+        Seq("testws", "testns") -> CardCounts(1,1,1),
+        Seq("demo-A", "App-A") -> CardCounts(1,1,1)
       )),
       TestSpec(Seq(), 3, Seq(
-        Seq("demo", "App-0", "http_req_total", "timeseries") -> CardCounts(2,2,2),
-        Seq("demo", "App-0", "http_bar_total", "timeseries") -> CardCounts(1,1,1),
-        Seq("demo", "App-0", "http_foo_total", "timeseries") -> CardCounts(1,1,1),
-        Seq("demo-A", "App-A", "http_req_total-A", "timeseries") -> CardCounts(1,1,1),
-        Seq("testws", "testns", "long_labels_metric", "timeseries") -> CardCounts(1,1,1)
+        Seq("demo", "App-0", "http_req_total") -> CardCounts(2,2,2),
+        Seq("demo", "App-0", "http_bar_total") -> CardCounts(1,1,1),
+        Seq("demo", "App-0", "http_foo_total") -> CardCounts(1,1,1),
+        Seq("demo-A", "App-A", "http_req_total-A") -> CardCounts(1,1,1),
+        Seq("testws", "testns", "long_labels_metric") -> CardCounts(1,1,1)
       )),
       TestSpec(Seq("demo"), 1, Seq(
-        Seq("demo", "timeseries") -> CardCounts(4,4,4))),
+        Seq("demo") -> CardCounts(4,4,4))),
       TestSpec(Seq("demo"), 2, Seq(
-        Seq("demo", "App-0", "timeseries") -> CardCounts(4,4,4))),
+        Seq("demo", "App-0") -> CardCounts(4,4,4))),
       TestSpec(Seq("demo"), 3, Seq(
-        Seq("demo", "App-0", "http_req_total", "timeseries") -> CardCounts(2,2,2),
-        Seq("demo", "App-0", "http_bar_total", "timeseries") -> CardCounts(1,1,1),
-        Seq("demo", "App-0", "http_foo_total", "timeseries") -> CardCounts(1,1,1)
+        Seq("demo", "App-0", "http_req_total") -> CardCounts(2,2,2),
+        Seq("demo", "App-0", "http_bar_total") -> CardCounts(1,1,1),
+        Seq("demo", "App-0", "http_foo_total") -> CardCounts(1,1,1)
       )),
       TestSpec(Seq("demo", "App-0"), 2, Seq(
-        Seq("demo", "App-0", "timeseries") -> CardCounts(4,4,4)
+        Seq("demo", "App-0") -> CardCounts(4,4,4)
       )),
       TestSpec(Seq("demo", "App-0"), 3, Seq(
-        Seq("demo", "App-0", "http_req_total", "timeseries") -> CardCounts(2,2,2),
-        Seq("demo", "App-0", "http_bar_total", "timeseries") -> CardCounts(1,1,1),
-        Seq("demo", "App-0", "http_foo_total", "timeseries") -> CardCounts(1,1,1)
+        Seq("demo", "App-0", "http_req_total") -> CardCounts(2,2,2),
+        Seq("demo", "App-0", "http_bar_total") -> CardCounts(1,1,1),
+        Seq("demo", "App-0", "http_foo_total") -> CardCounts(1,1,1)
       )),
       TestSpec(Seq("demo", "App-0", "http_req_total"), 3, Seq(
-        Seq("demo", "App-0", "http_req_total", "timeseries") -> CardCounts(2,2,2)))
+        Seq("demo", "App-0", "http_req_total") -> CardCounts(2,2,2)))
     ).foreach{ testSpec =>

       val leavesRaw = (0 until shardPartKeyLabelValues.size).map{ ishard =>
         new TsCardExec(QueryContext(), executeDispatcher,timeseriesDatasetMultipleShardKeys.ref,
-          ishard, testSpec.shardKeyPrefix, testSpec.numGroupByFields, "raw", 2)
+          ishard, testSpec.shardKeyPrefix, testSpec.numGroupByFields, "raw")
       }.toSeq

       // UPDATE: Simulating the call to downsample cluster to get longterm metrics as well
       val leavesDownsample = (0 until shardPartKeyLabelValues.size).map { ishard =>
         new TsCardExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref,
"downsample", 2) + ishard, testSpec.shardKeyPrefix, testSpec.numGroupByFields, "downsample") }.toSeq val allLeaves = leavesRaw ++ leavesDownsample @@ -430,7 +430,7 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B }.toSeq resultMap shouldEqual testSpec.exp.map { case (prefix, counts) => - prefixToGroup(prefix) -> counts + prefixToGroupWithDataset(prefix, timeseriesDatasetMultipleShardKeys.ref.dataset) -> counts } } } @@ -443,21 +443,21 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B case class TestSpec(shardKeyPrefix: Seq[String], numGroupByFields: Int, exp: Seq[(Seq[String], CardCounts)]) val testSpec = TestSpec(Seq(), 3, Seq( - Seq("demo", "App-0", "http_req_total", "timeseries") -> CardCounts(2, 2, 2), - Seq("demo", "App-0", "http_bar_total", "timeseries") -> CardCounts(1, 1, 1), - Seq("demo", "App-0", "http_foo_total", "timeseries") -> CardCounts(1, 1, 1), - Seq("demo-A", "App-A", "http_req_total-A", "timeseries") -> CardCounts(1, 1, 1), - Seq("testws", "testns", "long_labels_metric", "timeseries") -> CardCounts(1, 1, 1) + Seq("demo", "App-0", "http_req_total") -> CardCounts(2, 2, 2), + Seq("demo", "App-0", "http_bar_total") -> CardCounts(1, 1, 1), + Seq("demo", "App-0", "http_foo_total") -> CardCounts(1, 1, 1), + Seq("demo-A", "App-A", "http_req_total-A") -> CardCounts(1, 1, 1), + Seq("testws", "testns", "long_labels_metric") -> CardCounts(1, 1, 1) )) val leavesRaw = (0 until shardPartKeyLabelValues.size).map { ishard => new TsCardExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref, - ishard, testSpec.shardKeyPrefix, testSpec.numGroupByFields, "raw", 2) + ishard, testSpec.shardKeyPrefix, testSpec.numGroupByFields, "raw") } val leavesDownsample = (0 until shardPartKeyLabelValues.size).map { ishard => new TsCardExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref, - ishard, testSpec.shardKeyPrefix, testSpec.numGroupByFields, "downsample", 2) + ishard, testSpec.shardKeyPrefix, testSpec.numGroupByFields, "downsample") } val allLeaves = leavesRaw ++ leavesDownsample @@ -481,7 +481,7 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B // now check for the count val testMap = testSpec.exp.map { case (prefix, counts) => - prefixToGroup(prefix) -> counts + prefixToGroupWithDataset(prefix, timeseriesDatasetMultipleShardKeys.ref.dataset) -> counts }.toMap testMap.contains(nonOverflowRow.group) shouldEqual true diff --git a/query/src/test/scala/filodb/query/exec/RemoteMetadataExecSpec.scala b/query/src/test/scala/filodb/query/exec/RemoteMetadataExecSpec.scala index a13bbb5291..5e24eb71cc 100644 --- a/query/src/test/scala/filodb/query/exec/RemoteMetadataExecSpec.scala +++ b/query/src/test/scala/filodb/query/exec/RemoteMetadataExecSpec.scala @@ -247,46 +247,6 @@ class RemoteMetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures result.toArray shouldEqual jobQueryResult3 } - it ("timeseries cardinality version 1 remote exec") { - import TsCardExec._ - import TsCardinalities._ - - val samples = Seq( - TsCardinalitiesSampl(Map("_ws_" -> "foo", "_ns_" -> "bar", "__name__" -> "baz"), Map("active" -> 123, "total" -> 234)), - TsCardinalitiesSampl(Map("_ws_" -> "foo", "_ns_" -> "bar", "__name__" -> "bat"), Map("active" -> 345, "total" -> 456)), - TsCardinalitiesSampl(Map("_ws_" -> "foo", "_ns_" -> "bar", "__name__" -> "bak"), Map("active" -> 567, "total" -> 678)), - ) - - val testingBackendTsCard: SttpBackend[Future, Nothing] = 
-    val testingBackendTsCard: SttpBackend[Future, Nothing] = SttpBackendStub.asynchronousFuture
-      .whenRequestMatches(_.uri.path.startsWith(List("api","v1","metering","cardinality","timeseries"))
-      )
-      .thenRespondWrapped(Future {
-        Response(Right(Right(MetadataSuccessResponse(samples, "success", Option.empty, Option.empty))), StatusCodes.PartialContent, "", Nil, Nil)
-      })
-
-    val exec: MetadataRemoteExec = MetadataRemoteExec("http://localhost:31007/api/v1/metering/cardinality/timeseries", 10000L,
-      Map("match[]" -> """{_ws_="foo", _ns_="bar"}""", "numGroupByFields" -> "3"),
-      QueryContext(origQueryParams=PromQlQueryParams("test", 123L, 234L, 15L, Option("http://localhost:31007/api/v1/metering/cardinality/timeseries"))),
-      InProcessPlanDispatcher(queryConfig), timeseriesDataset.ref, RemoteHttpClient(configBuilder.build(), testingBackendTsCard), queryConfig)
-
-    val resp = exec.execute(memStore, querySession).runToFuture.futureValue
-    val result = (resp: @unchecked) match {
-      case QueryResult(id, _, response, _, _, _, _) =>
-        // should only contain a single RV where each row describes a single group's cardinalities
-        response.size shouldEqual 1
-        val rows = response.head.rows().map{ rr =>
-          RowData.fromRowReader(rr)
-        }.toSet
-        val expRows = samples.map{ s =>
-          // order the shard keys according to precedence
-          val prefix = SHARD_KEY_LABELS.map(s.group(_))
-          val counts = CardCounts(s.cardinality("active"), s.cardinality("total"))
-          RowData(prefixToGroup(prefix), counts)
-        }.toSet
-        rows shouldEqual expRows
-    }
-  }
-
   it("timeseries cardinality version 2 remote exec") {
     import TsCardExec._
     import TsCardinalities._
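With the V1 schema and its remote mapping gone, every cardinality row, local or remote, follows the single RESULT_SCHEMA declared above: a comma-delimited group string plus active/shortTerm/longTerm counts. A hypothetical row as TsCardExec would now emit it (values illustrative; CardCounts field order assumed to match the schema's column order):

    import filodb.memory.format.ZeroCopyUTF8String._
    // group = "demo,App-0,timeseries", active = 123, shortTerm = 234, longTerm = 456
    CardRowReader("demo,App-0,timeseries".utf8, CardCounts(123, 234, 456))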