feat(core) Persistent index checkpointing (#1459)
* WIP

* WIP

* PR Review comments
amolnayak311 authored Oct 11, 2022
1 parent 76181f3 commit ef61499
Showing 8 changed files with 648 additions and 28 deletions.
2 changes: 1 addition & 1 deletion conf/timeseries-durable-downsample-index-dev-source.conf
@@ -109,7 +109,7 @@
# Raw schemas from which to downsample
raw-schema-names = [ "gauge", "untyped", "prom-counter", "prom-histogram"]

index-location = "target/tmp/downsample-index"
index-location = "/tmp/downsample-index"
index-metastore-implementation = "file"
}
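
For orientation, a minimal editor's sketch (not part of the commit) of how these keys are consumed, shown with the plain Typesafe Config API rather than FiloDB's enriched getOrElse. The object name and the "none" fallback are assumptions; max-refresh-hours is the key added to DownsampleConfig further down.

    import com.typesafe.config.Config

    // Editor's sketch: resolve the persistent-index settings this commit relies on.
    object IndexCheckpointSettings {
      def from(config: Config): (Option[String], String, Long) = {
        // index-location: presence of this path is what enables persistent indexing
        val indexLocation =
          if (config.hasPath("index-location")) Some(config.getString("index-location")) else None
        // index-metastore-implementation: "file", "ephemeral", or absent (no checkpointing)
        val metastoreImpl =
          if (config.hasPath("index-metastore-implementation")) config.getString("index-metastore-implementation")
          else "none"
        // max-refresh-hours: bound used by the file-based metastore; default means "no bound"
        val maxRefreshHours =
          if (config.hasPath("max-refresh-hours")) config.getLong("max-refresh-hours") else Long.MaxValue
        (indexLocation, metastoreImpl, maxRefreshHours)
      }
    }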

2 changes: 2 additions & 0 deletions conf/timeseries-filodb-server-ds.conf
@@ -1,6 +1,8 @@
include "timeseries-filodb-server.conf"
dataset-prometheus = { include required("timeseries-durable-downsample-index-dev-source.conf") }

filodb {
v2-cluster-enabled = false
http.bind-port=9080
store-factory = "filodb.cassandra.DownsampledTSStoreFactory"
}
@@ -34,6 +34,8 @@ final case class DownsampleConfig(config: Config) {
else Seq.empty
val indexLocation = config.getOrElse[Option[String]]("index-location", None)

+ val maxRefreshHours = config.getOrElse("max-refresh-hours", Long.MaxValue)

val enablePersistentIndexing = indexLocation.isDefined

val indexMetastoreImplementation = if (config.hasPath("index-metastore-implementation")) {
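maxRefreshHours defaults to Long.MaxValue, i.e. unbounded, and is handed to FileSystemBasedIndexMetadataStore in the next hunk. A plausible reading, assumed here rather than confirmed by the diff, is that it bounds how stale a checkpoint may be before an incremental refresh is abandoned in favor of a full rebuild; a sketch of such a guard:

    import java.util.concurrent.TimeUnit

    // Editor's sketch of an assumed max-refresh-hours guard; names and semantics are
    // illustrative, not the commit's actual FileSystemBasedIndexMetadataStore logic.
    object RefreshGuard {
      // True when an incremental refresh from checkpointMillis is still viable
      def canRefreshIncrementally(checkpointMillis: Long, nowMillis: Long, maxRefreshHours: Long): Boolean =
        TimeUnit.MILLISECONDS.toHours(nowMillis - checkpointMillis) <= maxRefreshHours
    }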
@@ -73,15 +73,17 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,

private val indexMetadataStore : Option[IndexMetadataStore] = {
downsampleConfig.indexMetastoreImplementation match {
- case IndexMetastoreImplementation.NoImp => None
- case IndexMetastoreImplementation.File => Some(
- new FileCheckpointedIndexMetadataStore(downsampleConfig.indexLocation.get)
- )
- case IndexMetastoreImplementation.Ephemeral => Some(new EphemeralIndexMetadataStore())
+ case IndexMetastoreImplementation.NoImp => None
+ case IndexMetastoreImplementation.File =>
+ Some(new FileSystemBasedIndexMetadataStore(downsampleConfig.indexLocation.get,
+ FileSystemBasedIndexMetadataStore.expectedVersion(FileSystemBasedIndexMetadataStore.expectedGenerationEnv),
+ downsampleConfig.maxRefreshHours))
+ case IndexMetastoreImplementation.Ephemeral => Some(new EphemeralIndexMetadataStore())
}
}

- private val partKeyIndex = new PartKeyLuceneIndex(indexDataset, schemas.part, false,
+
+ private val partKeyIndex = new PartKeyLuceneIndex(indexDataset, schemas.part, false,
false, shardNum, indexTtlMs,
downsampleConfig.indexLocation.map(new java.io.File(_)),
indexMetadataStore
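
The rewritten match swaps FileCheckpointedIndexMetadataStore for FileSystemBasedIndexMetadataStore, which additionally takes an expected index version (judging by the name expectedGenerationEnv, read from an environment variable so operators can force a rebuild by bumping a generation number; an inference from the names alone) and the maxRefreshHours bound. A self-contained editor's sketch of the contract the three implementations plausibly share; only the state names Empty, Synced and TriggerRebuild come from the diff, the member signatures are assumptions:

    // Editor's sketch of an assumed index-metadata-store contract.
    sealed trait IndexStateSketch
    object IndexStateSketch {
      case object Empty extends IndexStateSketch          // no usable index state recorded
      case object Synced extends IndexStateSketch         // index valid up to a checkpoint time
      case object TriggerRebuild extends IndexStateSketch // version/operator forced rebuild
    }

    trait IndexMetadataStoreSketch {
      // Last known state plus the checkpoint millis it was recorded at, if any
      def currentState(shardNum: Int): (IndexStateSketch, Option[Long])
      // Persist a state/checkpoint pair, e.g. (Synced, hourMillis) after a refresh
      def updateState(shardNum: Int, state: IndexStateSketch, timeMs: Long): Unit
    }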
@@ -151,8 +153,9 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,
def recoverIndex(): Future[Long] = {
if (downsampleConfig.enablePersistentIndexing) {
partKeyIndex.getCurrentIndexState() match {
- case (IndexState.Empty, _) =>
- logger.info("Found index state empty, bootstrapping downsample index")
+ case (IndexState.Empty, _) |
+ (IndexState.TriggerRebuild, _) =>
+ logger.info("Found index state empty/rebuild triggered, bootstrapping downsample index")
recoverIndexInternal(None)
case (IndexState.Synced, checkpointMillis) =>
logger.warn(s"Found index state synced, bootstrapping downsample index from time(ms) $checkpointMillis")
@@ -170,29 +173,22 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,

private def recoverIndexInternal(checkpointMillis: Option[Long]): Future[Long] = {
// By passing -1 for partId, numeric partId will not be persisted in the index
+ // Since we are recovering, always start from last synced time and update till current timestamp.
+ val endHour = hour()
+
(checkpointMillis match {
case Some(tsMillis) => // We know we have to refresh only since this hour; it is possible we do not
// find any data for this refresh as the index is already updated
- val (startHour, endHour) = (hour(tsMillis), hour())
+ val startHour = hour(tsMillis) - 1
// do not notify listener as the map operation will be updating the state
- indexRefresh(endHour, startHour, notifyListener = false)
+ indexRefresh(endHour, startHour, periodicRefresh = false)
case None => // No checkpoint time found, start refresh from scratch
indexBootstrapper
.bootstrapIndexDownsample(
partKeyIndex, shardNum, indexDataset, indexTtlMs)
}).map { count =>
logger.info(s"Bootstrapped index for dataset=$indexDataset shard=$shardNum with $count records")
- // need to start recovering 6 hours prior to now since last index migration could have run 6 hours ago
- // and we'd be missing entries that would be migrated in the last 6 hours.
- // Hence indexUpdatedHour should be: currentHour - 6
val indexJobIntervalInHours = (downsampleStoreConfig.maxChunkTime.toMinutes + 59) / 60 // for ceil division
+ // the checkpoint updated here should be at least what we had previously; subsequently, the refresh thread will
+ // take care of periodically updating this last synced value
+ val endHour = checkpointMillis.getOrElse(0L).max(hour() - indexJobIntervalInHours - 1)
indexUpdatedHour.set(endHour)
+ // The time set here in Synced state should match the time we set in indexUpdatedHour.
+ // It is possible that when we sync from a last timestamp, we don't update any data in the index; as such,
+ // this synced time is the pessimistic time to ensure no data synced to the index is lost
partKeyIndex.notifyLifecycleListener(IndexState.Synced, endHour * 3600 * 1000L)
stats.shardTotalRecoveryTime.update(System.currentTimeMillis() - creationTime)
startHousekeepingTask()
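
Two pieces of arithmetic above are worth unpacking. Given the toHour * 3600 * 1000L conversion used for the listener, hour() is evidently epoch-millis divided into hours, so startHour = hour(tsMillis) - 1 re-reads one extra hour before the checkpoint, presumably so a partially-synced hour is not skipped; and (toMinutes + 59) / 60 is ceiling division, rounding the max chunk time up to whole hours. A worked editor's example under those assumptions:

    // Editor's worked example; hour() assumed to be epoch-millis / 3600000,
    // consistent with the `toHour * 3600 * 1000L` conversion above.
    object HourMath extends App {
      def hour(millis: Long): Long = millis / 3600000L

      val checkpointMillis = 1665500000000L       // an arbitrary checkpoint time
      val startHour = hour(checkpointMillis) - 1  // one hour of overlap before the checkpoint
      println(s"refresh from epoch-hour $startHour")

      // ceiling division: a 6h10m (370 min) max chunk time counts as 7 whole hours
      val indexJobIntervalInHours = (370L + 59) / 60
      println(s"indexJobIntervalInHours = $indexJobIntervalInHours") // prints 7
    }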
@@ -252,14 +248,22 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,
indexRefresh(toHour, fromHour)
}

- def indexRefresh(toHour: Long, fromHour: Long, notifyListener: Boolean = true): Task[Long] = {
+ /**
+ *
+ * @param toHour The end hour of the refresh
+ * @param fromHour The start hour of the refresh
+ * @param periodicRefresh boolean flag indicating whether this is a periodic refresh or one invoked as part of
+ * index bootstrap.
+ * @return a Task with the number of index entries refreshed
+ */
+ def indexRefresh(toHour: Long, fromHour: Long, periodicRefresh: Boolean = true): Task[Long] = {
indexRefresher.refreshWithDownsamplePartKeys(
partKeyIndex, shardNum, rawDatasetRef, fromHour, toHour, schemas)
.map { count =>
stats.indexEntriesRefreshed.increment(count)
logger.info(s"Refreshed downsample index with new records numRecords=$count " +
s"dataset=$rawDatasetRef shard=$shardNum fromHour=$fromHour toHour=$toHour")
- if(notifyListener) {
+ if(periodicRefresh) {
indexUpdatedHour.set(toHour)
partKeyIndex.notifyLifecycleListener(IndexState.Synced, toHour * 3600 * 1000L)
}
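
The rename from notifyListener to periodicRefresh makes the two call sites explicit: periodic refreshes advance indexUpdatedHour and record Synced, while the bootstrap-time catch-up passes false because recoverIndexInternal records the state itself afterwards. An editor's sketch of the two patterns, with indexRefresh passed in as a function so the sketch stays self-contained:

    import monix.eval.Task

    // Editor's sketch of the two ways indexRefresh is invoked above.
    def refreshCalls(indexRefresh: (Long, Long, Boolean) => Task[Long],
                     lastSyncedHour: Long, currentHour: Long): (Task[Long], Task[Long]) = {
      // periodic path: advances indexUpdatedHour and notifies Synced when done
      val periodic = indexRefresh(currentHour, lastSyncedHour + 1, true)
      // bootstrap catch-up: recoverIndexInternal records the Synced state afterwards
      val bootstrapCatchUp = indexRefresh(currentHour, lastSyncedHour, false)
      (periodic, bootstrapCatchUp)
    }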
@@ -59,6 +59,7 @@ class IndexBootstrapper(colStore: ColumnStore) {
ref: DatasetRef,
ttlMs: Long): Task[Long] = {

+ val startCheckpoint = System.currentTimeMillis()
val recoverIndexLatency = Kamon.gauge("shard-recover-index-latency", MeasurementUnit.time.milliseconds)
.withTag("dataset", ref.dataset)
.withTag("shard", shardNum)
@@ -80,7 +81,7 @@
// Note that we do not set an end time for the Synced here, instead
// we will do it from DownsampleTimeSeriesShard
index.refreshReadersBlocking()
- recoverIndexLatency.update(System.currentTimeMillis() - start)
+ recoverIndexLatency.update(System.currentTimeMillis() - startCheckpoint)
count
}
}
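
The change here is small but meaningful: the gauge was previously updated against a start value that, by the look of the fix, was not captured at the entry of this method, so the recorded recovery latency could be wrong; startCheckpoint pins the measurement to the start of the bootstrap. The general measure-at-entry pattern, as an editor's sketch using the same Kamon calls as above:

    import kamon.Kamon
    import kamon.metric.MeasurementUnit

    // Editor's sketch of the measure-at-entry pattern the fix restores.
    def timedRecovery[T](dataset: String, shardNum: Int)(work: => T): T = {
      val startCheckpoint = System.currentTimeMillis() // capture before any work runs
      val gauge = Kamon.gauge("shard-recover-index-latency", MeasurementUnit.time.milliseconds)
        .withTag("dataset", dataset)
        .withTag("shard", shardNum)
      val result = work
      gauge.update(System.currentTimeMillis() - startCheckpoint) // elapsed wall-clock ms
      result
    }

Wrapping the bootstrap in a helper like this would make the original baseline mix-up harder to reintroduce.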
