Skip to content

Commit

Permalink
[fix][broker] Fix ack hole in cursor for geo-replication (#20931)
Browse files Browse the repository at this point in the history
Co-authored-by: Masahiro Sakamoto <[email protected]>
  • Loading branch information
massakam and Masahiro Sakamoto authored Oct 10, 2024
1 parent acac72e commit 9f8b4a6
Showing 1 changed file with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
protected final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();

protected volatile boolean fetchSchemaInProgress = false;
private volatile boolean waitForCursorRewinding = false;

public PersistentReplicator(String localCluster, PersistentTopic localTopic, ManagedCursor cursor,
String remoteCluster, String remoteTopic,
Expand Down Expand Up @@ -143,9 +144,15 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man

@Override
protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
// Rewind the cursor to be sure to read again all non-acked messages sent while restarting.
cursor.rewind();
cursor.cancelPendingReadRequest();
waitForCursorRewinding = true;

// Repeat until there are no read operations in progress
if (STATE_UPDATER.get(this) == State.Starting && HAVE_PENDING_READ_UPDATER.get(this) == TRUE
&& !cursor.cancelPendingReadRequest()) {
brokerService.getPulsar().getExecutor()
.schedule(() -> setProducerAndTriggerReadEntries(producer), 10, TimeUnit.MILLISECONDS);
return;
}

/**
* 1. Try change state to {@link Started}.
Expand All @@ -158,6 +165,7 @@ protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
if (!(producer instanceof ProducerImpl)) {
log.error("[{}] The partitions count between two clusters is not the same, the replicator can not be"
+ " created successfully: {}", replicatorId, state);
waitForCursorRewinding = false;
doCloseProducerAsync(producer, () -> {});
throw new ClassCastException(producer.getClass().getName() + " can not be cast to ProducerImpl");
}
Expand All @@ -168,6 +176,11 @@ protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
backOff.reset();
// activate cursor: so, entries can be cached.
this.cursor.setActive();

// Rewind the cursor to be sure to read again all non-acked messages sent while restarting
cursor.rewind();
waitForCursorRewinding = false;

// read entries
readMoreEntries();
} else {
Expand All @@ -183,6 +196,7 @@ protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
log.error("[{}] Replicator state is not expected, so close the producer. Replicator state: {}",
replicatorId, changeStateRes.getRight());
}
waitForCursorRewinding = false;
// Close the producer if change the state fail.
doCloseProducerAsync(producer, () -> {});
}
Expand Down Expand Up @@ -296,6 +310,11 @@ protected void readMoreEntries() {

// Schedule read
if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
if (waitForCursorRewinding) {
log.info("[{}] Skip the reading because repl producer is starting", replicatorId);
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages or {} bytes", replicatorId, messagesToRead,
bytesToRead);
Expand Down

0 comments on commit 9f8b4a6

Please sign in to comment.