From a69819f0959f35a2627c79f846250dada1027809 Mon Sep 17 00:00:00 2001 From: Amol Nayak Date: Thu, 1 Feb 2024 14:18:16 -0800 Subject: [PATCH] perf(query) Option to disable Lucene caching --- core/src/main/resources/filodb-defaults.conf | 4 +++ .../memstore/PartKeyLuceneIndex.scala | 24 +++++++++++++-- .../memstore/TimeSeriesShard.scala | 4 ++- core/src/test/resources/application_test.conf | 1 + .../memstore/PartKeyLuceneIndexSpec.scala | 29 +++++++++++++++++++ 5 files changed, 59 insertions(+), 3 deletions(-) diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf index 366077dee2..f9c642ffa1 100644 --- a/core/src/main/resources/filodb-defaults.conf +++ b/core/src/main/resources/filodb-defaults.conf @@ -751,6 +751,10 @@ filodb { # If all of the index-faceting-enabled-* properties are false, faceting is fully disabled. # Disable if performance cost of faceting all labels is too high index-faceting-enabled-for-all-labels = true + + # Whether to disable caching on the index. The underlying Lucene index uses an LRU cache, enabled by default; + # this flag lets us disable that caching. + disable-index-caching = false } # for standalone worker cluster configuration, see akka-bootstrapper diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala index bc2da9a13e..de11765b95 100644 --- a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala +++ b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala @@ -133,7 +133,8 @@ class PartKeyLuceneIndex(ref: DatasetRef, retentionMillis: Long, // only used to calculate fallback startTime diskLocation: Option[File] = None, val lifecycleManager: Option[IndexMetadataStore] = None, - useMemoryMappedImpl: Boolean = true + useMemoryMappedImpl: Boolean = true, + disableIndexCaching: Boolean = false ) extends StrictLogging { import PartKeyLuceneIndex._ @@ -240,7 +241,26 @@ class 
PartKeyLuceneIndex(ref: DatasetRef, private val utf8ToStrCache = concurrentCache[UTF8Str, String](PartKeyLuceneIndex.MAX_STR_INTERN_ENTRIES) //scalastyle:off - private val searcherManager = new SearcherManager(indexWriter, null) + private val searcherManager = + if (disableIndexCaching) { + new SearcherManager(indexWriter, + new SearcherFactory() { + override def newSearcher(reader: IndexReader, previousReader: IndexReader): IndexSearcher = { + val indexSearcher = super.newSearcher(reader, previousReader) + indexSearcher.setQueryCache(null) + indexSearcher.setQueryCachingPolicy(new QueryCachingPolicy() { + override def onUse(query: Query): Unit = { + + } + + override def shouldCache(query: Query): Boolean = false + }) + indexSearcher + } + }) + } else { + new SearcherManager(indexWriter, null) + } //scalastyle:on //start this thread to flush the segments and refresh the searcher every specific time period diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala index 67c32906dd..86cb037f11 100644 --- a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala @@ -284,6 +284,8 @@ class TimeSeriesShard(val ref: DatasetRef, filodbConfig.getBoolean("memstore.index-faceting-enabled-shard-key-labels") private val indexFacetingEnabledAllLabels = filodbConfig.getBoolean("memstore.index-faceting-enabled-for-all-labels") private val numParallelFlushes = filodbConfig.getInt("memstore.flush-task-parallelism") + private val disableIndexCaching = filodbConfig.getBoolean("memstore.disable-index-caching") + /////// END CONFIGURATION FIELDS /////////////////// @@ -311,7 +313,7 @@ class TimeSeriesShard(val ref: DatasetRef, */ private[memstore] final val partKeyIndex = new PartKeyLuceneIndex(ref, schemas.part, indexFacetingEnabledAllLabels, indexFacetingEnabledShardKeyLabels, shardNum, - storeConfig.diskTTLSeconds * 1000) + 
storeConfig.diskTTLSeconds * 1000, disableIndexCaching = disableIndexCaching) private val cardTracker: CardinalityTracker = initCardTracker() diff --git a/core/src/test/resources/application_test.conf b/core/src/test/resources/application_test.conf index 4788408410..b89390ba3d 100644 --- a/core/src/test/resources/application_test.conf +++ b/core/src/test/resources/application_test.conf @@ -106,6 +106,7 @@ filodb { track-queries-holding-eviction-lock = false index-faceting-enabled-shard-key-labels = true index-faceting-enabled-for-all-labels = true + disable-index-caching = false } diff --git a/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala b/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala index 5438099722..3d17c37038 100644 --- a/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala +++ b/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala @@ -117,6 +117,35 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte result.map( p => (p.startTime, p.endTime)) shouldEqual expected.map( p => (p.startTime, p.endTime)) } + + it("should fetch part key records from filters correctly with index caching disabled") { + // Add the first ten keys and row numbers + val keyIndexNoCache = + new PartKeyLuceneIndex(dataset6.ref, dataset6.schema.partition, + true, + true, + 0, + 1.hour.toMillis, + disableIndexCaching = true) + val pkrs = partKeyFromRecords(dataset6, records(dataset6, readers.take(10)), Some(partBuilder)) + .zipWithIndex.map { case (addr, i) => + val pk = partKeyOnHeap(dataset6.partKeySchema, ZeroPointer, addr) + keyIndexNoCache.addPartKey(pk, i, i, i + 10)() + PartKeyLuceneIndexRecord(pk, i, i + 10) + } + keyIndexNoCache.refreshReadersBlocking() + + val filter2 = ColumnFilter("Actor2Code", Equals("GOV".utf8)) + Range(1, 100).foreach(_ => { + val result = keyIndexNoCache.partKeyRecordsFromFilters(Seq(filter2), 0, Long.MaxValue) + val expected = Seq(pkrs(7), pkrs(8), 
pkrs(9)) + + result.map(_.partKey.toSeq) shouldEqual expected.map(_.partKey.toSeq) + result.map(p => (p.startTime, p.endTime)) shouldEqual expected.map(p => (p.startTime, p.endTime)) + }) + + } + it("should fetch only two part key records from filters") { // Add the first ten keys and row numbers val pkrs = partKeyFromRecords(dataset6, records(dataset6, readers.take(10)), Some(partBuilder))