From ef614991858042bd627c4b9dfe9a5beda9b39b42 Mon Sep 17 00:00:00 2001 From: Amol Nayak Date: Tue, 11 Oct 2022 15:19:00 -0700 Subject: [PATCH] feat(core) Persistent index checkpointing (#1459) * WIP * WIP * PR Review comments --- ...s-durable-downsample-index-dev-source.conf | 2 +- conf/timeseries-filodb-server-ds.conf | 2 + .../downsample/DownsampleConfig.scala | 2 + .../DownsampledTimeSeriesShard.scala | 48 +-- .../memstore/IndexBootstrapper.scala | 3 +- .../memstore/IndexMetadataStore.scala | 266 ++++++++++++- .../memstore/PartKeyLuceneIndex.scala | 2 +- ...ileSystemBasedIndexMetadataStoreSpec.scala | 351 ++++++++++++++++++ 8 files changed, 648 insertions(+), 28 deletions(-) create mode 100644 core/src/test/scala/filodb.core/memstore/FileSystemBasedIndexMetadataStoreSpec.scala diff --git a/conf/timeseries-durable-downsample-index-dev-source.conf b/conf/timeseries-durable-downsample-index-dev-source.conf index b25872b806..3349f4d602 100644 --- a/conf/timeseries-durable-downsample-index-dev-source.conf +++ b/conf/timeseries-durable-downsample-index-dev-source.conf @@ -109,7 +109,7 @@ # Raw schemas from which to downsample raw-schema-names = [ "gauge", "untyped", "prom-counter", "prom-histogram"] - index-location = "target/tmp/downsample-index" + index-location = "/tmp/downsample-index" index-metastore-implementation = "file" } diff --git a/conf/timeseries-filodb-server-ds.conf b/conf/timeseries-filodb-server-ds.conf index f93f409272..80deb7be89 100644 --- a/conf/timeseries-filodb-server-ds.conf +++ b/conf/timeseries-filodb-server-ds.conf @@ -1,6 +1,8 @@ include "timeseries-filodb-server.conf" +dataset-prometheus = { include required("timeseries-durable-downsample-index-dev-source.conf") } filodb { + v2-cluster-enabled = false http.bind-port=9080 store-factory = "filodb.cassandra.DownsampledTSStoreFactory" } diff --git a/core/src/main/scala/filodb.core/downsample/DownsampleConfig.scala b/core/src/main/scala/filodb.core/downsample/DownsampleConfig.scala index 4457d2dd52..4f5cea9d03 100644 --- a/core/src/main/scala/filodb.core/downsample/DownsampleConfig.scala +++ b/core/src/main/scala/filodb.core/downsample/DownsampleConfig.scala @@ -34,6 +34,8 @@ final case class DownsampleConfig(config: Config) { else Seq.empty val indexLocation = config.getOrElse[Option[String]]("index-location", None) + val maxRefreshHours = config.getOrElse("max-refresh-hours", Long.MaxValue) + val enablePersistentIndexing = indexLocation.isDefined val indexMetastoreImplementation = if (config.hasPath("index-metastore-implementation")) { diff --git a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala index 9f7a702179..6ddb91c610 100644 --- a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala @@ -73,15 +73,17 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef, private val indexMetadataStore : Option[IndexMetadataStore] = { downsampleConfig.indexMetastoreImplementation match { - case IndexMetastoreImplementation.NoImp => None - case IndexMetastoreImplementation.File => Some( - new FileCheckpointedIndexMetadataStore(downsampleConfig.indexLocation.get) - ) - case IndexMetastoreImplementation.Ephemeral => Some(new EphemeralIndexMetadataStore()) + case IndexMetastoreImplementation.NoImp => None + case IndexMetastoreImplementation.File => + Some(new FileSystemBasedIndexMetadataStore(downsampleConfig.indexLocation.get, + FileSystemBasedIndexMetadataStore.expectedVersion(FileSystemBasedIndexMetadataStore.expectedGenerationEnv), + downsampleConfig.maxRefreshHours)) + case IndexMetastoreImplementation.Ephemeral => Some(new EphemeralIndexMetadataStore()) } } - private val partKeyIndex = new PartKeyLuceneIndex(indexDataset, schemas.part, false, + +private val partKeyIndex = new PartKeyLuceneIndex(indexDataset, schemas.part, false, false, shardNum, indexTtlMs, downsampleConfig.indexLocation.map(new java.io.File(_)), indexMetadataStore @@ -151,8 +153,9 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef, def recoverIndex(): Future[Long] = { if (downsampleConfig.enablePersistentIndexing) { partKeyIndex.getCurrentIndexState() match { - case (IndexState.Empty, _) => - logger.info("Found index state empty, bootstrapping downsample index") + case (IndexState.Empty, _) | + (IndexState.TriggerRebuild, _) => + logger.info("Found index state empty/rebuild triggered, bootstrapping downsample index") recoverIndexInternal(None) case (IndexState.Synced, checkpointMillis) => logger.warn(s"Found index state synced, bootstrapping downsample index from time(ms) $checkpointMillis") @@ -170,29 +173,22 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef, private def recoverIndexInternal(checkpointMillis: Option[Long]): Future[Long] = { // By passing -1 for partId, numeric partId will not be persisted in the index + // Since we are recovering, always start from last synced time and update till current timestamp. + val endHour = hour() + (checkpointMillis match { case Some(tsMillis) => // We know we have to refresh only since this hour, it is possible we do not // find any data for this refresh as the index is already updated - val (startHour, endHour) = (hour(tsMillis), hour()) + val startHour = hour(tsMillis) - 1 // do not notify listener as the map operation will be updating the state - indexRefresh(endHour, startHour, notifyListener = false) + indexRefresh(endHour, startHour, periodicRefresh = false) case None => // No checkpoint time found, start refresh from scratch indexBootstrapper .bootstrapIndexDownsample( partKeyIndex, shardNum, indexDataset, indexTtlMs) }).map { count => logger.info(s"Bootstrapped index for dataset=$indexDataset shard=$shardNum with $count records") - // need to start recovering 6 hours prior to now since last index migration could have run 6 hours ago - // and we'd be missing entries that would be migrated in the last 6 hours. - // Hence indexUpdatedHour should be: currentHour - 6 - val indexJobIntervalInHours = (downsampleStoreConfig.maxChunkTime.toMinutes + 59) / 60 // for ceil division - // the checkpoint updated should be at least what we had previously, subsequently, the refresh thread will - // take care of periodically updating this last synced value - val endHour = checkpointMillis.getOrElse(0L).max(hour() - indexJobIntervalInHours - 1) indexUpdatedHour.set(endHour) - // The time set here in synced state should match the time we set in indexUpdatedHour - // It is possible when we syn from a last timestamp, we dont update any data in index and as such, this - // synced time set is the pessimistic time to ensure no data sync to index is lost partKeyIndex.notifyLifecycleListener(IndexState.Synced, endHour * 3600 * 1000L) stats.shardTotalRecoveryTime.update(System.currentTimeMillis() - creationTime) startHousekeepingTask() @@ -252,14 +248,22 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef, indexRefresh(toHour, fromHour) } - def indexRefresh(toHour: Long, fromHour: Long, notifyListener: Boolean = true): Task[Long] = { + /** + * + * @param toHour The end hour of the refresh + * @param fromHour The start hour of the refresh + * @param periodicRefresh boolean flag indicating whether this is a periodic refresh or the one called as part of + * index bootstrap. + * @return + */ + def indexRefresh(toHour: Long, fromHour: Long, periodicRefresh: Boolean = true): Task[Long] = { indexRefresher.refreshWithDownsamplePartKeys( partKeyIndex, shardNum, rawDatasetRef, fromHour, toHour, schemas) .map { count => stats.indexEntriesRefreshed.increment(count) logger.info(s"Refreshed downsample index with new records numRecords=$count " + s"dataset=$rawDatasetRef shard=$shardNum fromHour=$fromHour toHour=$toHour") - if(notifyListener) { + if(periodicRefresh) { indexUpdatedHour.set(toHour) partKeyIndex.notifyLifecycleListener(IndexState.Synced, toHour * 3600 * 1000L) } diff --git a/core/src/main/scala/filodb.core/memstore/IndexBootstrapper.scala b/core/src/main/scala/filodb.core/memstore/IndexBootstrapper.scala index 2ed97d8a0a..132ff8fa32 100644 --- a/core/src/main/scala/filodb.core/memstore/IndexBootstrapper.scala +++ b/core/src/main/scala/filodb.core/memstore/IndexBootstrapper.scala @@ -59,6 +59,7 @@ class IndexBootstrapper(colStore: ColumnStore) { ref: DatasetRef, ttlMs: Long): Task[Long] = { + val startCheckpoint = System.currentTimeMillis() val recoverIndexLatency = Kamon.gauge("shard-recover-index-latency", MeasurementUnit.time.milliseconds) .withTag("dataset", ref.dataset) .withTag("shard", shardNum) @@ -80,7 +81,7 @@ class IndexBootstrapper(colStore: ColumnStore) { // Note that we do not set an end time for the Synced here, instead // we will do it from DownsampleTimeSeriesShard index.refreshReadersBlocking() - recoverIndexLatency.update(System.currentTimeMillis() - start) + recoverIndexLatency.update(System.currentTimeMillis() - startCheckpoint) count } } diff --git a/core/src/main/scala/filodb.core/memstore/IndexMetadataStore.scala b/core/src/main/scala/filodb.core/memstore/IndexMetadataStore.scala index f84c6ab700..ca6d2d1460 100644 --- a/core/src/main/scala/filodb.core/memstore/IndexMetadataStore.scala +++ b/core/src/main/scala/filodb.core/memstore/IndexMetadataStore.scala @@ -1,10 +1,15 @@ package filodb.core.memstore -import java.io.File - +import com.typesafe.scalalogging.StrictLogging +import java.io.{File, FileInputStream, FileOutputStream} +import java.nio.ByteBuffer import scala.collection.mutable.Map +import scala.util.{Failure, Success, Try, Using} import filodb.core.DatasetRef +import filodb.core.memstore.FileSystemBasedIndexMetadataStore.{genFileV1Magic, snapFileV1Magic} + + @@ -31,7 +36,7 @@ import filodb.core.DatasetRef * */ object IndexState extends Enumeration { - val Empty, Rebuilding, Refreshing, Synced, TriggerRebuild = Value + val Empty, Refreshing, Synced, TriggerRebuild = Value } @@ -86,6 +91,261 @@ trait IndexMetadataStore { } +object FileSystemBasedIndexMetadataStore extends StrictLogging { + val genFileV1Magic = 0x5b + val snapFileV1Magic = 0x5c + val expectedGenerationEnv = "INDEX_GENERATION" + + def expectedVersion(expectedVersion: String): Option[Int] = { + if (expectedVersion != null) { + logger.info("Expected version for the index from env is {}", expectedVersion) + Try(Integer.parseInt(expectedVersion)) match { + case Success(parsedVersion) => Some(parsedVersion) + case Failure(e) => logger.warn("Failed while parsing index generation", e) + None + } + } else None + } +} +/** + * File system based metadata store that uses filesystem for storing the metadata. For rebuilds, current implementation + * supports a rebuild of entire DS cluster and not just targeted shards for simplicity. + * + * @param rootDirectory + * @param expectedGeneration: The expected generation of the metadata. If None is found, no rebuild will be triggered + * The checks for generation will run only when a valid integer generation is passed + * @param maxRefreshHours: Refreshing for a large duration is not efficient and it will be faster to rebuild the + * index from scratch, if a snapFile and the elapsed time between now and and lastSync + * time is greater than maxRefreshHours, initState will respond with TriggerRebuild + */ +class FileSystemBasedIndexMetadataStore(rootDirectory: String, expectedGeneration: Option[Int], + maxRefreshHours: Long = Long.MaxValue) + extends IndexMetadataStore with StrictLogging { + + val genFilePathFormat = rootDirectory + File.separator + "_%s_%d_rebuild.gen" + val snapFilePathFormat = rootDirectory + File.separator + "_%s_%d_snap" + + private def hour(millis: Long = System.currentTimeMillis()): Long = millis / 1000 / 60 / 60 + + /** + * Init state is only queried when index is instantiated, the filesystem based implementation uses the file + * //___rebuild.gen file for triggering the rebuild, following cases will decide + * if a rebuild is triggered + * a. If no expectedVersion is provided, then simply call currentState + * b. If the file is absent, the state is assumed to be rebuild and the index will be rebuilt. + * c. If the file is present and the contents are garbled, its assumed to be a rebuild of index. + * d. If the file is present and the content returns the current generation (a numeric version), + * the value is compared to an expected environment value and if the environment variable has a generation, + * greater than the one in the file, a rebuild is triggered. + * e. If sync file is present and the time in sync file is more than maxRefreshHours in the past than now, + * a rebuild will be triggered + * f. If none of the above cases trigger a rebuild, the currentState method is used to determine the state + * @param datasetRef The dataset ref + * @param shard the shard id of index + * @return a tuple of state and the option of time at which the state was recorded + */ + override def initState(datasetRef: DatasetRef, shard: Int): (IndexState.Value, Option[Long]) = { + val snapFile = new File(snapFilePathFormat.format(datasetRef.dataset, shard)) + expectedGeneration.map(expectedGen => { + // TODO: Simplify this code too many if else conditions. + val genFile = new File(genFilePathFormat.format(datasetRef.dataset, shard)) + if (!genFile.exists()) { + logger.info("Gen file {} does not exist and expectedGen is {}, triggering rebuild", + genFile.toString, expectedGen) + (IndexState.TriggerRebuild, None) + } else { + Using(new FileInputStream(genFile)) { fin => + val magicByte = fin.read() + if (magicByte != FileSystemBasedIndexMetadataStore.genFileV1Magic) { + // if first byte is not the expected magicByte then the file is not as expected and possibly garbled + logger.warn("Gen file {} exist and possibly corrupt, triggering rebuild", genFile.toString) + (IndexState.TriggerRebuild, None) + } else { + // If the first byte is magicByte read just the next 4 bytes. This ensures we just read what we need + val bytes = Array[Byte](0, 0, 0, 0) + if (fin.read(bytes) != 4) { + // Not able to read 4 bytes, the file is truncated and we will trigger a rebuild + logger.warn("Gen file {} with right magic bytes but less than 4 bytes for generation," + + " triggering rebuild", genFile.toString) + (IndexState.TriggerRebuild, None) + } else { + val generation = ByteBuffer.wrap(bytes).getInt() + if (expectedGen > generation) { + logger.info("Gen file {} has generation {} and expected generation is {}, triggering rebuild", + genFile.toString, generation, expectedGen) + (IndexState.TriggerRebuild, None) + } else if (shouldRebuildIndexForOldSnap(datasetRef, shard, snapFile)) { + logger.info("lastSnapTime is more than {} hours from now, " + + "for dataset={} and shard={}. Triggering index rebuild instead of syncing the delta", + maxRefreshHours, datasetRef.toString, shard) + (IndexState.TriggerRebuild, None) + } else + currentState(datasetRef, shard) + } + } + }.get + } + }).getOrElse( + if (shouldRebuildIndexForOldSnap(datasetRef, shard, snapFile)) + (IndexState.TriggerRebuild, None) + else + currentState(datasetRef, shard) + ) + } + + /** + * Determines if the rebuild of index should be triggered based on how old the snap is in snap file + * @param datasetRef + * @param shard + * @param snapFile + * @return + */ + private def shouldRebuildIndexForOldSnap(datasetRef: DatasetRef, shard: Int, snapFile: File): Boolean= + if (snapFile.exists()) { + Using(new FileInputStream(snapFile)) { fin => + getSnapTime(snapFile.toString, fin) match { + case Success(snapTime) => hour() - hour(snapTime) > maxRefreshHours + case Failure(_) => true + } + }.get + } else + false + +/** + * The //___snap file contains the last time the index for the given dataset and shard + * was synched. Current implementation only supports storing Synced state and no state related information is stored + * in the snap file. The file contains the magic byte followed by 8 bytes for the long value for the timestamp + * starting with most to least significant bytes + * + * In case the snap file does not exists, magic is not as expected or the file is corrupt, RebuildIndex will be + * triggered + * + * @param datasetRef The dataset ref + * @param shard the shard id of index + * @return a tuple of state and the option of time at which the state was recorded + */ + + override def currentState(datasetRef: DatasetRef, shard: Int): (IndexState.Value, Option[Long]) = { + val snapFile = new File(snapFilePathFormat.format(datasetRef.dataset, shard)) + if (!snapFile.exists()) { + logger.info("Snap file {} does not exist , triggering rebuild", snapFile.toString) + (IndexState.Empty, None) + } else { + Using(new FileInputStream(snapFile)) { fin => + getSnapTime(snapFile.toString, fin) match { + case Success(snapTime) => + logger.info("Found lastSyncTime={} for dataset={} and shard={}", + snapTime, datasetRef.toString, shard) + (IndexState.Synced, Some(snapTime)) + case Failure(e) => throw e // Failure to read the snap file in currentState is fatal + } + }.get + } + } + + /** + * Gets the snapTime from the snap file + * + * @param snapFile the snapFile + * @param fin The input stream object for the file + * @return Long snap time if file is valid + */ + private def getSnapTime(snapFile: String, fin: FileInputStream): Try[Long] = { + val magicByte = fin.read() + if (magicByte != FileSystemBasedIndexMetadataStore.snapFileV1Magic) { + // if first byte is not the expected magicByte then the file is not as expected and possibly garbled + logger.warn(" Snap {} exist and possibly corrupt, this will trigger a rebuild", snapFile.toString) + Failure(new IllegalStateException("Invalid magic bytes in snap file")) + } else { + // If the first byte is magicByte read just the next 8 bytes + val bytes = Array[Byte](0, 0, 0, 0, 0, 0, 0, 0) + if (fin.read(bytes) != 8) { + // Not able to read 8 bytes, the file is truncated and we will trigger a rebuild + logger.warn("Snap file {} with right magic bytes but less than 8 for time," + + " this will trigger a rebuild of index", snapFile.toString) + Failure(new IllegalStateException("Snap file truncated")) + } else { + val lastSyncTime = ByteBuffer.wrap(bytes).getLong() + Success(lastSyncTime) + } + } + } + /** + * Updates the state of the index, the snap file will be updated with the correct timestamp in case the state + * is Synced, on TriggerRebuild and Empty state the gen and/or snap file will be deleted. Also when the index is + * updated and synced, it is important to ensure the gen file is updated with the current generation. This guarantees + * that next time on the bootstrap, the initState should return Synced and not TriggerRebuild + * + * @param datasetRef The dataset ref + * @param shard the shard id of index + * @param state One of the IndexState.Values for the index + * @param time a time in millis since epoch for when the state was updated + */ + // scalastyle:off method.length + override def updateState(datasetRef: DatasetRef, shard: Int, state: IndexState.Value, time: Long): Unit = { + state match { + case IndexState.Synced => // Only handle cases where state is Synced + val snapFile = new File(snapFilePathFormat.format(datasetRef.dataset, shard)) + val genFile = new File(genFilePathFormat.format(datasetRef.dataset, shard)) + Using(new FileOutputStream(snapFile)){ + // First write the last synced time to the snap file + snapFos => + logger.info("Updating state of dataset={}, shard={} with syncTime={}", + datasetRef.toString, shard, time) + snapFos.write(snapFileV1Magic) + val buff = ByteBuffer.allocate(8) + buff.putLong(time) + snapFos.write(buff.array()) + }.flatMap { _ => + // If writing the snap file is successful, update the generation file so + // next call to initState does not trigger the rebuild. Note that these + // two operations are not atomic, we may still have the snap file updated but + // generation file not, in which case next call to initState will trigger + // the rebuild. + expectedGeneration.map{ + generation => + Using(new FileOutputStream(genFile)) { genFos => + logger.info("Updating genFile " + + "of dataset={}, shard={} with generation={}", + datasetRef.toString, shard, generation) + genFos.write(genFileV1Magic) + val buff = ByteBuffer.allocate(4) + buff.putInt(generation) + genFos.write(buff.array()) + } + }.getOrElse(Success(())) + }.get + case IndexState.TriggerRebuild => // Delete gen file so next restart triggers the rebuild + val genFile = new File(genFilePathFormat.format(datasetRef.toString, shard)) + val deleted = genFile.delete() + logger.info("genFile={} delete returned {} on TriggerRebuild, " + + "index will be rebuild on next restart of dataset={} and shard={}", + genFile, deleted, datasetRef.toString, shard) + case IndexState.Empty => // Empty is invoked denoting the index directory is empty. This also signals + // metastore to delete the corresponding gen and snap file + val genFile = new File(genFilePathFormat.format(datasetRef.toString, shard)) + val snapFile = new File(snapFilePathFormat.format(datasetRef.toString, shard)) + val genDeleted = genFile.delete() + val snapDeleted = snapFile.delete() + logger.info("genFile={} delete returned {} and " + + "snapFile={} delete returned {} when empty state was triggered " + + "for dataset={} and shard={}", + genFile, genDeleted, snapFile, snapDeleted, datasetRef.toString, shard) + case _ => //NOP + } + } + // scalastyle:on method.length + + /** + * Updates the state of the index + * @param datasetRef The dataset ref + * @param shard the shard id of index + * @param state One of the IndexState.Values for the index + * @param time a time in millis since epoch for when the state was updated + */ + override def updateInitState(datasetRef: DatasetRef, shard: Int, state: IndexState.Value, time: Long): Unit = ??? +} + /** * Non thread safe in memory index state metadata store */ diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala index 73c92fae62..335acc9f36 100644 --- a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala +++ b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala @@ -141,7 +141,7 @@ class PartKeyLuceneIndex(ref: DatasetRef, // If index rebuild is triggered or the state is Building, simply clean up the index directory and start // index rebuild if ( - lifecycleManager.forall(_.shouldTriggerRebuild(ref, shardNum)) || getCurrentIndexState()._1 == IndexState.Rebuilding + lifecycleManager.forall(_.shouldTriggerRebuild(ref, shardNum)) ) { logger.info(s"Cleaning up indexDirectory=$indexDiskLocation for dataset=$ref, shard=$shardNum") deleteRecursively(indexDiskLocation.toFile) match { diff --git a/core/src/test/scala/filodb.core/memstore/FileSystemBasedIndexMetadataStoreSpec.scala b/core/src/test/scala/filodb.core/memstore/FileSystemBasedIndexMetadataStoreSpec.scala new file mode 100644 index 0000000000..0f18bbc8fc --- /dev/null +++ b/core/src/test/scala/filodb.core/memstore/FileSystemBasedIndexMetadataStoreSpec.scala @@ -0,0 +1,351 @@ +package filodb.core.memstore + +import filodb.core.GdeltTestData.{datasetOptions, schema} +import filodb.core.metadata.Dataset +import org.scalatest.BeforeAndAfterEach +import org.scalatest.funspec.AnyFunSpec +import org.scalatest.matchers.should.Matchers + +import java.io.{FileInputStream, FileOutputStream} +import java.nio.ByteBuffer +import scala.util.{Failure, Try} + +// scalastyle:off +class FileSystemBasedIndexMetadataStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfterEach { + + + val snap = 1663092567977L + + val dataset = Dataset("gdelt", schema.slice(4, 6), schema.patch(4, Nil, 2), datasetOptions) + + it("initState with no gen file and expected version passed should Trigger rebuild") { + val tmpFile = System.getProperty("java.io.tmpdir") + val store = new FileSystemBasedIndexMetadataStore(tmpFile, Some(1)) + store.initState(dataset.ref, 0) shouldEqual ((IndexState.TriggerRebuild, None)) + } + + override def beforeEach(): Unit = { + val tmpFile = System.getProperty("java.io.tmpdir") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + genFile.delete() + val snapFile = new java.io.File(tmpFile, "_gdelt_0_snap") + val buffer: ByteBuffer = ByteBuffer.allocate(8) + buffer.putLong(snap) + val fos = new FileOutputStream(snapFile) + fos.write(FileSystemBasedIndexMetadataStore.snapFileV1Magic) + fos.write(buffer.array()) + fos.close() + } + + + it("initState with expectedVersion with no magic bytes in gen file should Trigger rebuild") { + val tmpFile = System.getProperty("java.io.tmpdir") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + genFile.createNewFile() + val store = new FileSystemBasedIndexMetadataStore(tmpFile, Some(1)) + store.initState(dataset.ref, 0) shouldEqual ((IndexState.TriggerRebuild, None)) + } + + it("initState with expectedVersion with invalid magic bytes in gen file should Trigger rebuild") { + val tmpFile = System.getProperty("java.io.tmpdir") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + genFile.delete() + val os = new FileOutputStream(genFile) + os.write(0) + val store = new FileSystemBasedIndexMetadataStore(tmpFile, Some(1)) + store.initState(dataset.ref, 0) shouldEqual ((IndexState.TriggerRebuild, None)) + } + + it("initState with expectedVersion with valid magic bytes and less than 4 bytes for version should trigger rebuild") { + val tmpFile = System.getProperty("java.io.tmpdir") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + genFile.delete() + val os = new FileOutputStream(genFile) + val store = new FileSystemBasedIndexMetadataStore(tmpFile, Some(1)) + os.write(FileSystemBasedIndexMetadataStore.genFileV1Magic) + os.write(Array[Byte](0, 1, 2)) // Length as 3 bytes instead of expected 4 + store.initState(dataset.ref, 0) shouldEqual ((IndexState.TriggerRebuild, None)) + } + + it("initState with expectedVersion, valid magic bytes and 4 bytes for version should invoke currentState") { + val tmpFile = System.getProperty("java.io.tmpdir") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + genFile.delete() + val os = new FileOutputStream(genFile) + val store = new FileSystemBasedIndexMetadataStore(tmpFile, Some(1)) + os.write(FileSystemBasedIndexMetadataStore.genFileV1Magic) + os.write(Array[Byte](0, 0, 0, 1)) + store.initState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snap))) + } + + it("initState with expectedVersion < version in gen file should invoke currentState") { + val tmpFile = System.getProperty("java.io.tmpdir") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + genFile.delete() + val os = new FileOutputStream(genFile) + val store = new FileSystemBasedIndexMetadataStore(tmpFile, Some(1)) + os.write(FileSystemBasedIndexMetadataStore.genFileV1Magic) + os.write(Array[Byte](0, 0, 0, 1)) + store.initState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snap))) + } + + + it("initState with expectedVersion = version in gen file should invoke currentState") { + val tmpFile = System.getProperty("java.io.tmpdir") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + genFile.delete() + val os = new FileOutputStream(genFile) + val store = new FileSystemBasedIndexMetadataStore(tmpFile, Some(1)) + os.write(FileSystemBasedIndexMetadataStore.genFileV1Magic) + os.write(Array[Byte](0, 0, 0, 1)) + store.initState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snap))) + } + + it("initState with expectedVersion > version in gen file should trigger rebuild") { + val tmpFile = System.getProperty("java.io.tmpdir") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + genFile.delete() + val os = new FileOutputStream(genFile) + val store = new FileSystemBasedIndexMetadataStore(tmpFile, Some(2)) + os.write(FileSystemBasedIndexMetadataStore.genFileV1Magic) + os.write(Array[Byte](0, 0, 0, 1)) + store.initState(dataset.ref, 0) shouldEqual ((IndexState.TriggerRebuild, None)) + } + + it("initState with no expected version should invoke currentState") { + val tmpFile = System.getProperty("java.io.tmpdir") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + genFile.delete() + val os = new FileOutputStream(genFile) + val store = new FileSystemBasedIndexMetadataStore(tmpFile, None) + os.write(FileSystemBasedIndexMetadataStore.genFileV1Magic) + os.write(Array[Byte](0, 0, 0, 1)) + store.initState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snap))) + } + + + it("currentState with no snap file should be Empty") { + val tmpFile = System.getProperty("java.io.tmpdir") + val snapFile = new java.io.File(tmpFile, "_gdelt_0_snap") + snapFile.delete() + val store = new FileSystemBasedIndexMetadataStore(tmpFile, None) + store.initState(dataset.ref, 0) shouldEqual ((IndexState.Empty, None)) + } + + + it("currentState with invalid magic byte should should Trigger rebuild") { + val tmpFile = System.getProperty("java.io.tmpdir") + val snapFile = new java.io.File(tmpFile, "_gdelt_0_snap") + val fos = new FileOutputStream(snapFile) + fos.write(0) + val store = new FileSystemBasedIndexMetadataStore(tmpFile, None) + // invalid magic bytes should trigger rebuild, but currentState should throw exception + store.initState(dataset.ref, 0) shouldEqual ((IndexState.TriggerRebuild, None)) + Try(store.currentState(dataset.ref, 0)) match { + case Failure(e: IllegalStateException) => e.getMessage shouldEqual "Invalid magic bytes in snap file" + case _ => fail("Expected to see Invalid magic bytes") + } + } + + it("currentState with truncated length should Trigger rebuild") { + val tmpFile = System.getProperty("java.io.tmpdir") + val snapFile = new java.io.File(tmpFile, "_gdelt_0_snap") + snapFile.delete() + val fos = new FileOutputStream(snapFile) + fos.write(FileSystemBasedIndexMetadataStore.snapFileV1Magic) + fos.write(Array[Byte](0, 0, 0)) + val store = new FileSystemBasedIndexMetadataStore(tmpFile, None) + store.initState(dataset.ref, 0) shouldEqual ((IndexState.TriggerRebuild, None)) + Try(store.currentState(dataset.ref, 0)) match { + case Failure(e: IllegalStateException) => e.getMessage shouldEqual "Snap file truncated" + case _ => fail("Expected to see Invalid magic bytes") + } + } + + it("currentState with valid snap file should return Synced with correct timestamp") { + val tmpFile = System.getProperty("java.io.tmpdir") + val store = new FileSystemBasedIndexMetadataStore(tmpFile, None) + store.initState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snap))) + store.currentState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snap))) + } + + it("should update snap file but not gen file when updateState with Synced state is invoked with expectedGeneration as None") { + val tmpFile = System.getProperty("java.io.tmpdir") + val snapFile = new java.io.File(tmpFile, "_gdelt_0_snap") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + snapFile.delete() + genFile.delete() + val store = new FileSystemBasedIndexMetadataStore(tmpFile, None) + val snapTime = System.currentTimeMillis() + store.updateState(dataset.ref, 0, IndexState.Synced, snapTime) + snapFile.exists() shouldEqual true + genFile.exists() shouldEqual false + val snapIn = new FileInputStream(snapFile) + snapIn.read() shouldEqual FileSystemBasedIndexMetadataStore.snapFileV1Magic + val bytes = Array[Byte](0, 0, 0, 0 ,0, 0, 0, 0) + snapIn.read(bytes) shouldEqual 8 + ByteBuffer.wrap(bytes).getLong shouldEqual snapTime + } + + it("should update snap file and gen file when updateState with Synced state is invoked with expectedGeneration as Some value") { + val tmpFile = System.getProperty("java.io.tmpdir") + val snapFile = new java.io.File(tmpFile, "_gdelt_0_snap") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + snapFile.delete() + genFile.delete() + val store = new FileSystemBasedIndexMetadataStore(tmpFile, Some(10)) + val snapTime = System.currentTimeMillis() + store.updateState(dataset.ref, 0, IndexState.Synced, snapTime) + snapFile.exists() shouldEqual true + genFile.exists() shouldEqual true + val snapIn = new FileInputStream(snapFile) + snapIn.read() shouldEqual FileSystemBasedIndexMetadataStore.snapFileV1Magic + val bytes = Array[Byte](0, 0, 0, 0 ,0, 0, 0, 0) + snapIn.read(bytes) shouldEqual 8 + ByteBuffer.wrap(bytes).getLong shouldEqual snapTime + + val genIn = new FileInputStream(genFile) + genIn.read() shouldEqual FileSystemBasedIndexMetadataStore.genFileV1Magic + val genBytes = Array[Byte](0, 0, 0, 0) + genIn.read(genBytes) shouldEqual 4 + ByteBuffer.wrap(genBytes).getInt shouldEqual 10 + } + + it("should not Trigger rebuild where state is synced and expected version is written to gen file") { + val tmpFile = System.getProperty("java.io.tmpdir") + val snapFile = new java.io.File(tmpFile, "_gdelt_0_snap") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + snapFile.delete() + genFile.delete() + val store = new FileSystemBasedIndexMetadataStore(tmpFile, Some(10)) + val snapTime = System.currentTimeMillis() + store.initState(dataset.ref, 0) shouldEqual ((IndexState.TriggerRebuild, None)) + store.updateState(dataset.ref, 0, IndexState.Synced, snapTime) + snapFile.exists() shouldEqual true + genFile.exists() shouldEqual true + store.initState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snapTime))) + } + + + it("should not Trigger rebuild where state is synced and expected version is None") { + val tmpFile = System.getProperty("java.io.tmpdir") + val snapFile = new java.io.File(tmpFile, "_gdelt_0_snap") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + snapFile.delete() + genFile.delete() + val store = new FileSystemBasedIndexMetadataStore(tmpFile, None) + val snapTime = System.currentTimeMillis() + store.initState(dataset.ref, 0) shouldEqual ((IndexState.Empty, None)) + store.updateState(dataset.ref, 0, IndexState.Synced, snapTime) + snapFile.exists() shouldEqual true + genFile.exists() shouldEqual false + // As long as snap file exists, existence of gen file does not matter + store.initState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snapTime))) + } + + it("initState should return TriggerRebuild when state is updated to TriggerRebuild, the currentState should not be affected") { + val tmpFile = System.getProperty("java.io.tmpdir") + val snapFile = new java.io.File(tmpFile, "_gdelt_0_snap") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + snapFile.delete() + genFile.delete() + val store = new FileSystemBasedIndexMetadataStore(tmpFile, Some(1)) + val snapTime = System.currentTimeMillis() + store.initState(dataset.ref, 0) shouldEqual ((IndexState.TriggerRebuild, None)) + store.updateState(dataset.ref, 0, IndexState.Synced, snapTime) + snapFile.exists() shouldEqual true + genFile.exists() shouldEqual true + store.initState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snapTime))) + store.currentState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snapTime))) + // Now update state to TriggerRebuild and the return value should be TriggerRebuild, if snap file exists it + // should be unaffected as its still needed for currently running shard to update its state + store.updateState(dataset.ref, 0, IndexState.TriggerRebuild, snapTime) + store.initState(dataset.ref, 0) shouldEqual ((IndexState.TriggerRebuild, None)) + snapFile.exists() shouldEqual true + genFile.exists() shouldEqual false + store.currentState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snapTime))) + } + + it("initState should return TriggerRebuild when state is updated to Empty and expectedVersion is not None, the currentState should be Empty") { + val tmpFile = System.getProperty("java.io.tmpdir") + val snapFile = new java.io.File(tmpFile, "_gdelt_0_snap") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + snapFile.delete() + genFile.delete() + val store = new FileSystemBasedIndexMetadataStore(tmpFile, Some(1)) + val snapTime = System.currentTimeMillis() + store.initState(dataset.ref, 0) shouldEqual ((IndexState.TriggerRebuild, None)) + store.updateState(dataset.ref, 0, IndexState.Synced, snapTime) + snapFile.exists() shouldEqual true + genFile.exists() shouldEqual true + store.initState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snapTime))) + store.currentState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snapTime))) + // Now update state to TriggerRebuild and the return value should be TriggerRebuild, if snap file exists it + // should be unaffected as its still needed for currently running shard to update its state + store.updateState(dataset.ref, 0, IndexState.Empty, snapTime) + store.initState(dataset.ref, 0) shouldEqual ((IndexState.TriggerRebuild, None)) + snapFile.exists() shouldEqual false + genFile.exists() shouldEqual false + store.currentState(dataset.ref, 0) shouldEqual ((IndexState.Empty, None)) + } + + it("initState should return Empty when state is updated to Empty and expectedVersion is None, the currentState should be Empty") { + val tmpFile = System.getProperty("java.io.tmpdir") + val snapFile = new java.io.File(tmpFile, "_gdelt_0_snap") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + snapFile.delete() + genFile.delete() + val store = new FileSystemBasedIndexMetadataStore(tmpFile, None) + val snapTime = System.currentTimeMillis() + store.initState(dataset.ref, 0) shouldEqual ((IndexState.Empty, None)) + store.updateState(dataset.ref, 0, IndexState.Synced, snapTime) + snapFile.exists() shouldEqual true + genFile.exists() shouldEqual false + store.initState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snapTime))) + store.currentState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snapTime))) + // Now update state to TriggerRebuild and the return value should be TriggerRebuild, if snap file exists it + // should be unaffected as its still needed for currently running shard to update its state + store.updateState(dataset.ref, 0, IndexState.Empty, snapTime) + store.initState(dataset.ref, 0) shouldEqual ((IndexState.Empty, None)) + snapFile.exists() shouldEqual false + genFile.exists() shouldEqual false + store.currentState(dataset.ref, 0) shouldEqual ((IndexState.Empty, None)) + } + + it("initState should TriggerRebuild if state is older than maxRefreshHours") { + val tmpFile = System.getProperty("java.io.tmpdir") + val snapFile = new java.io.File(tmpFile, "_gdelt_0_snap") + val genFile = new java.io.File(tmpFile, "_gdelt_0_rebuild.gen") + snapFile.delete() + genFile.delete() + val store = new FileSystemBasedIndexMetadataStore(tmpFile, None, 2) + + val snapTime = System.currentTimeMillis() - 4 * 60 * 60 * 1000 // make snapTime 4 hours old + store.initState(dataset.ref, 0) shouldEqual ((IndexState.Empty, None)) // two hours of max sync time + store.updateState(dataset.ref, 0, IndexState.Synced, snapTime) + snapFile.exists() shouldEqual true + genFile.exists() shouldEqual false + store.initState(dataset.ref, 0) shouldEqual ((IndexState.TriggerRebuild, None)) + store.currentState(dataset.ref, 0) shouldEqual ((IndexState.Synced, Some(snapTime))) + // IMPORTANT: Current state is still Synced. If shards are running, it makes no sense to TriggerRebuild + // as it happens only on start up. Perhaps in future iterations if the refresh overload is high, shard can + // gracefully shutdown to rebuild the index from scratch + } + + it("should parse the expectedVersion when a numeric value as String is provided") { + FileSystemBasedIndexMetadataStore.expectedVersion("123") shouldEqual Some(123) + } + + it("should return None if null string is provided") { + FileSystemBasedIndexMetadataStore.expectedVersion(null) shouldEqual None + } + + it("should return None if non numeric string is provided") { + FileSystemBasedIndexMetadataStore.expectedVersion("test") shouldEqual None + } + + it("should return None if overflowing numeric string is provided") { + FileSystemBasedIndexMetadataStore.expectedVersion("12345678686774553466 ") shouldEqual None + } +} +// scalastyle:on