diff --git a/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala b/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala index 9e8b5d7c39..a5f1a2424d 100644 --- a/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala +++ b/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala @@ -42,6 +42,7 @@ sealed class IngestionTimeIndexTable(val dataset: DatasetRef, s"INSERT INTO $tableString (partition, ingestion_time, start_time, info) " + s"VALUES (?, ?, ?, ?) USING TTL ?") .setConsistencyLevel(writeConsistencyLevel) + .setIdempotent(true) private lazy val deleteIndexCql = session.prepare( s"DELETE FROM $tableString WHERE partition=? AND ingestion_time=? AND start_time=?") diff --git a/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionKeysByUpdateTimeTable.scala b/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionKeysByUpdateTimeTable.scala index 527cd3d41a..446b7ed2e0 100644 --- a/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionKeysByUpdateTimeTable.scala +++ b/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionKeysByUpdateTimeTable.scala @@ -38,6 +38,7 @@ sealed class PartitionKeysByUpdateTimeTable(val dataset: DatasetRef, s"INSERT INTO $tableString (shard, epochHour, split, partKey, startTime, endTime) " + s"VALUES (?, ?, ?, ?, ?, ?) USING TTL ?") .setConsistencyLevel(writeConsistencyLevel) + .setIdempotent(true) private lazy val readCql = session.prepare( s"SELECT * FROM $tableString " + diff --git a/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionKeysTable.scala b/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionKeysTable.scala index 5944b7173a..0763c344a8 100644 --- a/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionKeysTable.scala +++ b/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionKeysTable.scala @@ -36,11 +36,13 @@ sealed class PartitionKeysTable(val dataset: DatasetRef, s"INSERT INTO ${tableString} (partKey, startTime, endTime) " + s"VALUES (?, ?, ?) USING TTL ?") .setConsistencyLevel(writeConsistencyLevel) + .setIdempotent(true) private lazy val writePartitionCqlNoTtl = session.prepare( s"INSERT INTO ${tableString} (partKey, startTime, endTime) " + s"VALUES (?, ?, ?)") .setConsistencyLevel(writeConsistencyLevel) + .setIdempotent(true) private lazy val scanCql = session.prepare( s"SELECT * FROM $tableString " + diff --git a/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionKeysV2Table.scala b/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionKeysV2Table.scala index f949722b68..9e5eeefa03 100644 --- a/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionKeysV2Table.scala +++ b/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionKeysV2Table.scala @@ -37,11 +37,13 @@ sealed class PartitionKeysV2Table(val dataset: DatasetRef, s"INSERT INTO ${tableString} (shard, bucket, partKey, startTime, endTime) " + s"VALUES (?, ?, ?, ?, ?) USING TTL ?") .setConsistencyLevel(writeConsistencyLevel) + .setIdempotent(true) private lazy val writePartitionCqlNoTtl = session.prepare( s"INSERT INTO ${tableString} (shard, bucket, partKey, startTime, endTime) " + s"VALUES (?, ?, ?, ?, ?)") .setConsistencyLevel(writeConsistencyLevel) + .setIdempotent(true) private lazy val scanCql = session.prepare( s"SELECT partKey, startTime, endTime, shard FROM $tableString " + diff --git a/cassandra/src/main/scala/filodb.cassandra/columnstore/TimeSeriesChunksTable.scala b/cassandra/src/main/scala/filodb.cassandra/columnstore/TimeSeriesChunksTable.scala index 7cd8ef28dd..417832c17c 100644 --- a/cassandra/src/main/scala/filodb.cassandra/columnstore/TimeSeriesChunksTable.scala +++ b/cassandra/src/main/scala/filodb.cassandra/columnstore/TimeSeriesChunksTable.scala @@ -45,6 +45,7 @@ sealed class TimeSeriesChunksTable(val dataset: DatasetRef, s"INSERT INTO $tableString (partition, chunkid, info, chunks) " + s"VALUES (?, ?, ?, ?) USING TTL ?") .setConsistencyLevel(writeConsistencyLevel) + .setIdempotent(true) private lazy val deleteChunksCql = session.prepare( s"DELETE FROM $tableString WHERE partition=? AND chunkid IN ?") diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala index 86cb037f11..340f782001 100644 --- a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala @@ -882,6 +882,10 @@ class TimeSeriesShard(val ref: DatasetRef, ingested += ingestConsumer.numActuallyIngested _offset = offset } + else { + // Adding this log line to debug the shard stuck in recovery scenario(s) + logger.error(s"[Container Empty] record-offset: ${offset} last-ingested-offset: ${_offset}") + } } else { shardStats.oldContainers.increment() }