Skip to content

Commit

Permalink
Setting writes to idempotent and adding debug log for shard recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeep6189 committed Feb 7, 2024
1 parent 32e4eeb commit 9053921
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ sealed class IngestionTimeIndexTable(val dataset: DatasetRef,
s"INSERT INTO $tableString (partition, ingestion_time, start_time, info) " +
s"VALUES (?, ?, ?, ?) USING TTL ?")
.setConsistencyLevel(writeConsistencyLevel)
.setIdempotent(true)

private lazy val deleteIndexCql = session.prepare(
s"DELETE FROM $tableString WHERE partition=? AND ingestion_time=? AND start_time=?")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ sealed class PartitionKeysByUpdateTimeTable(val dataset: DatasetRef,
s"INSERT INTO $tableString (shard, epochHour, split, partKey, startTime, endTime) " +
s"VALUES (?, ?, ?, ?, ?, ?) USING TTL ?")
.setConsistencyLevel(writeConsistencyLevel)
.setIdempotent(true)

private lazy val readCql = session.prepare(
s"SELECT * FROM $tableString " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ sealed class PartitionKeysTable(val dataset: DatasetRef,
s"INSERT INTO ${tableString} (partKey, startTime, endTime) " +
s"VALUES (?, ?, ?) USING TTL ?")
.setConsistencyLevel(writeConsistencyLevel)
.setIdempotent(true)

private lazy val writePartitionCqlNoTtl = session.prepare(
s"INSERT INTO ${tableString} (partKey, startTime, endTime) " +
s"VALUES (?, ?, ?)")
.setConsistencyLevel(writeConsistencyLevel)
.setIdempotent(true)

private lazy val scanCql = session.prepare(
s"SELECT * FROM $tableString " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ sealed class PartitionKeysV2Table(val dataset: DatasetRef,
s"INSERT INTO ${tableString} (shard, bucket, partKey, startTime, endTime) " +
s"VALUES (?, ?, ?, ?, ?) USING TTL ?")
.setConsistencyLevel(writeConsistencyLevel)
.setIdempotent(true)

private lazy val writePartitionCqlNoTtl = session.prepare(
s"INSERT INTO ${tableString} (shard, bucket, partKey, startTime, endTime) " +
s"VALUES (?, ?, ?, ?, ?)")
.setConsistencyLevel(writeConsistencyLevel)
.setIdempotent(true)

private lazy val scanCql = session.prepare(
s"SELECT partKey, startTime, endTime, shard FROM $tableString " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ sealed class TimeSeriesChunksTable(val dataset: DatasetRef,
s"INSERT INTO $tableString (partition, chunkid, info, chunks) " +
s"VALUES (?, ?, ?, ?) USING TTL ?")
.setConsistencyLevel(writeConsistencyLevel)
.setIdempotent(true)

private lazy val deleteChunksCql = session.prepare(
s"DELETE FROM $tableString WHERE partition=? AND chunkid IN ?")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,10 @@ class TimeSeriesShard(val ref: DatasetRef,
ingested += ingestConsumer.numActuallyIngested
_offset = offset
}
else {
// Adding this log line to debug the shard stuck in recovery scenario(s)
logger.error(s"[Container Empty] record-offset: ${offset} last-ingested-offset: ${_offset}")
}
} else {
shardStats.oldContainers.increment()
}
Expand Down

0 comments on commit 9053921

Please sign in to comment.