Cherry-pick: support for _type_ filter in metadata queries #1872

Merged: 3 commits, Oct 31, 2024
@@ -1028,8 +1028,7 @@ class SingleClusterPlanner(val dataset: Dataset,
private def materializeSeriesKeysByFilters(qContext: QueryContext,
lp: SeriesKeysByFilters,
forceInProcess: Boolean): PlanResult = {
// NOTE: _type_ filter support currently isn't there in series keys queries
val (renamedFilters, _) = extractSchemaFilter(renameMetricFilter(lp.filters))
val renamedFilters = renameMetricFilter(lp.filters)

val shardsToHit = if (canGetShardsFromFilters(renamedFilters, qContext)) {
shardsFromFilters(renamedFilters, qContext, lp.startMs, lp.endMs)
@@ -329,7 +329,7 @@ class NodeCoordinatorActorSpec extends ActorTest(NodeCoordinatorActorSpec.getNew
memStore.refreshIndexForTesting(dataset1.ref)

probe.send(coordinatorActor, GetIndexNames(ref))
probe.expectMsg(Seq("series"))
probe.expectMsg(Seq("series", "_type_"))

probe.send(coordinatorActor, GetIndexValues(ref, "series", 0, limit=4))
probe.expectMsg(Seq(("Series 0", 1), ("Series 1", 1), ("Series 2", 1), ("Series 3", 1)))
@@ -343,7 +343,7 @@ class NodeCoordinatorActorSpec extends ActorTest(NodeCoordinatorActorSpec.getNew
memStore.refreshIndexForTesting(dataset1.ref)

probe.send(coordinatorActor, GetIndexNames(ref))
probe.expectMsg(Seq("series"))
probe.expectMsg(Seq("series", "_type_"))

//actor should restart and serve queries again
probe.send(coordinatorActor, GetIndexValues(ref, "series", 0, limit=4))
3 changes: 3 additions & 0 deletions core/src/main/resources/filodb-defaults.conf
@@ -806,6 +806,9 @@ filodb {
# Whether caching on the index is disabled. The underlying Lucene index uses an LRU cache (enabled by
# default); this flag lets us disable that feature.
disable-index-caching = false

# Whether to add the _type_ label to all time series for the purpose of filtering
type-field-indexing-enabled = false
}

# for standalone worker cluster configuration, see akka-bootstrapper
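For orientation, here is a minimal sketch (Scala, Typesafe Config) of how a component could read the new flag. The fallback-to-false helper is my own illustration; the actual wiring in this PR simply calls filodbConfig.getBoolean("memstore.type-field-indexing-enabled") in the shard classes shown further down.

import com.typesafe.config.{Config, ConfigFactory}

object TypeFieldIndexing {
  // Hypothetical helper: read the flag, falling back to the shipped default (false) when the key is absent.
  def enabled(filodbConfig: Config): Boolean =
    if (filodbConfig.hasPath("memstore.type-field-indexing-enabled"))
      filodbConfig.getBoolean("memstore.type-field-indexing-enabled")
    else false
}

// Example usage against the bundled defaults:
val addTypeField = TypeFieldIndexing.enabled(ConfigFactory.load().getConfig("filodb"))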
@@ -279,7 +279,7 @@ final class RecordSchema(val columns: Seq[ColumnInfo],
result ++= consumer.stringPairs
case (BinaryRecordColumn, i) => result ++= brSchema(i).toStringPairs(blobBase(base, offset, i),
blobOffset(base, offset, i))
result += ("_type_" ->
result += (Schemas.TypeLabel ->
Schemas.global.schemaName(
RecordSchema.schemaID(blobBase(base, offset, i),
blobOffset(base, offset, i))))
@@ -83,6 +83,7 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,
private val deploymentPartitionName = filodbConfig.getString("deployment-partition-name")

private val downsampleStoreConfig = StoreConfig(filodbConfig.getConfig("downsampler.downsample-store-config"))
private val typeFieldIndexingEnabled = filodbConfig.getBoolean("memstore.type-field-indexing-enabled")

private val stats = new DownsampledTimeSeriesShardStats(rawDatasetRef, shardNum)

@@ -102,7 +103,7 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,
private val partKeyIndex: PartKeyIndexDownsampled = new PartKeyLuceneIndex(indexDataset, schemas.part, false,
false, shardNum, indexTtlMs,
downsampleConfig.indexLocation.map(new java.io.File(_)),
indexMetadataStore
indexMetadataStore, addMetricTypeField = typeFieldIndexingEnabled
)

private val indexUpdatedHour = new AtomicLong(0)
@@ -174,7 +175,8 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,
schemas.part.binSchema.toStringPairs(partKey.base, partKey.offset).map(pair => {
pair._1.utf8 -> pair._2.utf8
}).toMap ++
Map("_type_".utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
Map(Schemas.TypeLabel.utf8 ->
Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
}
}

37 changes: 28 additions & 9 deletions core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
@@ -36,10 +36,10 @@ import spire.syntax.cfor._

import filodb.core.{concurrentCache, DatasetRef}
import filodb.core.Types.PartitionKey
import filodb.core.binaryrecord2.MapItemConsumer
import filodb.core.binaryrecord2.{MapItemConsumer, RecordSchema}
import filodb.core.memstore.ratelimit.CardinalityTracker
import filodb.core.metadata.{PartitionSchema, Schemas}
import filodb.core.metadata.Column.ColumnType.{MapColumn, StringColumn}
import filodb.core.metadata.PartitionSchema
import filodb.core.query.{ColumnFilter, Filter, QueryUtils}
import filodb.core.query.Filter._
import filodb.memory.{BinaryRegionLarge, UTF8StringMedium, UTF8StringShort}
@@ -133,7 +133,8 @@ class PartKeyLuceneIndex(ref: DatasetRef,
diskLocation: Option[File] = None,
val lifecycleManager: Option[IndexMetadataStore] = None,
useMemoryMappedImpl: Boolean = true,
disableIndexCaching: Boolean = false
disableIndexCaching: Boolean = false,
addMetricTypeField: Boolean = true
) extends StrictLogging with PartKeyIndexDownsampled {

import PartKeyLuceneIndex._
@@ -392,8 +393,8 @@ class PartKeyLuceneIndex(ref: DatasetRef,
val value = new String(valueBase.asInstanceOf[Array[Byte]],
unsafeOffsetToBytesRefOffset(valueOffset + 2), // add 2 to move past numBytes
UTF8StringMedium.numBytes(valueBase, valueOffset), StandardCharsets.UTF_8)
addIndexedField(key, value)
}
addIndexedField(key, value, clientData = true)
}
}

/**
@@ -409,7 +410,7 @@ class PartKeyLuceneIndex(ref: DatasetRef,
val numBytes = schema.binSchema.blobNumBytes(base, offset, pos)
val value = new String(base.asInstanceOf[Array[Byte]], strOffset.toInt - UnsafeUtils.arayOffset,
numBytes, StandardCharsets.UTF_8)
addIndexedField(colName.toString, value)
addIndexedField(colName.toString, value, clientData = true)
}
def getNamesValues(key: PartitionKey): Seq[(UTF8Str, UTF8Str)] = ??? // not used
}
@@ -425,7 +426,6 @@ class PartKeyLuceneIndex(ref: DatasetRef,
}
}.toArray


def getCurrentIndexState(): (IndexState.Value, Option[Long]) =
lifecycleManager.map(_.currentState(this.ref, this.shardNum)).getOrElse((IndexState.Empty, None))

@@ -619,8 +619,23 @@ class PartKeyLuceneIndex(ref: DatasetRef,
ret
}

private def addIndexedField(labelName: String, value: String): Unit = {
luceneDocument.get().addField(labelName, value)
/**
 * @param clientData pass true if the field data came from the metric source, and false if the field is being
 *                   set internally. This is used to determine whether the _type_ field data should be indexed.
 */
private def addIndexedField(labelName: String, value: String, clientData: Boolean): Unit = {
if (clientData && addMetricTypeField) {
// do not index any existing _type_ tag since this is reserved and should not be passed in by clients
if (labelName != Schemas.TypeLabel)
luceneDocument.get().addField(labelName, value)
else
logger.warn("Map column with name '_type_' is a reserved label. Not indexing it.")
// I would have liked to log the entire PK to debug, but it is not accessible from here.
// Ignoring for now, since the plan of record is to drop reserved labels at ingestion gateway.
} else {
luceneDocument.get().addField(labelName, value)
}
}

def addPartKey(partKeyOnHeapBytes: Array[Byte],
@@ -677,6 +692,10 @@ class PartKeyLuceneIndex(ref: DatasetRef,
// If configured and enabled, Multi-column facets will be created on "partition-schema" columns
createMultiColumnFacets(partKeyOnHeapBytes, partKeyBytesRefOffset)

val schemaName = Schemas.global.schemaName(RecordSchema.schemaID(partKeyOnHeapBytes, UnsafeUtils.arayOffset))
if (addMetricTypeField)
addIndexedField(Schemas.TypeLabel, schemaName, clientData = false)

cforRange { 0 until numPartColumns } { i =>
indexers(i).fromPartKey(partKeyOnHeapBytes, bytesRefToUnsafeOffset(partKeyBytesRefOffset), partId)
}
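Net effect of the PartKeyLuceneIndex changes above: when the flag is on, every part key is indexed with a _type_ field holding its schema name, and any client-supplied _type_ tag is dropped as reserved. Below is a minimal sketch of filtering by type through the existing partIdsFromFilters API (exercised by the new PartKeyLuceneIndexSpec tests later in this diff); partKeyIndex, startMs and endMs are assumed to be in scope, and the schema name is just an example.

import filodb.core.metadata.Schemas
import filodb.core.query.{ColumnFilter, Filter}
import filodb.memory.format.ZeroCopyUTF8String._

// Restrict an index lookup to part keys ingested with the prom-counter schema.
val typeFilter = ColumnFilter(Schemas.TypeLabel, Filter.Equals("prom-counter".utf8))
val matchingPartIds = partKeyIndex.partIdsFromFilters(Seq(typeFilter), startMs, endMs)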
@@ -285,6 +285,7 @@ class TimeSeriesShard(val ref: DatasetRef,
private val indexFacetingEnabledAllLabels = filodbConfig.getBoolean("memstore.index-faceting-enabled-for-all-labels")
private val numParallelFlushes = filodbConfig.getInt("memstore.flush-task-parallelism")
private val disableIndexCaching = filodbConfig.getBoolean("memstore.disable-index-caching")
private val typeFieldIndexingEnabled = filodbConfig.getBoolean("memstore.type-field-indexing-enabled")


/////// END CONFIGURATION FIELDS ///////////////////
@@ -313,7 +314,8 @@ class TimeSeriesShard(val ref: DatasetRef,
*/
private[memstore] final val partKeyIndex: PartKeyIndexRaw = new PartKeyLuceneIndex(ref, schemas.part,
indexFacetingEnabledAllLabels, indexFacetingEnabledShardKeyLabels, shardNum,
storeConfig.diskTTLSeconds * 1000, disableIndexCaching = disableIndexCaching)
storeConfig.diskTTLSeconds * 1000, disableIndexCaching = disableIndexCaching,
addMetricTypeField = typeFieldIndexingEnabled)

private val cardTracker: CardinalityTracker = initCardTracker()

Expand Down Expand Up @@ -1853,7 +1855,7 @@ class TimeSeriesShard(val ref: DatasetRef,
schemas.part.binSchema.toStringPairs(partKey.base, partKey.offset).map(pair => {
pair._1.utf8 -> pair._2.utf8
}).toMap ++
Map("_type_".utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
Map(Schemas.TypeLabel.utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8)
}

/**
2 changes: 2 additions & 0 deletions core/src/main/scala/filodb.core/metadata/Schemas.scala
@@ -372,6 +372,8 @@ object Schemas extends StrictLogging {
import Accumulation._
import Dataset._

val TypeLabel = "_type_"

val _log = logger

val rowKeyIDs = Seq(0) // First or timestamp column is always the row keys
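Centralizing the reserved label name here lets the planner, index and Prometheus layers refer to Schemas.TypeLabel instead of repeating the "_type_" literal, which is what the other hunks in this diff switch to. A tiny illustrative use (not taken from this PR):

import filodb.core.metadata.Schemas

// Drop any client-supplied tag that collides with the reserved type label before indexing.
def dropReservedLabels(tags: Map[String, String]): Map[String, String] =
  tags.filterNot { case (label, _) => label == Schemas.TypeLabel }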
2 changes: 1 addition & 1 deletion core/src/test/resources/application_test.conf
@@ -109,7 +109,7 @@ filodb {
index-faceting-enabled-shard-key-labels = true
index-faceting-enabled-for-all-labels = true
disable-index-caching = false

type-field-indexing-enabled = true
}

tasks {
@@ -97,6 +97,17 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte
val partNums7 = keyIndex.partIdsFromFilters(Seq(filter7), (start + end)/2, end + 1000 )
partNums7 should not equal debox.Buffer.empty[Int]

// tests to validate schema ID filter
val filter8 = Seq( ColumnFilter("Actor2Code", Equals("GOV".utf8)),
ColumnFilter("_type_", Equals("schemaID:46894".utf8)))
val partNums8 = keyIndex.partIdsFromFilters(filter8, start, end)
partNums8 shouldEqual debox.Buffer(7, 8, 9)

val filter9 = Seq( ColumnFilter("Actor2Code", Equals("GOV".utf8)),
ColumnFilter("_type_", Equals("prom-counter".utf8)))
val partNums9 = keyIndex.partIdsFromFilters(filter9, start, end)
partNums9.length shouldEqual 0

}
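// Note on the expected value above: "schemaID:46894" appears to be the fallback string that
// Schemas.global.schemaName returns when a part key's schema ID is not registered in the global schema
// set (as with this spec's test schema); a registered schema would be indexed under its readable name,
// e.g. "prom-counter". Hedged sketch of that lookup (partKeyBytes assumed to be an on-heap part key):
//   val schemaId  = RecordSchema.schemaID(partKeyBytes, UnsafeUtils.arayOffset)
//   val typeValue = Schemas.global.schemaName(schemaId)  // the value indexed under _type_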

it("should fetch part key records from filters correctly") {
@@ -467,7 +478,7 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte

keyIndex.refreshReadersBlocking()

keyIndex.indexNames(10).toList shouldEqual Seq("Actor2Code", "Actor2Name")
keyIndex.indexNames(10).toList shouldEqual Seq("_type_", "Actor2Code", "Actor2Name")
keyIndex.indexValues("not_found").toSeq should equal (Nil)

val infos = Seq("AFR", "CHN", "COP", "CVL", "EGYEDU").map(_.utf8).map(TermInfo(_, 1))
@@ -602,7 +613,7 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte
val labelValues1 = index3.labelNamesEfficient(filters1, 0, Long.MaxValue)
labelValues1.toSet shouldEqual (0 until 5).map(c => s"dynamicLabel$c").toSet ++
(0 until 10).map(c => s"infoLabel$c").toSet ++
Set("_ns_", "_ws_", "_metric_")
Set("_ns_", "_ws_", "_metric_", "_type_")
}

it("should be able to fetch label values efficiently using facets") {
@@ -66,7 +66,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte

memStore.asInstanceOf[TimeSeriesMemStore].refreshIndexForTesting(dataset1.ref)
memStore.numPartitions(dataset1.ref, 0) shouldEqual 10
memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0)))
memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0), ("_type_",0)))
memStore.latestOffset(dataset1.ref, 0) shouldEqual 0

val minSet = rawData.map(_(1).asInstanceOf[Double]).toSet
@@ -202,7 +202,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte
splits should have length (2)

memStore.indexNames(dataset2.ref, 10).toSet should equal (
Set(("n", 0), ("series", 0), ("n", 1), ("series", 1)))
Set(("n", 0), ("series", 0), ("n", 1), ("series", 1), ("_type_",0), ("_type_",1)))

val filter = ColumnFilter("n", Filter.Equals("2".utf8))
val agg1 = memStore.scanRows(dataset2, Seq(1), FilteredPartitionScan(splits.head, Seq(filter)))
@@ -465,7 +465,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte

memStore.refreshIndexForTesting(dataset1.ref)
memStore.numPartitions(dataset1.ref, 0) shouldEqual 10
memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0)))
memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0), ("_type_",0)))

memStore.truncate(dataset1.ref, numShards = 4)

@@ -3,13 +3,14 @@ package filodb.prometheus.ast
import scala.util.Try

import filodb.core.{query, GlobalConfig}
import filodb.core.metadata.Schemas
import filodb.core.query.{ColumnFilter, QueryUtils, RangeParams}
import filodb.prometheus.parse.Parser
import filodb.query._

object Vectors {
val PromMetricLabel = "__name__"
val TypeLabel = "_type_"
val TypeLabel = Schemas.TypeLabel
val BucketFilterLabel = "_bucket_"
val conf = GlobalConfig.systemConfig
val queryConfig = conf.getConfig("filodb.query")
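With TypeLabel now sourced from Schemas.TypeLabel, a selector such as http_req_total{_type_="prom-counter"} should reach the query path as an ordinary equality filter on _type_. A minimal sketch of the equivalent filter list, mirroring the filters built in the MetadataExecSpec changes below (metric, job and schema names are just examples):

import filodb.core.metadata.Schemas
import filodb.core.query.{ColumnFilter, Filter}
import filodb.memory.format.ZeroCopyUTF8String._

// Filters equivalent to the selector http_req_total{job="myCoolService", _type_="prom-counter"}.
val filters = Seq(
  ColumnFilter("_metric_", Filter.Equals("http_req_total".utf8)),
  ColumnFilter("job", Filter.Equals("myCoolService".utf8)),
  ColumnFilter(Schemas.TypeLabel, Filter.Equals(Schemas.promCounter.name.utf8)))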
@@ -4,8 +4,8 @@ import remote.RemoteStorage._

import filodb.core.GlobalConfig
import filodb.core.binaryrecord2.{BinaryRecordRowReader, StringifyMapItemConsumer}
import filodb.core.metadata.{PartitionSchema, Schemas}
import filodb.core.metadata.Column.ColumnType
import filodb.core.metadata.PartitionSchema
import filodb.core.query.{QueryUtils, Result => _, _}
import filodb.prometheus.parse.Parser.REGEX_MAX_LEN
import filodb.query.{QueryResult => FiloQueryResult, _}
@@ -230,7 +230,7 @@ object PrometheusModel {
def makeVerboseLabels(rvk: RangeVectorKey): Map[String, String] = {
Map("_shards_" -> rvk.sourceShards.mkString(","),
"_partIds_" -> rvk.partIds.mkString(","),
"_type_" -> rvk.schemaNames.mkString(","))
Schemas.TypeLabel -> rvk.schemaNames.mkString(","))
}

def toPromErrorResponse(qe: filodb.query.QueryError): ErrorResponse = {
53 changes: 50 additions & 3 deletions query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala
@@ -16,7 +16,7 @@ import filodb.core.binaryrecord2.BinaryRecordRowReader
import filodb.core.memstore.ratelimit.CardinalityStore
import filodb.core.memstore.{FixedMaxPartitionsEvictionPolicy, SomeData, TimeSeriesMemStore}
import filodb.core.metadata.Schemas
import filodb.core.query._
import filodb.core.query.{ColumnFilter, _}
import filodb.core.store.{ChunkSource, InMemoryMetaStore, NullColumnStore}
import filodb.memory.format.{SeqRowReader, ZeroCopyUTF8String}
import filodb.query._
@@ -160,6 +160,52 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B
result shouldEqual jobQueryResult1
}

it ("should read the job names from timeseriesindex matching the columnfilters with _type_ label") {
import ZeroCopyUTF8String._
val filters = Seq(ColumnFilter("_metric_", Filter.Equals("http_req_total".utf8)),
ColumnFilter("_type_", Filter.Equals(Schemas.promCounter.name.utf8)),
ColumnFilter("job", Filter.Equals("myCoolService".utf8)))

val leaves = (0 until shardPartKeyLabelValues.size).map{ ishard =>
LabelValuesExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref,
ishard, filters, Seq("job", "unicode_tag"), now-5000, now)
}

val execPlan = LabelValuesDistConcatExec(QueryContext(), executeDispatcher, leaves)

val resp = execPlan.execute(memStore, querySession).runToFuture.futureValue
val result = (resp: @unchecked) match {
case QueryResult(id, _, response, _, _, _, _) => {
val rv = response(0)
rv.rows.size shouldEqual 1
val record = rv.rows.next().asInstanceOf[BinaryRecordRowReader]
rv.asInstanceOf[SerializedRangeVector].schema.toStringPairs(record.recordBase, record.recordOffset)
}
}
result shouldEqual jobQueryResult1
}

it ("should read the job names from timeseriesindex matching the columnfilters with _type_ label empty result") {
import ZeroCopyUTF8String._
val filters = Seq(ColumnFilter("_metric_", Filter.Equals("http_req_total".utf8)),
ColumnFilter("_type_", Filter.Equals(Schemas.promHistogram.name.utf8)),
ColumnFilter("job", Filter.Equals("myCoolService".utf8)))

val leaves = (0 until shardPartKeyLabelValues.size).map { ishard =>
LabelValuesExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref,
ishard, filters, Seq("job", "unicode_tag"), now - 5000, now)
}

val execPlan = LabelValuesDistConcatExec(QueryContext(), executeDispatcher, leaves)

val resp = execPlan.execute(memStore, querySession).runToFuture.futureValue
(resp: @unchecked) match {
case QueryResult(id, _, response, _, _, _, _) => {
response.isEmpty shouldEqual true
}
}
}

it("should not return any rows for wrong column filters") {
import ZeroCopyUTF8String._
val filters = Seq (ColumnFilter("__name__", Filter.Equals("http_req_total1".utf8)),
@@ -276,7 +322,7 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B
}

it ("should be able to query labels with filter") {
val expectedLabels = Set("job", "_metric_", "unicode_tag", "instance", "_ws_", "_ns_")
val expectedLabels = Set("job", "_metric_", "unicode_tag", "instance", "_ws_", "_ns_", "_type_")
val filters = Seq (ColumnFilter("job", Filter.Equals("myCoolService".utf8)))

val leaves = (0 until shardPartKeyLabelValues.size).map{ ishard =>
@@ -355,7 +401,8 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B
"job" -> "1",
"instance" -> "2",
"_metric_" -> "1",
"_ws_" -> "1")
"_ws_" -> "1",
"_type_" -> "1")
}
}
