Skip to content

Commit

Permalink
[server] Augment completion report with previous ready to serve state (
Browse files Browse the repository at this point in the history
…#1288)

* [server] Augment completion report with previous ready to serve state

This change persists into the offset record weather or not this partition originally marked itself as ready to serve.  This is then used on restart to determine if a node should report completion or not when coming online.

The intention is that when a node is lagged under normal conditions (things like heavy load or some other bad condition), we can still restart the node or do deployments and have the node come up and serve traffic.  This is done under the notion that it's generally preferable to be online and serving and stale as opposed to caught up/catching up, but unavailable.

This will not impact buffer replay on a version push because a replay won't pass a ready to serve check.
  • Loading branch information
ZacAttack authored Nov 21, 2024
1 parent 6fac562 commit 730a597
Show file tree
Hide file tree
Showing 4 changed files with 592 additions and 435 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
*/
public class PartitionConsumptionState {
private static final int MAX_INCREMENTAL_PUSH_ENTRY_NUM = 50;
private static final String PREVIOUSLY_READY_TO_SERVE = "previouslyReadyToServe";
private static final String TRUE = "true";

private final String replicaId;
private final int partition;
Expand Down Expand Up @@ -599,6 +601,19 @@ public void setSkipKafkaMessage(boolean skipKafkaMessage) {
this.skipKafkaMessage = skipKafkaMessage;
}

/**
* This persists to the offsetRecord associated to this partitionConsumptionState that the ready to serve check has
* passed. This will be persisted to disk once the offsetRecord is checkpointed, and subsequent restarts will
* consult this information when determining if the node should come online or not to serve traffic
*/
public void recordReadyToServeInOffsetRecord() {
offsetRecord.setPreviousStatusesEntry(PREVIOUSLY_READY_TO_SERVE, TRUE);
}

public boolean getReadyToServeInOffsetRecord() {
return offsetRecord.getPreviousStatusesEntry(PREVIOUSLY_READY_TO_SERVE).equals(TRUE);
}

/**
* This immutable class holds a association between a key and value and the source offset of the consumed message.
* The value could be either as received in kafka ConsumerRecord or it could be a write computed value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2031,7 +2031,8 @@ private void checkConsumptionStateWhenStart(
offsetLag,
previousOffsetLag,
offsetLagThreshold);
if (offsetLag < previousOffsetLag + offsetLagDeltaRelaxFactor * offsetLagThreshold) {
if (newPartitionConsumptionState.getReadyToServeInOffsetRecord()
&& (offsetLag < previousOffsetLag + offsetLagDeltaRelaxFactor * offsetLagThreshold)) {
newPartitionConsumptionState.lagHasCaughtUp();
reportCompleted(newPartitionConsumptionState, true);
isCompletedReport = true;
Expand Down Expand Up @@ -4089,6 +4090,7 @@ private ReadyToServeCheck getDefaultReadyToServeChecker() {
}
unSubscribePartition(new PubSubTopicPartitionImpl(versionTopic, partition));
}
partitionConsumptionState.recordReadyToServeInOffsetRecord();
} else {
ingestionNotificationDispatcher.reportProgress(partitionConsumptionState);
}
Expand Down
Loading

0 comments on commit 730a597

Please sign in to comment.