diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
index 86c3a5adab..d457a2710a 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
@@ -1028,8 +1028,7 @@ class SingleClusterPlanner(val dataset: Dataset,
   private def materializeSeriesKeysByFilters(qContext: QueryContext,
                                              lp: SeriesKeysByFilters,
                                              forceInProcess: Boolean): PlanResult = {
-    // NOTE: _type_ filter support currently isn't there in series keys queries
-    val (renamedFilters, _) = extractSchemaFilter(renameMetricFilter(lp.filters))
+    val renamedFilters = renameMetricFilter(lp.filters)
 
     val shardsToHit = if (canGetShardsFromFilters(renamedFilters, qContext)) {
       shardsFromFilters(renamedFilters, qContext, lp.startMs, lp.endMs)
diff --git a/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala b/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala
index 0f7e72a24a..4131753b6f 100644
--- a/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala
@@ -329,7 +329,7 @@ class NodeCoordinatorActorSpec extends ActorTest(NodeCoordinatorActorSpec.getNew
     memStore.refreshIndexForTesting(dataset1.ref)
 
     probe.send(coordinatorActor, GetIndexNames(ref))
-    probe.expectMsg(Seq("series"))
+    probe.expectMsg(Seq("series", "_type_"))
 
     probe.send(coordinatorActor, GetIndexValues(ref, "series", 0, limit=4))
     probe.expectMsg(Seq(("Series 0", 1), ("Series 1", 1), ("Series 2", 1), ("Series 3", 1)))
@@ -343,7 +343,7 @@ class NodeCoordinatorActorSpec extends ActorTest(NodeCoordinatorActorSpec.getNew
     memStore.refreshIndexForTesting(dataset1.ref)
 
     probe.send(coordinatorActor, GetIndexNames(ref))
-    probe.expectMsg(Seq("series"))
+    probe.expectMsg(Seq("series", "_type_"))
 
     //actor should restart and serve queries again
     probe.send(coordinatorActor, GetIndexValues(ref, "series", 0, limit=4))
diff --git a/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala b/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala
index 01a1252f7c..0ab22c3e0a 100644
--- a/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala
+++ b/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala
@@ -279,7 +279,7 @@ final class RecordSchema(val columns: Seq[ColumnInfo],
         result ++= consumer.stringPairs
       case (BinaryRecordColumn, i) =>
         result ++= brSchema(i).toStringPairs(blobBase(base, offset, i), blobOffset(base, offset, i))
-        result += ("_type_" ->
+        result += (Schemas.TypeLabel ->
           Schemas.global.schemaName(
             RecordSchema.schemaID(blobBase(base, offset, i), blobOffset(base, offset, i))))
diff --git a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala
index 599c2f51fd..0ecf88fb45 100644
--- a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala
+++ b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala
@@ -174,7 +174,8 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,
     schemas.part.binSchema.toStringPairs(partKey.base, partKey.offset).map(pair => {
       pair._1.utf8 -> pair._2.utf8
     }).toMap ++
-      Map("_type_".utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
+      Map(Schemas.TypeLabel.utf8 ->
+        Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
   }
 }
diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
index 2cade7d972..84a251ad54 100644
--- a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
+++ b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
@@ -36,10 +36,10 @@ import spire.syntax.cfor._
 
 import filodb.core.{concurrentCache, DatasetRef}
 import filodb.core.Types.PartitionKey
-import filodb.core.binaryrecord2.MapItemConsumer
+import filodb.core.binaryrecord2.{MapItemConsumer, RecordSchema}
 import filodb.core.memstore.ratelimit.CardinalityTracker
+import filodb.core.metadata.{PartitionSchema, Schemas}
 import filodb.core.metadata.Column.ColumnType.{MapColumn, StringColumn}
-import filodb.core.metadata.PartitionSchema
 import filodb.core.query.{ColumnFilter, Filter, QueryUtils}
 import filodb.core.query.Filter._
 import filodb.memory.{BinaryRegionLarge, UTF8StringMedium, UTF8StringShort}
@@ -677,6 +677,9 @@ class PartKeyLuceneIndex(ref: DatasetRef,
     // If configured and enabled, Multi-column facets will be created on "partition-schema" columns
     createMultiColumnFacets(partKeyOnHeapBytes, partKeyBytesRefOffset)
 
+    val schemaName = Schemas.global.schemaName(RecordSchema.schemaID(partKeyOnHeapBytes, UnsafeUtils.arayOffset))
+    addIndexedField(Schemas.TypeLabel, schemaName)
+
     cforRange { 0 until numPartColumns } { i =>
       indexers(i).fromPartKey(partKeyOnHeapBytes, bytesRefToUnsafeOffset(partKeyBytesRefOffset), partId)
     }
diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala
index ab1a73a27f..8b8e3593e0 100644
--- a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala
+++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala
@@ -1853,7 +1853,7 @@ class TimeSeriesShard(val ref: DatasetRef,
     schemas.part.binSchema.toStringPairs(partKey.base, partKey.offset).map(pair => {
       pair._1.utf8 -> pair._2.utf8
     }).toMap ++
-      Map("_type_".utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
+      Map(Schemas.TypeLabel.utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
   }
 
   /**
diff --git a/core/src/main/scala/filodb.core/metadata/Schemas.scala b/core/src/main/scala/filodb.core/metadata/Schemas.scala
index f586cc5afe..f486f620e1 100644
--- a/core/src/main/scala/filodb.core/metadata/Schemas.scala
+++ b/core/src/main/scala/filodb.core/metadata/Schemas.scala
@@ -372,6 +372,8 @@ object Schemas extends StrictLogging {
   import Accumulation._
   import Dataset._
 
+  val TypeLabel = "_type_"
+
   val _log = logger
 
   val rowKeyIDs = Seq(0) // First or timestamp column is always the row keys
diff --git a/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala b/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala
index 663cb2b0cf..f39368270c 100644
--- a/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala
+++ b/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala
@@ -97,6 +97,17 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte
     val partNums7 = keyIndex.partIdsFromFilters(Seq(filter7), (start + end)/2, end + 1000 )
     partNums7 should not equal debox.Buffer.empty[Int]
 
+    // tests to validate schema ID filter
+    val filter8 = Seq( ColumnFilter("Actor2Code", Equals("GOV".utf8)),
+      ColumnFilter("_type_", Equals("schemaID:46894".utf8)))
+    val partNums8 = keyIndex.partIdsFromFilters(filter8, start, end)
+    partNums8 shouldEqual debox.Buffer(7, 8, 9)
+
+    val filter9 = Seq( ColumnFilter("Actor2Code", Equals("GOV".utf8)),
+      ColumnFilter("_type_", Equals("prom-counter".utf8)))
+    val partNums9 = keyIndex.partIdsFromFilters(filter9, start, end)
+    partNums9.length shouldEqual 0
+
   }
 
   it("should fetch part key records from filters correctly") {
@@ -467,7 +478,7 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte
 
     keyIndex.refreshReadersBlocking()
 
-    keyIndex.indexNames(10).toList shouldEqual Seq("Actor2Code", "Actor2Name")
+    keyIndex.indexNames(10).toList shouldEqual Seq("_type_", "Actor2Code", "Actor2Name")
     keyIndex.indexValues("not_found").toSeq should equal (Nil)
 
     val infos = Seq("AFR", "CHN", "COP", "CVL", "EGYEDU").map(_.utf8).map(TermInfo(_, 1))
@@ -602,7 +613,7 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte
     val labelValues1 = index3.labelNamesEfficient(filters1, 0, Long.MaxValue)
     labelValues1.toSet shouldEqual (0 until 5).map(c => s"dynamicLabel$c").toSet ++
       (0 until 10).map(c => s"infoLabel$c").toSet ++
-      Set("_ns_", "_ws_", "_metric_")
+      Set("_ns_", "_ws_", "_metric_", "_type_")
   }
 
   it("should be able to fetch label values efficiently using facets") {
diff --git a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala
index e5b0072870..c9ca67e4d2 100644
--- a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala
+++ b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala
@@ -66,7 +66,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte
     memStore.asInstanceOf[TimeSeriesMemStore].refreshIndexForTesting(dataset1.ref)
 
     memStore.numPartitions(dataset1.ref, 0) shouldEqual 10
-    memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0)))
+    memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0), ("_type_",0)))
     memStore.latestOffset(dataset1.ref, 0) shouldEqual 0
 
     val minSet = rawData.map(_(1).asInstanceOf[Double]).toSet
@@ -202,7 +202,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte
     splits should have length (2)
 
     memStore.indexNames(dataset2.ref, 10).toSet should equal (
-      Set(("n", 0), ("series", 0), ("n", 1), ("series", 1)))
+      Set(("n", 0), ("series", 0), ("n", 1), ("series", 1), ("_type_",0), ("_type_",1)))
 
     val filter = ColumnFilter("n", Filter.Equals("2".utf8))
     val agg1 = memStore.scanRows(dataset2, Seq(1), FilteredPartitionScan(splits.head, Seq(filter)))
@@ -465,7 +465,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte
     memStore.refreshIndexForTesting(dataset1.ref)
 
     memStore.numPartitions(dataset1.ref, 0) shouldEqual 10
-    memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0)))
+    memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0), ("_type_",0)))
 
     memStore.truncate(dataset1.ref, numShards = 4)
diff --git a/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala b/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala
index 77c4bf1714..473d135462 100644
--- a/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala
+++ b/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala
@@ -3,13 +3,14 @@ package filodb.prometheus.ast
 import scala.util.Try
 
 import filodb.core.{query, GlobalConfig}
+import filodb.core.metadata.Schemas
 import filodb.core.query.{ColumnFilter, QueryUtils, RangeParams}
 import filodb.prometheus.parse.Parser
 import filodb.query._
 
 object Vectors {
   val PromMetricLabel = "__name__"
-  val TypeLabel = "_type_"
+  val TypeLabel = Schemas.TypeLabel
   val BucketFilterLabel = "_bucket_"
   val conf = GlobalConfig.systemConfig
   val queryConfig = conf.getConfig("filodb.query")
diff --git a/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala b/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala
index 0e03d89eaa..75ff9b3d70 100644
--- a/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala
+++ b/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala
@@ -4,8 +4,8 @@ import remote.RemoteStorage._
 
 import filodb.core.GlobalConfig
 import filodb.core.binaryrecord2.{BinaryRecordRowReader, StringifyMapItemConsumer}
+import filodb.core.metadata.{PartitionSchema, Schemas}
 import filodb.core.metadata.Column.ColumnType
-import filodb.core.metadata.PartitionSchema
 import filodb.core.query.{QueryUtils, Result => _, _}
 import filodb.prometheus.parse.Parser.REGEX_MAX_LEN
 import filodb.query.{QueryResult => FiloQueryResult, _}
@@ -230,7 +230,7 @@ object PrometheusModel {
   def makeVerboseLabels(rvk: RangeVectorKey): Map[String, String] = {
     Map("_shards_" -> rvk.sourceShards.mkString(","),
       "_partIds_" -> rvk.partIds.mkString(","),
-      "_type_" -> rvk.schemaNames.mkString(","))
+      Schemas.TypeLabel -> rvk.schemaNames.mkString(","))
   }
 
   def toPromErrorResponse(qe: filodb.query.QueryError): ErrorResponse = {
diff --git a/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala b/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala
index d8aced8cc5..313f5a9ca3 100644
--- a/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala
+++ b/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala
@@ -16,7 +16,7 @@ import filodb.core.binaryrecord2.BinaryRecordRowReader
 import filodb.core.memstore.ratelimit.CardinalityStore
 import filodb.core.memstore.{FixedMaxPartitionsEvictionPolicy, SomeData, TimeSeriesMemStore}
 import filodb.core.metadata.Schemas
-import filodb.core.query._
+import filodb.core.query.{ColumnFilter, _}
 import filodb.core.store.{ChunkSource, InMemoryMetaStore, NullColumnStore}
 import filodb.memory.format.{SeqRowReader, ZeroCopyUTF8String}
 import filodb.query._
@@ -160,6 +160,52 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B
     result shouldEqual jobQueryResult1
   }
 
+  it ("should read the job names from timeseriesindex matching the columnfilters with _type_ label") {
+    import ZeroCopyUTF8String._
+    val filters = Seq(ColumnFilter("_metric_", Filter.Equals("http_req_total".utf8)),
+      ColumnFilter("_type_", Filter.Equals(Schemas.promCounter.name.utf8)),
+      ColumnFilter("job", Filter.Equals("myCoolService".utf8)))
+
+    val leaves = (0 until shardPartKeyLabelValues.size).map{ ishard =>
+      LabelValuesExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref,
+        ishard, filters, Seq("job", "unicode_tag"), now-5000, now)
+    }
+
+    val execPlan = LabelValuesDistConcatExec(QueryContext(), executeDispatcher, leaves)
+
+    val resp = execPlan.execute(memStore, querySession).runToFuture.futureValue
+    val result = (resp: @unchecked) match {
+      case QueryResult(id, _, response, _, _, _, _) => {
+        val rv = response(0)
+        rv.rows.size shouldEqual 1
+        val record = rv.rows.next().asInstanceOf[BinaryRecordRowReader]
+        rv.asInstanceOf[SerializedRangeVector].schema.toStringPairs(record.recordBase, record.recordOffset)
+      }
+    }
+    result shouldEqual jobQueryResult1
+  }
+
+  it ("should read the job names from timeseriesindex matching the columnfilters with _type_ label empty result") {
+    import ZeroCopyUTF8String._
+    val filters = Seq(ColumnFilter("_metric_", Filter.Equals("http_req_total".utf8)),
+      ColumnFilter("_type_", Filter.Equals(Schemas.promHistogram.name.utf8)),
+      ColumnFilter("job", Filter.Equals("myCoolService".utf8)))
+
+    val leaves = (0 until shardPartKeyLabelValues.size).map { ishard =>
+      LabelValuesExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref,
+        ishard, filters, Seq("job", "unicode_tag"), now - 5000, now)
+    }
+
+    val execPlan = LabelValuesDistConcatExec(QueryContext(), executeDispatcher, leaves)
+
+    val resp = execPlan.execute(memStore, querySession).runToFuture.futureValue
+    (resp: @unchecked) match {
+      case QueryResult(id, _, response, _, _, _, _) => {
+        response.isEmpty shouldEqual true
+      }
+    }
+  }
+
   it("should not return any rows for wrong column filters") {
     import ZeroCopyUTF8String._
     val filters = Seq (ColumnFilter("__name__", Filter.Equals("http_req_total1".utf8)),
@@ -276,7 +322,7 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B
   }
 
   it ("should be able to query labels with filter") {
-    val expectedLabels = Set("job", "_metric_", "unicode_tag", "instance", "_ws_", "_ns_")
+    val expectedLabels = Set("job", "_metric_", "unicode_tag", "instance", "_ws_", "_ns_", "_type_")
    val filters = Seq (ColumnFilter("job", Filter.Equals("myCoolService".utf8)))
 
     val leaves = (0 until shardPartKeyLabelValues.size).map{ ishard =>
@@ -355,7 +401,8 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B
       "job" -> "1",
       "instance" -> "2",
       "_metric_" -> "1",
-      "_ws_" -> "1")
+      "_ws_" -> "1",
+      "_type_" -> "1")
   }
 }
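---

A note on the indexing side, for context (not part of the patch): with the PartKeyLuceneIndex change, every part key is additionally indexed under the synthetic "_type_" label (the new Schemas.TypeLabel constant), with the value being the schema name resolved from the part key's schema ID, so a _type_ equality filter resolves like any ordinary label filter. Below is a minimal, self-contained Scala sketch of that idea; the toy inverted index, the schema-ID table, and all helper names are illustrative stand-ins, not FiloDB's actual APIs.

object TypeLabelIndexSketch extends App {
  val TypeLabel = "_type_"  // mirrors the Schemas.TypeLabel constant added above

  // Stand-in for Schemas.global.schemaName(schemaID). Unknown IDs fall back to
  // a "schemaID:<id>" form, which is why the new PartKeyLuceneIndexSpec test
  // can match ColumnFilter("_type_", Equals("schemaID:46894")).
  val knownSchemas = Map(1 -> "prom-counter", 2 -> "prom-histogram")
  def schemaName(id: Int): String = knownSchemas.getOrElse(id, s"schemaID:$id")

  // Toy inverted index: (label, value) -> posting set of part ids.
  val index = scala.collection.mutable.Map.empty[(String, String), Set[Int]]
  def addIndexedField(label: String, value: String, partId: Int): Unit =
    index((label, value)) = index.getOrElse((label, value), Set.empty) + partId

  // Indexing a part key now also indexes its schema name under "_type_".
  def indexPartKey(partId: Int, schemaId: Int, labels: Map[String, String]): Unit = {
    labels.foreach { case (label, value) => addIndexedField(label, value, partId) }
    addIndexedField(TypeLabel, schemaName(schemaId), partId)
  }

  indexPartKey(7, 1, Map("job" -> "myCoolService"))
  indexPartKey(8, 2, Map("job" -> "myCoolService"))

  // An equality filter on _type_ is now an ordinary posting lookup:
  println(index((TypeLabel, "prom-counter")))               // Set(7)
  println(index.getOrElse((TypeLabel, "gauge"), Set.empty)) // Set() -- no match
}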
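On the query side, the SingleClusterPlanner hunk stops stripping the _type_ filter from SeriesKeysByFilters plans, so series and label metadata queries can narrow by schema type, as the two new LabelValuesExec tests exercise. The sketch below models the lookup semantics under the same toy-index assumption; partIdsFromFilters here is a hypothetical helper named after the real method, not FiloDB's signature.

object TypeFilterQuerySketch extends App {
  // Same toy shape as above: (label, value) -> posting set of part ids.
  val index: Map[(String, String), Set[Int]] = Map(
    ("job", "myCoolService") -> Set(7, 8),
    ("_type_", "prom-counter") -> Set(7),
    ("_type_", "prom-histogram") -> Set(8))

  // Equality filters resolve to posting sets and intersect, so a _type_
  // filter composes with any other label filter.
  def partIdsFromFilters(filters: Seq[(String, String)]): Set[Int] =
    filters.map(f => index.getOrElse(f, Set.empty[Int]))
      .reduceOption(_ intersect _)
      .getOrElse(Set.empty)

  // Matches only the part key whose schema is prom-counter:
  println(partIdsFromFilters(Seq("job" -> "myCoolService", "_type_" -> "prom-counter")))  // Set(7)
  // A combination no part key carries yields an empty result, like the new
  // "empty result" LabelValuesExec test:
  println(partIdsFromFilters(Seq("job" -> "otherService", "_type_" -> "prom-counter")))   // Set()
}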