From 88c3e5d603cf82307ea9cbc66b3b43ddb9333357 Mon Sep 17 00:00:00 2001
From: Vish Ramachandran
Date: Mon, 29 Jul 2024 13:51:09 -0700
Subject: [PATCH] feat(core): Now metadata queries support _type_ filter (#1819)

Current behavior: metadata queries do not support filtering by metric
type using the _type_ filter.

New behavior: metadata queries now support the _type_ filter. Note that
only newly persisted documents in the downsample index carry the type
field, so the downsample index needs to be rebuilt if full support is
required there. The rebuild can be skipped if metadata queries never hit
the downsample index.
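As an illustration, a series-metadata selector such as
http_req_total{_type_="prom-counter", job="myCoolService"} can now
narrow results to a single schema. A sketch of the column filters such a
selector corresponds to (label values are illustrative; the API calls
mirror the specs updated below):

    import filodb.core.query.{ColumnFilter, Filter}
    import filodb.memory.format.ZeroCopyUTF8String._

    val filters = Seq(
      ColumnFilter("_metric_", Filter.Equals("http_req_total".utf8)),
      ColumnFilter("_type_", Filter.Equals("prom-counter".utf8)),  // newly supported
      ColumnFilter("job", Filter.Equals("myCoolService".utf8)))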
---
 .../queryplanner/SingleClusterPlanner.scala   |  3 +-
 .../NodeCoordinatorActorSpec.scala            |  4 +-
 .../binaryrecord2/RecordSchema.scala          |  2 +-
 .../DownsampledTimeSeriesShard.scala          |  3 +-
 .../memstore/PartKeyLuceneIndex.scala         |  7 ++-
 .../memstore/TimeSeriesShard.scala            |  2 +-
 .../scala/filodb.core/metadata/Schemas.scala  |  2 +
 .../memstore/PartKeyLuceneIndexSpec.scala     | 15 +++++-
 .../memstore/TimeSeriesMemStoreSpec.scala     |  6 +--
 .../scala/filodb/prometheus/ast/Vectors.scala |  3 +-
 .../prometheus/query/PrometheusModel.scala    |  4 +-
 .../filodb/query/exec/MetadataExecSpec.scala  | 53 +++++++++++++++++--
 12 files changed, 84 insertions(+), 20 deletions(-)

diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
index 86c3a5adab..d457a2710a 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
@@ -1028,8 +1028,7 @@ class SingleClusterPlanner(val dataset: Dataset,
   private def materializeSeriesKeysByFilters(qContext: QueryContext,
                                              lp: SeriesKeysByFilters,
                                              forceInProcess: Boolean): PlanResult = {
-    // NOTE: _type_ filter support currently isn't there in series keys queries
-    val (renamedFilters, _) = extractSchemaFilter(renameMetricFilter(lp.filters))
+    val renamedFilters = renameMetricFilter(lp.filters)
     val shardsToHit = if (canGetShardsFromFilters(renamedFilters, qContext)) {
       shardsFromFilters(renamedFilters, qContext, lp.startMs, lp.endMs)
diff --git a/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala b/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala
index 0f7e72a24a..4131753b6f 100644
--- a/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala
@@ -329,7 +329,7 @@ class NodeCoordinatorActorSpec extends ActorTest(NodeCoordinatorActorSpec.getNew
     memStore.refreshIndexForTesting(dataset1.ref)

     probe.send(coordinatorActor, GetIndexNames(ref))
-    probe.expectMsg(Seq("series"))
+    probe.expectMsg(Seq("series", "_type_"))

     probe.send(coordinatorActor, GetIndexValues(ref, "series", 0, limit=4))
     probe.expectMsg(Seq(("Series 0", 1), ("Series 1", 1), ("Series 2", 1), ("Series 3", 1)))
@@ -343,7 +343,7 @@ class NodeCoordinatorActorSpec extends ActorTest(NodeCoordinatorActorSpec.getNew
     memStore.refreshIndexForTesting(dataset1.ref)

     probe.send(coordinatorActor, GetIndexNames(ref))
-    probe.expectMsg(Seq("series"))
+    probe.expectMsg(Seq("series", "_type_"))

     //actor should restart and serve queries again
     probe.send(coordinatorActor, GetIndexValues(ref, "series", 0, limit=4))
diff --git a/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala b/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala
index 01a1252f7c..0ab22c3e0a 100644
--- a/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala
+++ b/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala
@@ -279,7 +279,7 @@ final class RecordSchema(val columns: Seq[ColumnInfo],
         result ++= consumer.stringPairs
       case (BinaryRecordColumn, i) =>
         result ++= brSchema(i).toStringPairs(blobBase(base, offset, i), blobOffset(base, offset, i))
-        result += ("_type_" ->
+        result += (Schemas.TypeLabel ->
                      Schemas.global.schemaName(
                        RecordSchema.schemaID(blobBase(base, offset, i), blobOffset(base, offset, i))))
diff --git a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala
index 599c2f51fd..0ecf88fb45 100644
--- a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala
+++ b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala
@@ -174,7 +174,8 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,
     schemas.part.binSchema.toStringPairs(partKey.base, partKey.offset).map(pair => {
       pair._1.utf8 -> pair._2.utf8
     }).toMap ++
-      Map("_type_".utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
+      Map(Schemas.TypeLabel.utf8 ->
+        Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
   }
 }
diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
index 2cade7d972..84a251ad54 100644
--- a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
+++ b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
@@ -36,10 +36,10 @@ import spire.syntax.cfor._

 import filodb.core.{concurrentCache, DatasetRef}
 import filodb.core.Types.PartitionKey
-import filodb.core.binaryrecord2.MapItemConsumer
+import filodb.core.binaryrecord2.{MapItemConsumer, RecordSchema}
 import filodb.core.memstore.ratelimit.CardinalityTracker
+import filodb.core.metadata.{PartitionSchema, Schemas}
 import filodb.core.metadata.Column.ColumnType.{MapColumn, StringColumn}
-import filodb.core.metadata.PartitionSchema
 import filodb.core.query.{ColumnFilter, Filter, QueryUtils}
 import filodb.core.query.Filter._
 import filodb.memory.{BinaryRegionLarge, UTF8StringMedium, UTF8StringShort}
@@ -677,6 +677,9 @@ class PartKeyLuceneIndex(ref: DatasetRef,
     // If configured and enabled, Multi-column facets will be created on "partition-schema" columns
     createMultiColumnFacets(partKeyOnHeapBytes, partKeyBytesRefOffset)

+    val schemaName = Schemas.global.schemaName(RecordSchema.schemaID(partKeyOnHeapBytes, UnsafeUtils.arayOffset))
+    addIndexedField(Schemas.TypeLabel, schemaName)
+
     cforRange { 0 until numPartColumns } { i =>
       indexers(i).fromPartKey(partKeyOnHeapBytes, bytesRefToUnsafeOffset(partKeyBytesRefOffset), partId)
     }
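Note on the PartKeyLuceneIndex change above: every part key ingested into
the index now also gets its schema name indexed under the _type_ field. A
minimal sketch of the derivation, using the same calls as the hunk (the
concrete values come from the specs further down):

    // The schema ID embedded in the part-key bytes maps to a schema name.
    val schemaId = RecordSchema.schemaID(partKeyOnHeapBytes, UnsafeUtils.arayOffset)
    // Registered schemas resolve to names like "prom-counter"; judging by
    // the spec below, unregistered IDs surface as "schemaID:<id>".
    val schemaName = Schemas.global.schemaName(schemaId)
    // Indexed like any other label, so equality filters work unchanged.
    addIndexedField(Schemas.TypeLabel, schemaName)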
diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala
index ab1a73a27f..8b8e3593e0 100644
--- a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala
+++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala
@@ -1853,7 +1853,7 @@ class TimeSeriesShard(val ref: DatasetRef,
     schemas.part.binSchema.toStringPairs(partKey.base, partKey.offset).map(pair => {
       pair._1.utf8 -> pair._2.utf8
     }).toMap ++
-      Map("_type_".utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
+      Map(Schemas.TypeLabel.utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
   }

   /**
diff --git a/core/src/main/scala/filodb.core/metadata/Schemas.scala b/core/src/main/scala/filodb.core/metadata/Schemas.scala
index f586cc5afe..f486f620e1 100644
--- a/core/src/main/scala/filodb.core/metadata/Schemas.scala
+++ b/core/src/main/scala/filodb.core/metadata/Schemas.scala
@@ -372,6 +372,8 @@ object Schemas extends StrictLogging {
   import Accumulation._
   import Dataset._

+  val TypeLabel = "_type_"
+
   val _log = logger
   val rowKeyIDs = Seq(0)    // First or timestamp column is always the row keys
diff --git a/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala b/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala
index 663cb2b0cf..f39368270c 100644
--- a/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala
+++ b/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala
@@ -97,6 +97,17 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte
     val partNums7 = keyIndex.partIdsFromFilters(Seq(filter7), (start + end)/2, end + 1000 )
     partNums7 should not equal debox.Buffer.empty[Int]

+    // tests to validate schema ID filter
+    val filter8 = Seq( ColumnFilter("Actor2Code", Equals("GOV".utf8)),
+                       ColumnFilter("_type_", Equals("schemaID:46894".utf8)))
+    val partNums8 = keyIndex.partIdsFromFilters(filter8, start, end)
+    partNums8 shouldEqual debox.Buffer(7, 8, 9)
+
+    val filter9 = Seq( ColumnFilter("Actor2Code", Equals("GOV".utf8)),
+                       ColumnFilter("_type_", Equals("prom-counter".utf8)))
+    val partNums9 = keyIndex.partIdsFromFilters(filter9, start, end)
+    partNums9.length shouldEqual 0
+
   }

   it("should fetch part key records from filters correctly") {
@@ -467,7 +478,7 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte
     keyIndex.refreshReadersBlocking()

-    keyIndex.indexNames(10).toList shouldEqual Seq("Actor2Code", "Actor2Name")
+    keyIndex.indexNames(10).toList shouldEqual Seq("_type_", "Actor2Code", "Actor2Name")
     keyIndex.indexValues("not_found").toSeq should equal (Nil)

     val infos = Seq("AFR", "CHN", "COP", "CVL", "EGYEDU").map(_.utf8).map(TermInfo(_, 1))
@@ -602,7 +613,7 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte
     val labelValues1 = index3.labelNamesEfficient(filters1, 0, Long.MaxValue)
     labelValues1.toSet shouldEqual (0 until 5).map(c => s"dynamicLabel$c").toSet ++
       (0 until 10).map(c => s"infoLabel$c").toSet ++
-      Set("_ns_", "_ws_", "_metric_")
+      Set("_ns_", "_ws_", "_metric_", "_type_")
   }

   it("should be able to fetch label values efficiently using facets") {
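The spec above pins down the query-side semantics: an equality filter on
_type_ matches either a schema name or the "schemaID:<id>" fallback, and
the synthetic field shows up in index-name listings. A condensed usage
sketch (fixture names and expected results come from the spec):

    // No "GOV" part keys carry the prom-counter schema, so this is empty.
    val filters = Seq(ColumnFilter("Actor2Code", Equals("GOV".utf8)),
                      ColumnFilter(Schemas.TypeLabel, Equals("prom-counter".utf8)))
    keyIndex.partIdsFromFilters(filters, start, end)  // => empty buffer
    keyIndex.indexNames(10).toList                    // => List("_type_", ...)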
diff --git a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala
index e5b0072870..c9ca67e4d2 100644
--- a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala
+++ b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala
@@ -66,7 +66,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte
     memStore.asInstanceOf[TimeSeriesMemStore].refreshIndexForTesting(dataset1.ref)

     memStore.numPartitions(dataset1.ref, 0) shouldEqual 10
-    memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0)))
+    memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0), ("_type_", 0)))
     memStore.latestOffset(dataset1.ref, 0) shouldEqual 0

     val minSet = rawData.map(_(1).asInstanceOf[Double]).toSet
@@ -202,7 +202,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte
     splits should have length (2)

     memStore.indexNames(dataset2.ref, 10).toSet should equal (
-      Set(("n", 0), ("series", 0), ("n", 1), ("series", 1)))
+      Set(("n", 0), ("series", 0), ("n", 1), ("series", 1), ("_type_", 0), ("_type_", 1)))

     val filter = ColumnFilter("n", Filter.Equals("2".utf8))
     val agg1 = memStore.scanRows(dataset2, Seq(1), FilteredPartitionScan(splits.head, Seq(filter)))
@@ -465,7 +465,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte
     memStore.refreshIndexForTesting(dataset1.ref)

     memStore.numPartitions(dataset1.ref, 0) shouldEqual 10
-    memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0)))
+    memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0), ("_type_", 0)))

     memStore.truncate(dataset1.ref, numShards = 4)
diff --git a/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala b/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala
index 77c4bf1714..473d135462 100644
--- a/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala
+++ b/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala
@@ -3,13 +3,14 @@ package filodb.prometheus.ast
 import scala.util.Try

 import filodb.core.{query, GlobalConfig}
+import filodb.core.metadata.Schemas
 import filodb.core.query.{ColumnFilter, QueryUtils, RangeParams}
 import filodb.prometheus.parse.Parser
 import filodb.query._

 object Vectors {
   val PromMetricLabel = "__name__"
-  val TypeLabel = "_type_"
+  val TypeLabel = Schemas.TypeLabel
   val BucketFilterLabel = "_bucket_"
   val conf = GlobalConfig.systemConfig
   val queryConfig = conf.getConfig("filodb.query")
diff --git a/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala b/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala
index 0e03d89eaa..75ff9b3d70 100644
--- a/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala
+++ b/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala
@@ -4,8 +4,8 @@ import remote.RemoteStorage._

 import filodb.core.GlobalConfig
 import filodb.core.binaryrecord2.{BinaryRecordRowReader, StringifyMapItemConsumer}
+import filodb.core.metadata.{PartitionSchema, Schemas}
 import filodb.core.metadata.Column.ColumnType
-import filodb.core.metadata.PartitionSchema
 import filodb.core.query.{QueryUtils, Result => _, _}
 import filodb.prometheus.parse.Parser.REGEX_MAX_LEN
 import filodb.query.{QueryResult => FiloQueryResult, _}
@@ -230,7 +230,7 @@ object PrometheusModel {
   def makeVerboseLabels(rvk: RangeVectorKey): Map[String, String] = {
     Map("_shards_" -> rvk.sourceShards.mkString(","),
       "_partIds_" -> rvk.partIds.mkString(","),
-      "_type_" -> rvk.schemaNames.mkString(","))
+      Schemas.TypeLabel -> rvk.schemaNames.mkString(","))
   }

   def toPromErrorResponse(qe: filodb.query.QueryError): ErrorResponse = {
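With Vectors.TypeLabel now aliased to Schemas.TypeLabel, the PromQL layer
and the storage layer agree on the label name, so a _type_ matcher can be
written directly in a query string. A sketch, assuming the existing
metadata-query parser entry point (the query string and time parameters
are illustrative):

    import filodb.prometheus.ast.TimeStepParams
    import filodb.prometheus.parse.Parser

    // Series-metadata lookup restricted to one schema type.
    val logicalPlan = Parser.metadataQueryToLogicalPlan(
      """http_req_total{_type_="prom-counter", job="myCoolService"}""",
      TimeStepParams(start, step, end))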
diff --git a/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala b/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala
index d8aced8cc5..313f5a9ca3 100644
--- a/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala
+++ b/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala
@@ -16,7 +16,7 @@ import filodb.core.binaryrecord2.BinaryRecordRowReader
 import filodb.core.memstore.ratelimit.CardinalityStore
 import filodb.core.memstore.{FixedMaxPartitionsEvictionPolicy, SomeData, TimeSeriesMemStore}
 import filodb.core.metadata.Schemas
-import filodb.core.query._
+import filodb.core.query.{ColumnFilter, _}
 import filodb.core.store.{ChunkSource, InMemoryMetaStore, NullColumnStore}
 import filodb.memory.format.{SeqRowReader, ZeroCopyUTF8String}
 import filodb.query._
@@ -160,6 +160,52 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B
     result shouldEqual jobQueryResult1
   }

+  it ("should read the job names from timeseriesindex matching the columnfilters with _type_ label") {
+    import ZeroCopyUTF8String._
+    val filters = Seq(ColumnFilter("_metric_", Filter.Equals("http_req_total".utf8)),
+      ColumnFilter("_type_", Filter.Equals(Schemas.promCounter.name.utf8)),
+      ColumnFilter("job", Filter.Equals("myCoolService".utf8)))
+
+    val leaves = (0 until shardPartKeyLabelValues.size).map{ ishard =>
+      LabelValuesExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref,
+        ishard, filters, Seq("job", "unicode_tag"), now-5000, now)
+    }
+
+    val execPlan = LabelValuesDistConcatExec(QueryContext(), executeDispatcher, leaves)
+
+    val resp = execPlan.execute(memStore, querySession).runToFuture.futureValue
+    val result = (resp: @unchecked) match {
+      case QueryResult(id, _, response, _, _, _, _) => {
+        val rv = response(0)
+        rv.rows.size shouldEqual 1
+        val record = rv.rows.next().asInstanceOf[BinaryRecordRowReader]
+        rv.asInstanceOf[SerializedRangeVector].schema.toStringPairs(record.recordBase, record.recordOffset)
+      }
+    }
+    result shouldEqual jobQueryResult1
+  }
+
+  it ("should read the job names from timeseriesindex matching the columnfilters with _type_ label empty result") {
+    import ZeroCopyUTF8String._
+    val filters = Seq(ColumnFilter("_metric_", Filter.Equals("http_req_total".utf8)),
+      ColumnFilter("_type_", Filter.Equals(Schemas.promHistogram.name.utf8)),
+      ColumnFilter("job", Filter.Equals("myCoolService".utf8)))
+
+    val leaves = (0 until shardPartKeyLabelValues.size).map { ishard =>
+      LabelValuesExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref,
+        ishard, filters, Seq("job", "unicode_tag"), now - 5000, now)
+    }
+
+    val execPlan = LabelValuesDistConcatExec(QueryContext(), executeDispatcher, leaves)
+
+    val resp = execPlan.execute(memStore, querySession).runToFuture.futureValue
+    (resp: @unchecked) match {
+      case QueryResult(id, _, response, _, _, _, _) => {
+        response.isEmpty shouldEqual true
+      }
+    }
+  }
+
   it("should not return any rows for wrong column filters") {
     import ZeroCopyUTF8String._
     val filters = Seq (ColumnFilter("__name__", Filter.Equals("http_req_total1".utf8)),
@@ -276,7 +322,7 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B
   }

   it ("should be able to query labels with filter") {
-    val expectedLabels = Set("job", "_metric_", "unicode_tag", "instance", "_ws_", "_ns_")
+    val expectedLabels = Set("job", "_metric_", "unicode_tag", "instance", "_ws_", "_ns_", "_type_")
     val filters = Seq (ColumnFilter("job", Filter.Equals("myCoolService".utf8)))

     val leaves = (0 until shardPartKeyLabelValues.size).map{ ishard =>
@@ -355,7 +401,8 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B
       "job" -> "1",
       "instance" -> "2",
       "_metric_" -> "1",
-      "_ws_" -> "1")
+      "_ws_" -> "1",
+      "_type_" -> "1")
   }
 }