Skip to content

Commit

Permalink
perf(query) Option to disable Lucene caching (#1709)
Browse files Browse the repository at this point in the history
  • Loading branch information
amolnayak311 authored Feb 2, 2024
1 parent 63e7dfb commit 618eae0
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 3 deletions.
4 changes: 4 additions & 0 deletions core/src/main/resources/filodb-defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,10 @@ filodb {
# If all of the index-faceting-enabled-* properties are false, faceting is fully disabled.
# Disable if performance cost of faceting all labels is too high
index-faceting-enabled-for-all-labels = true

# Whether query caching on the index is disabled. The underlying Lucene index uses an LRU query
# cache which is enabled by default; setting this flag to true disables that caching.
disable-index-caching = false
}

# for standalone worker cluster configuration, see akka-bootstrapper
Expand Down
24 changes: 22 additions & 2 deletions core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ class PartKeyLuceneIndex(ref: DatasetRef,
retentionMillis: Long, // only used to calculate fallback startTime
diskLocation: Option[File] = None,
val lifecycleManager: Option[IndexMetadataStore] = None,
useMemoryMappedImpl: Boolean = true
useMemoryMappedImpl: Boolean = true,
disableIndexCaching: Boolean = false
) extends StrictLogging {

import PartKeyLuceneIndex._
Expand Down Expand Up @@ -240,7 +241,26 @@ class PartKeyLuceneIndex(ref: DatasetRef,
private val utf8ToStrCache = concurrentCache[UTF8Str, String](PartKeyLuceneIndex.MAX_STR_INTERN_ENTRIES)

//scalastyle:off
private val searcherManager = new SearcherManager(indexWriter, null)
// Manages reopening/acquiring IndexSearchers over the shared IndexWriter. When index caching is
// disabled, every new searcher is configured with no query cache and a policy that never caches,
// so each query is evaluated directly against the Lucene segments instead of the default LRU cache.
private val searcherManager =
  if (disableIndexCaching) {
    // Factory that strips query caching off every searcher it produces.
    val uncachedSearcherFactory = new SearcherFactory() {
      override def newSearcher(reader: IndexReader, previousReader: IndexReader): IndexSearcher = {
        val searcher = super.newSearcher(reader, previousReader)
        searcher.setQueryCache(null)
        searcher.setQueryCachingPolicy(new QueryCachingPolicy() {
          // No bookkeeping needed since nothing is ever cached.
          override def onUse(query: Query): Unit = ()
          override def shouldCache(query: Query): Boolean = false
        })
        searcher
      }
    }
    new SearcherManager(indexWriter, uncachedSearcherFactory)
  } else {
    // Default behavior: null factory means Lucene's standard searcher (LRU query cache enabled).
    new SearcherManager(indexWriter, null)
  }
//scalastyle:on

//start this thread to flush the segments and refresh the searcher every specific time period
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ class TimeSeriesShard(val ref: DatasetRef,
filodbConfig.getBoolean("memstore.index-faceting-enabled-shard-key-labels")
private val indexFacetingEnabledAllLabels = filodbConfig.getBoolean("memstore.index-faceting-enabled-for-all-labels")
private val numParallelFlushes = filodbConfig.getInt("memstore.flush-task-parallelism")
private val disableIndexCaching = filodbConfig.getBoolean("memstore.disable-index-caching")


/////// END CONFIGURATION FIELDS ///////////////////

Expand Down Expand Up @@ -311,7 +313,7 @@ class TimeSeriesShard(val ref: DatasetRef,
*/
private[memstore] final val partKeyIndex = new PartKeyLuceneIndex(ref, schemas.part,
indexFacetingEnabledAllLabels, indexFacetingEnabledShardKeyLabels, shardNum,
storeConfig.diskTTLSeconds * 1000)
storeConfig.diskTTLSeconds * 1000, disableIndexCaching = disableIndexCaching)

private val cardTracker: CardinalityTracker = initCardTracker()

Expand Down
1 change: 1 addition & 0 deletions core/src/test/resources/application_test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ filodb {
track-queries-holding-eviction-lock = false
index-faceting-enabled-shard-key-labels = true
index-faceting-enabled-for-all-labels = true
disable-index-caching = false

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,35 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte
result.map( p => (p.startTime, p.endTime)) shouldEqual expected.map( p => (p.startTime, p.endTime))
}


it("should fetch part key records from filters correctly with index caching disabled") {
  // Build a dedicated index with Lucene query caching turned off and load the first ten part keys.
  val uncachedIndex =
    new PartKeyLuceneIndex(dataset6.ref, dataset6.schema.partition,
      true,
      true,
      0,
      1.hour.toMillis,
      disableIndexCaching = true)
  val loadedRecords = partKeyFromRecords(dataset6, records(dataset6, readers.take(10)), Some(partBuilder))
    .zipWithIndex.map { case (addr, i) =>
      val pk = partKeyOnHeap(dataset6.partKeySchema, ZeroPointer, addr)
      uncachedIndex.addPartKey(pk, i, i, i + 10)()
      PartKeyLuceneIndexRecord(pk, i, i + 10)
    }
  uncachedIndex.refreshReadersBlocking()

  val govFilter = ColumnFilter("Actor2Code", Equals("GOV".utf8))
  // Run the identical query many times: with caching disabled every pass must hit the index
  // directly and still yield the same three matching records.
  (1 until 100).foreach { _ =>
    val result = uncachedIndex.partKeyRecordsFromFilters(Seq(govFilter), 0, Long.MaxValue)
    val expected = Seq(loadedRecords(7), loadedRecords(8), loadedRecords(9))

    result.map(_.partKey.toSeq) shouldEqual expected.map(_.partKey.toSeq)
    result.map(r => (r.startTime, r.endTime)) shouldEqual expected.map(r => (r.startTime, r.endTime))
  }

}

it("should fetch only two part key records from filters") {
// Add the first ten keys and row numbers
val pkrs = partKeyFromRecords(dataset6, records(dataset6, readers.take(10)), Some(partBuilder))
Expand Down

0 comments on commit 618eae0

Please sign in to comment.