Skip to content

Commit

Permalink
fix(downsample): skip reading partkey from raw during full migration
Browse files (browse the repository at this point in the history)
  • Loading branch information
sherali42 committed Jan 5, 2024
1 parent 5a0a3ae commit 56084ba
Showing 1 changed file with 7 additions and 5 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -68,14 +68,14 @@ class DSIndexJob(dsSettings: DownsamplerSettings,
DownsamplerContext.dsLogger.info(s"Starting Full PartKey Migration for shard=$shard")
val partKeys = rawDataSource.scanPartKeys(ref = rawDatasetRef,
shard = shard.toInt)
count += migrateWithDownsamplePartKeys(partKeys, shard)
count += migrateWithDownsamplePartKeys(partKeys, shard, fullIndexMigration)
DownsamplerContext.dsLogger.info(s"Successfully Completed Full PartKey Migration for shard=$shard count=$count")
} else {
DownsamplerContext.dsLogger.info(s"Starting Partial PartKey Migration for shard=$shard")
for (epochHour <- fromHour until toHourExcl) {
val partKeys = rawDataSource.getPartKeysByUpdateHour(ref = rawDatasetRef,
shard = shard.toInt, updateHour = epochHour)
count += migrateWithDownsamplePartKeys(partKeys, shard)
count += migrateWithDownsamplePartKeys(partKeys, shard, fullIndexMigration)
}
DownsamplerContext.dsLogger.info(s"Successfully Completed Partial PartKey Migration for shard=$shard " +
s"count=$count fromHour=$fromHour toHourExcl=$toHourExcl")
Expand All @@ -92,7 +92,8 @@ class DSIndexJob(dsSettings: DownsamplerSettings,
Thread.sleep(62000) // quick & dirty hack to ensure that the completed metric gets published
}

def migrateWithDownsamplePartKeys(partKeys: Observable[PartKeyRecord], shard: Int): Int = {
def migrateWithDownsamplePartKeys(partKeys: Observable[PartKeyRecord], shard: Int,
fullIndexMigration: Boolean): Int = {
@volatile var count = 0
val rawDataSource = rawCassandraColStore
val pkRecords = partKeys.filter { pk =>
Expand All @@ -116,8 +117,9 @@ class DSIndexJob(dsSettings: DownsamplerSettings,
if (eligible) count += 1
eligible
}
}.map(pkr => rawDataSource.getPartKeyRecordOrDefault(ref = rawDatasetRef, shard = shard,
pkr = pkr)) // Merge with persisted (if exists) partKey.
}.map(pkr => if (fullIndexMigration) pkr
else rawDataSource.getPartKeyRecordOrDefault(ref = rawDatasetRef, shard = shard,
pkr = pkr)) // Merge with persisted (if exists) partKey.
.map(toDownsamplePkrWithHash)
val updateHour = System.currentTimeMillis() / 1000 / 60 / 60
Await.result(dsDatasource.writePartKeys(ref = dsDatasetRef, shard = shard,
Expand Down

0 comments on commit 56084ba

Please sign in to comment.