Skip to content

Commit

Permalink
[dvc][server] Added safeguard in LFSIT::updateOffsetsFromConsumerReco…
Browse files Browse the repository at this point in the history
…rd (#769)

There seems to be a race condition where the RT offset gets persisted
into the VT offset field of the PCS. This change guards against this
bookkeeping error, and logs information if it happens.
  • Loading branch information
FelixGV authored Nov 21, 2023
1 parent 378f6a5 commit eb2a17d
Showing 1 changed file with 10 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,16 @@ protected void updateOffsetsFromConsumerRecord(

// Only update the metadata if this replica should NOT produce to version topic.
if (!shouldProduceToVersionTopic(partitionConsumptionState)) {
PubSubTopic consumedTopic = consumerRecord.getTopicPartition().getPubSubTopic();
if (consumedTopic.isRealTime()) {
// Does this ever happen?
LOGGER.warn(
"Will short-circuit updateOffsetsFromConsumerRecord because the consumerRecord is coming from a "
+ "RT topic ({}), partitionConsumptionState: {}",
consumedTopic,
partitionConsumptionState);
return;
}
/**
* If either (1) this is a follower replica or (2) this is a leader replica who is consuming from version topic
* in a local Kafka cluster, we can update the offset metadata in offset record right after consuming a message;
Expand Down

0 comments on commit eb2a17d

Please sign in to comment.