Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(card): Adding config support for DS card flushCount and perf logs for cardinality calculation time #1666

Merged
merged 2 commits into from
Sep 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ class CardinalityManager(datasetRef: DatasetRef,
// physical resources for duplicate calculation
private var isCardTriggered: Boolean = false

// number of partKeys to aggregate in memory at any given time, before we write the cardinality information to
// rocksDB. This is done to reduce the time taken to calculate cardinality of partitions with high number of
// TS by reducing the number writes issued to RocksDB ( From our profiling, high number of RocksDB writes lead to
// performance bottlenecks)
// For cardFlushCount = 500,000, we conservatively expect to consume - 128 bytes * 500,000 = ~64 MB
private val cardFlushCount: Option[Int] =
if (filodbConfig.hasPath("card-flush-count")) {
Some(filodbConfig.getInt("card-flush-count"))
} else None

/**
* `dataset-configs` is an string array where each string is a file path for a dataset config. This function reads
Expand Down Expand Up @@ -132,9 +141,6 @@ class CardinalityManager(datasetRef: DatasetRef,

/**
* Triggers cardinalityCount if metering is enabled and the required criteria matches.
* It creates a new instance of CardinalityTracker and uses the PartKeyLuceneIndex to calculate cardinality count
* and store data in a local CardinalityStore. We then close the previous instance of CardinalityTracker and switch
* it with the new one we created in this call.
* @param indexRefreshCount The number of time the indexRefresh has already happened. This is used in the logic of
* shouldTriggerCardinalityCount
*/
Expand All @@ -143,34 +149,7 @@ class CardinalityManager(datasetRef: DatasetRef,
try {
if (shouldTriggerCardinalityCount(shardNum, numShardsPerNode, indexRefreshCount)) {
isCardTriggered = true
val newCardTracker = getNewCardTracker()
var cardCalculationComplete = false
try {
partKeyIndex.calculateCardinality(partSchema, newCardTracker)
cardCalculationComplete = true
} catch {
case ex: Exception =>
logger.error(s"[CardinalityManager]Error while calculating cardinality using" +
s" PartKeyLuceneIndex! shardNum=$shardNum indexRefreshCount=$indexRefreshCount", ex)
// cleanup resources used by the newCardTracker tracker to avoid leaking of resources
newCardTracker.close()
}
if (cardCalculationComplete) {
try {
// close the cardinality store and release the physical resources of the current cardinality store
close()
cardTracker = Some(newCardTracker)
logger.info(s"[CardinalityManager] Triggered cardinality count successfully for" +
s" shardNum=$shardNum indexRefreshCount=$indexRefreshCount")
} catch {
case ex: Exception =>
// Very unlikely scenario, but can happen if the disk call fails.
logger.error(s"[CardinalityManager]Error closing card tracker! shardNum=$shardNum", ex)
// setting cardTracker to None in this case, since the exception happened on the close. We
// can't rely on the card store. The next trigger should re-build the card store and card tracker
cardTracker = None
}
}
createNewCardinalityTrackerAndCalculate(indexRefreshCount)
sandeep6189 marked this conversation as resolved.
Show resolved Hide resolved
isCardTriggered = false
}
else {
Expand All @@ -189,6 +168,50 @@ class CardinalityManager(datasetRef: DatasetRef,
}
}

/**
* Creates a new instance of CardinalityTracker and uses the PartKeyLuceneIndex to calculate cardinality count
* and store data in a local CardinalityStore. We then close the previous instance of CardinalityTracker and switch
* it with the new one we created in this call.
*
* @param indexRefreshCount The number of time the indexRefresh has already happened. This is used in the logic of
* shouldTriggerCardinalityCount
*/
private def createNewCardinalityTrackerAndCalculate(indexRefreshCount: Int): Unit = {
val newCardTracker = getNewCardTracker()
var cardCalculationComplete = false
val startTimeMs = System.currentTimeMillis()
try {
logger.info(s"[CardinalityManager]Triggering cardinality count for shardNum=$shardNum " +
s"indexRefreshCount=$indexRefreshCount")
partKeyIndex.calculateCardinality(partSchema, newCardTracker)
cardCalculationComplete = true
} catch {
case ex: Exception =>
logger.error(s"[CardinalityManager]Error while calculating cardinality using" +
s" PartKeyLuceneIndex! shardNum=$shardNum indexRefreshCount=$indexRefreshCount", ex)
// cleanup resources used by the newCardTracker tracker to avoid leaking of resources
newCardTracker.close()
}
if (cardCalculationComplete) {
try {
// close and release the physical resources of the outdated/previous cardinality store
close()
// reassign the cardTracker with the newly created CardinalityTracker object
cardTracker = Some(newCardTracker)
val timeTakenInSeconds = ((System.currentTimeMillis() - startTimeMs)/1000.0)
logger.info(s"[CardinalityManager] Triggered cardinality count successfully for" +
s" shardNum=$shardNum indexRefreshCount=$indexRefreshCount timeTakenInSeconds=$timeTakenInSeconds")
} catch {
case ex: Exception =>
// Very unlikely scenario, but can happen if the disk call fails.
logger.error(s"[CardinalityManager]Error closing card tracker! shardNum=$shardNum", ex)
// setting cardTracker to None in this case, since the exception happened on the close. We
// can't rely on the card store. The next trigger should re-build the card store and card tracker
cardTracker = None
}
}
}

/**
* Helper method to create a CardinalityTracker instance
*
Expand All @@ -197,7 +220,9 @@ class CardinalityManager(datasetRef: DatasetRef,
private def getNewCardTracker(): CardinalityTracker = {
val cardStore = new RocksDbCardinalityStore(datasetRef, shardNum)
val defaultQuota = quotaSource.getDefaults(datasetRef)
val tracker = new CardinalityTracker(datasetRef, shardNum, shardKeyLen, defaultQuota, cardStore)
logger.info(s"[CardinalityManager] Creating new CardinalityTracker with flushCount=$cardFlushCount")
val tracker = new CardinalityTracker(datasetRef, shardNum, shardKeyLen, defaultQuota, cardStore,
flushCount = cardFlushCount)
quotaSource.getQuotas(datasetRef).foreach { q =>
tracker.setQuota(q.shardKeyPrefix, q.quota)
}
Expand Down
Loading