From 9f8b4a6d15899c7255c525299e1d6011a09b940e Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Thu, 10 Oct 2024 19:03:56 +0900 Subject: [PATCH] [fix][broker] Fix ack hole in cursor for geo-replication (#20931) Co-authored-by: Masahiro Sakamoto --- .../persistent/PersistentReplicator.java | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index b3d7546beed81..bcb1f759540b4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -116,6 +116,7 @@ public abstract class PersistentReplicator extends AbstractReplicator protected final ReplicatorStatsImpl stats = new ReplicatorStatsImpl(); protected volatile boolean fetchSchemaInProgress = false; + private volatile boolean waitForCursorRewinding = false; public PersistentReplicator(String localCluster, PersistentTopic localTopic, ManagedCursor cursor, String remoteCluster, String remoteTopic, @@ -143,9 +144,15 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man @Override protected void setProducerAndTriggerReadEntries(Producer producer) { - // Rewind the cursor to be sure to read again all non-acked messages sent while restarting. - cursor.rewind(); - cursor.cancelPendingReadRequest(); + waitForCursorRewinding = true; + + // Repeat until there are no read operations in progress + if (STATE_UPDATER.get(this) == State.Starting && HAVE_PENDING_READ_UPDATER.get(this) == TRUE + && !cursor.cancelPendingReadRequest()) { + brokerService.getPulsar().getExecutor() + .schedule(() -> setProducerAndTriggerReadEntries(producer), 10, TimeUnit.MILLISECONDS); + return; + } /** * 1. Try change state to {@link Started}. @@ -158,6 +165,7 @@ protected void setProducerAndTriggerReadEntries(Producer producer) { if (!(producer instanceof ProducerImpl)) { log.error("[{}] The partitions count between two clusters is not the same, the replicator can not be" + " created successfully: {}", replicatorId, state); + waitForCursorRewinding = false; doCloseProducerAsync(producer, () -> {}); throw new ClassCastException(producer.getClass().getName() + " can not be cast to ProducerImpl"); } @@ -168,6 +176,11 @@ protected void setProducerAndTriggerReadEntries(Producer producer) { backOff.reset(); // activate cursor: so, entries can be cached. this.cursor.setActive(); + + // Rewind the cursor to be sure to read again all non-acked messages sent while restarting + cursor.rewind(); + waitForCursorRewinding = false; + // read entries readMoreEntries(); } else { @@ -183,6 +196,7 @@ protected void setProducerAndTriggerReadEntries(Producer producer) { log.error("[{}] Replicator state is not expected, so close the producer. Replicator state: {}", replicatorId, changeStateRes.getRight()); } + waitForCursorRewinding = false; // Close the producer if change the state fail. doCloseProducerAsync(producer, () -> {}); } @@ -296,6 +310,11 @@ protected void readMoreEntries() { // Schedule read if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) { + if (waitForCursorRewinding) { + log.info("[{}] Skip the reading because repl producer is starting", replicatorId); + HAVE_PENDING_READ_UPDATER.set(this, FALSE); + return; + } if (log.isDebugEnabled()) { log.debug("[{}] Schedule read of {} messages or {} bytes", replicatorId, messagesToRead, bytesToRead);