feat(core): Now metadata queries support _type_ filter (#1819)
Current behavior:
Metadata queries do not support filtering on metric type via the _type_ filter.
New behavior:
Support for the _type_ filter has been added. Note that only newly written documents in the persisted downsample index carry the type field, so that index needs to be rebuilt for full support. The rebuild can be skipped if metadata queries don't hit the downsample index.
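
As an illustration of the new capability, here is a minimal sketch of a metadata lookup restricted by schema type, modeled on the test changes in this commit. It is not part of the change itself: keyIndex (a PartKeyLuceneIndex), start and end are assumed to already be in scope, and the metric name is arbitrary.

import filodb.core.metadata.Schemas
import filodb.core.query.{ColumnFilter, Filter}
import filodb.memory.format.ZeroCopyUTF8String._

// Only part keys whose schema is prom-counter should match this lookup.
val filters = Seq(
  ColumnFilter("_metric_", Filter.Equals("http_req_total".utf8)),
  ColumnFilter(Schemas.TypeLabel, Filter.Equals(Schemas.promCounter.name.utf8)))
val matchingPartIds = keyIndex.partIdsFromFilters(filters, start, end)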
vishramachandran authored and sherali42 committed Oct 22, 2024
1 parent 8eaa7af commit 88c3e5d
Showing 12 changed files with 84 additions and 20 deletions.
@@ -1028,8 +1028,7 @@ class SingleClusterPlanner(val dataset: Dataset,
private def materializeSeriesKeysByFilters(qContext: QueryContext,
lp: SeriesKeysByFilters,
forceInProcess: Boolean): PlanResult = {
// NOTE: _type_ filter support currently isn't there in series keys queries
val (renamedFilters, _) = extractSchemaFilter(renameMetricFilter(lp.filters))
val renamedFilters = renameMetricFilter(lp.filters)

val shardsToHit = if (canGetShardsFromFilters(renamedFilters, qContext)) {
shardsFromFilters(renamedFilters, qContext, lp.startMs, lp.endMs)
@@ -329,7 +329,7 @@ class NodeCoordinatorActorSpec extends ActorTest(NodeCoordinatorActorSpec.getNew
memStore.refreshIndexForTesting(dataset1.ref)

probe.send(coordinatorActor, GetIndexNames(ref))
probe.expectMsg(Seq("series"))
probe.expectMsg(Seq("series", "_type_"))

probe.send(coordinatorActor, GetIndexValues(ref, "series", 0, limit=4))
probe.expectMsg(Seq(("Series 0", 1), ("Series 1", 1), ("Series 2", 1), ("Series 3", 1)))
@@ -343,7 +343,7 @@ class NodeCoordinatorActorSpec extends ActorTest(NodeCoordinatorActorSpec.getNew
memStore.refreshIndexForTesting(dataset1.ref)

probe.send(coordinatorActor, GetIndexNames(ref))
probe.expectMsg(Seq("series"))
probe.expectMsg(Seq("series", "_type_"))

//actor should restart and serve queries again
probe.send(coordinatorActor, GetIndexValues(ref, "series", 0, limit=4))
@@ -279,7 +279,7 @@ final class RecordSchema(val columns: Seq[ColumnInfo],
result ++= consumer.stringPairs
case (BinaryRecordColumn, i) => result ++= brSchema(i).toStringPairs(blobBase(base, offset, i),
blobOffset(base, offset, i))
result += ("_type_" ->
result += (Schemas.TypeLabel ->
Schemas.global.schemaName(
RecordSchema.schemaID(blobBase(base, offset, i),
blobOffset(base, offset, i))))
@@ -174,7 +174,8 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,
schemas.part.binSchema.toStringPairs(partKey.base, partKey.offset).map(pair => {
pair._1.utf8 -> pair._2.utf8
}).toMap ++
Map("_type_".utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
Map(Schemas.TypeLabel.utf8 ->
Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
}
}

@@ -36,10 +36,10 @@ import spire.syntax.cfor._

import filodb.core.{concurrentCache, DatasetRef}
import filodb.core.Types.PartitionKey
import filodb.core.binaryrecord2.MapItemConsumer
import filodb.core.binaryrecord2.{MapItemConsumer, RecordSchema}
import filodb.core.memstore.ratelimit.CardinalityTracker
import filodb.core.metadata.{PartitionSchema, Schemas}
import filodb.core.metadata.Column.ColumnType.{MapColumn, StringColumn}
import filodb.core.metadata.PartitionSchema
import filodb.core.query.{ColumnFilter, Filter, QueryUtils}
import filodb.core.query.Filter._
import filodb.memory.{BinaryRegionLarge, UTF8StringMedium, UTF8StringShort}
@@ -677,6 +677,9 @@ class PartKeyLuceneIndex(ref: DatasetRef,
// If configured and enabled, Multi-column facets will be created on "partition-schema" columns
createMultiColumnFacets(partKeyOnHeapBytes, partKeyBytesRefOffset)

val schemaName = Schemas.global.schemaName(RecordSchema.schemaID(partKeyOnHeapBytes, UnsafeUtils.arayOffset))
addIndexedField(Schemas.TypeLabel, schemaName)

cforRange { 0 until numPartColumns } { i =>
indexers(i).fromPartKey(partKeyOnHeapBytes, bytesRefToUnsafeOffset(partKeyBytesRefOffset), partId)
}
@@ -1853,7 +1853,7 @@ class TimeSeriesShard(val ref: DatasetRef,
schemas.part.binSchema.toStringPairs(partKey.base, partKey.offset).map(pair => {
pair._1.utf8 -> pair._2.utf8
}).toMap ++
Map("_type_".utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
Map(Schemas.TypeLabel.utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
}

/**
2 changes: 2 additions & 0 deletions core/src/main/scala/filodb.core/metadata/Schemas.scala
@@ -372,6 +372,8 @@ object Schemas extends StrictLogging {
import Accumulation._
import Dataset._

val TypeLabel = "_type_"

val _log = logger

val rowKeyIDs = Seq(0) // First or timestamp column is always the row keys
@@ -97,6 +97,17 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte
val partNums7 = keyIndex.partIdsFromFilters(Seq(filter7), (start + end)/2, end + 1000 )
partNums7 should not equal debox.Buffer.empty[Int]

// tests to validate schema ID filter
val filter8 = Seq( ColumnFilter("Actor2Code", Equals("GOV".utf8)),
ColumnFilter("_type_", Equals("schemaID:46894".utf8)))
val partNums8 = keyIndex.partIdsFromFilters(filter8, start, end)
partNums8 shouldEqual debox.Buffer(7, 8, 9)

val filter9 = Seq( ColumnFilter("Actor2Code", Equals("GOV".utf8)),
ColumnFilter("_type_", Equals("prom-counter".utf8)))
val partNums9 = keyIndex.partIdsFromFilters(filter9, start, end)
partNums9.length shouldEqual 0

}

it("should fetch part key records from filters correctly") {
@@ -467,7 +478,7 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte

keyIndex.refreshReadersBlocking()

keyIndex.indexNames(10).toList shouldEqual Seq("Actor2Code", "Actor2Name")
keyIndex.indexNames(10).toList shouldEqual Seq("_type_", "Actor2Code", "Actor2Name")
keyIndex.indexValues("not_found").toSeq should equal (Nil)

val infos = Seq("AFR", "CHN", "COP", "CVL", "EGYEDU").map(_.utf8).map(TermInfo(_, 1))
@@ -602,7 +613,7 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte
val labelValues1 = index3.labelNamesEfficient(filters1, 0, Long.MaxValue)
labelValues1.toSet shouldEqual (0 until 5).map(c => s"dynamicLabel$c").toSet ++
(0 until 10).map(c => s"infoLabel$c").toSet ++
Set("_ns_", "_ws_", "_metric_")
Set("_ns_", "_ws_", "_metric_", "_type_")
}

it("should be able to fetch label values efficiently using facets") {
@@ -66,7 +66,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte

memStore.asInstanceOf[TimeSeriesMemStore].refreshIndexForTesting(dataset1.ref)
memStore.numPartitions(dataset1.ref, 0) shouldEqual 10
memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0)))
memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0), ("_type_",0)))
memStore.latestOffset(dataset1.ref, 0) shouldEqual 0

val minSet = rawData.map(_(1).asInstanceOf[Double]).toSet
@@ -202,7 +202,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte
splits should have length (2)

memStore.indexNames(dataset2.ref, 10).toSet should equal (
Set(("n", 0), ("series", 0), ("n", 1), ("series", 1)))
Set(("n", 0), ("series", 0), ("n", 1), ("series", 1), ("_type_",0), ("_type_",1)))

val filter = ColumnFilter("n", Filter.Equals("2".utf8))
val agg1 = memStore.scanRows(dataset2, Seq(1), FilteredPartitionScan(splits.head, Seq(filter)))
@@ -465,7 +465,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte

memStore.refreshIndexForTesting(dataset1.ref)
memStore.numPartitions(dataset1.ref, 0) shouldEqual 10
memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0)))
memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0), ("_type_",0)))

memStore.truncate(dataset1.ref, numShards = 4)

@@ -3,13 +3,14 @@ package filodb.prometheus.ast
import scala.util.Try

import filodb.core.{query, GlobalConfig}
import filodb.core.metadata.Schemas
import filodb.core.query.{ColumnFilter, QueryUtils, RangeParams}
import filodb.prometheus.parse.Parser
import filodb.query._

object Vectors {
val PromMetricLabel = "__name__"
val TypeLabel = "_type_"
val TypeLabel = Schemas.TypeLabel
val BucketFilterLabel = "_bucket_"
val conf = GlobalConfig.systemConfig
val queryConfig = conf.getConfig("filodb.query")
@@ -4,8 +4,8 @@ import remote.RemoteStorage._

import filodb.core.GlobalConfig
import filodb.core.binaryrecord2.{BinaryRecordRowReader, StringifyMapItemConsumer}
import filodb.core.metadata.{PartitionSchema, Schemas}
import filodb.core.metadata.Column.ColumnType
import filodb.core.metadata.PartitionSchema
import filodb.core.query.{QueryUtils, Result => _, _}
import filodb.prometheus.parse.Parser.REGEX_MAX_LEN
import filodb.query.{QueryResult => FiloQueryResult, _}
@@ -230,7 +230,7 @@ object PrometheusModel {
def makeVerboseLabels(rvk: RangeVectorKey): Map[String, String] = {
Map("_shards_" -> rvk.sourceShards.mkString(","),
"_partIds_" -> rvk.partIds.mkString(","),
"_type_" -> rvk.schemaNames.mkString(","))
Schemas.TypeLabel -> rvk.schemaNames.mkString(","))
}

def toPromErrorResponse(qe: filodb.query.QueryError): ErrorResponse = {
53 changes: 50 additions & 3 deletions query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala
@@ -16,7 +16,7 @@ import filodb.core.binaryrecord2.BinaryRecordRowReader
import filodb.core.memstore.ratelimit.CardinalityStore
import filodb.core.memstore.{FixedMaxPartitionsEvictionPolicy, SomeData, TimeSeriesMemStore}
import filodb.core.metadata.Schemas
import filodb.core.query._
import filodb.core.query.{ColumnFilter, _}
import filodb.core.store.{ChunkSource, InMemoryMetaStore, NullColumnStore}
import filodb.memory.format.{SeqRowReader, ZeroCopyUTF8String}
import filodb.query._
@@ -160,6 +160,52 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B
result shouldEqual jobQueryResult1
}

it ("should read the job names from timeseriesindex matching the columnfilters with _type_ label") {
import ZeroCopyUTF8String._
val filters = Seq(ColumnFilter("_metric_", Filter.Equals("http_req_total".utf8)),
ColumnFilter("_type_", Filter.Equals(Schemas.promCounter.name.utf8)),
ColumnFilter("job", Filter.Equals("myCoolService".utf8)))

val leaves = (0 until shardPartKeyLabelValues.size).map{ ishard =>
LabelValuesExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref,
ishard, filters, Seq("job", "unicode_tag"), now-5000, now)
}

val execPlan = LabelValuesDistConcatExec(QueryContext(), executeDispatcher, leaves)

val resp = execPlan.execute(memStore, querySession).runToFuture.futureValue
val result = (resp: @unchecked) match {
case QueryResult(id, _, response, _, _, _, _) => {
val rv = response(0)
rv.rows.size shouldEqual 1
val record = rv.rows.next().asInstanceOf[BinaryRecordRowReader]
rv.asInstanceOf[SerializedRangeVector].schema.toStringPairs(record.recordBase, record.recordOffset)
}
}
result shouldEqual jobQueryResult1
}

it ("should read the job names from timeseriesindex matching the columnfilters with _type_ label empty result") {
import ZeroCopyUTF8String._
val filters = Seq(ColumnFilter("_metric_", Filter.Equals("http_req_total".utf8)),
ColumnFilter("_type_", Filter.Equals(Schemas.promHistogram.name.utf8)),
ColumnFilter("job", Filter.Equals("myCoolService".utf8)))

val leaves = (0 until shardPartKeyLabelValues.size).map { ishard =>
LabelValuesExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref,
ishard, filters, Seq("job", "unicode_tag"), now - 5000, now)
}

val execPlan = LabelValuesDistConcatExec(QueryContext(), executeDispatcher, leaves)

val resp = execPlan.execute(memStore, querySession).runToFuture.futureValue
(resp: @unchecked) match {
case QueryResult(id, _, response, _, _, _, _) => {
response.isEmpty shouldEqual true
}
}
}

it("should not return any rows for wrong column filters") {
import ZeroCopyUTF8String._
val filters = Seq (ColumnFilter("__name__", Filter.Equals("http_req_total1".utf8)),
@@ -276,7 +322,7 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B
}

it ("should be able to query labels with filter") {
val expectedLabels = Set("job", "_metric_", "unicode_tag", "instance", "_ws_", "_ns_")
val expectedLabels = Set("job", "_metric_", "unicode_tag", "instance", "_ws_", "_ns_", "_type_")
val filters = Seq (ColumnFilter("job", Filter.Equals("myCoolService".utf8)))

val leaves = (0 until shardPartKeyLabelValues.size).map{ ishard =>
@@ -355,7 +401,8 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B
"job" -> "1",
"instance" -> "2",
"_metric_" -> "1",
"_ws_" -> "1")
"_ws_" -> "1",
"_type_" -> "1")
}
}

