From 90c260975f3a42a00fb5a3797b5bde0b50d69bc4 Mon Sep 17 00:00:00 2001 From: markusdlugi Date: Fri, 12 Jan 2024 14:05:25 +0100 Subject: [PATCH] fix: properly remove subscriptions in ReplayOperator Fixes a memory leak caused by old subscriptions not being removed properly. In addition, this fixes the edge case where no completion event would be sent for replay subscriptions if the number of requested items was exactly equal to the number of available items. Fixes #1482 --- .../multi/replay/AppendOnlyReplayList.java | 4 +++ .../multi/replay/ReplayOperator.java | 14 +++++++--- .../mutiny/groups/MultiReplayTest.java | 18 ++++++++++++- .../multi/replay/ReplayOperatorTest.java | 27 +++++++++++++++++++ 4 files changed, 58 insertions(+), 5 deletions(-) create mode 100644 implementation/src/test/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperatorTest.java diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/AppendOnlyReplayList.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/AppendOnlyReplayList.java index 8698c38d0..496b72f96 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/AppendOnlyReplayList.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/AppendOnlyReplayList.java @@ -60,6 +60,10 @@ public boolean hasReachedCompletion() { return current.value instanceof Completion; } + public boolean willReachCompletion() { + return !hasReachedCompletion() && current.next.value instanceof Completion; + } + public boolean hasReachedFailure() { return current.value instanceof Failure; } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperator.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperator.java index 20f3dc714..35cd8ccef 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperator.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperator.java @@ -21,7 +21,7 @@ public class ReplayOperator extends AbstractMulti { private final AtomicBoolean upstreamSubscriptionRequested = new AtomicBoolean(); private volatile Subscription upstreamSubscription = null; - private final CopyOnWriteArrayList subscriptions = new CopyOnWriteArrayList<>(); + protected final CopyOnWriteArrayList subscriptions = new CopyOnWriteArrayList<>(); public ReplayOperator(Multi upstream, long numberOfItemsToReplay) { this.upstream = upstream; @@ -39,11 +39,11 @@ public void subscribe(MultiSubscriber subscriber) { upstream.subscribe(new UpstreamSubscriber(subscriber)); } ReplaySubscription replaySubscription = new ReplaySubscription(subscriber); - subscriber.onSubscribe(replaySubscription); subscriptions.add(replaySubscription); + subscriber.onSubscribe(replaySubscription); } - private class ReplaySubscription implements Subscription { + protected class ReplaySubscription implements Subscription { private final MultiSubscriber downstream; private final AtomicLong demand = new AtomicLong(); @@ -115,6 +115,12 @@ private void drain() { downstream.onItem(item); emitted++; } + if (!done && cursor.willReachCompletion()) { + cancel(); + cursor.readCompletion(); + downstream.onComplete(); + return; + } demand.addAndGet(-emitted); if (wip.decrementAndGet() == 0) { return; @@ -123,7 +129,7 @@ private void drain() { } } - private class UpstreamSubscriber implements MultiSubscriber, ContextSupport { + protected class UpstreamSubscriber implements MultiSubscriber, ContextSupport { private final MultiSubscriber initialSubscriber; diff --git a/implementation/src/test/java/io/smallrye/mutiny/groups/MultiReplayTest.java b/implementation/src/test/java/io/smallrye/mutiny/groups/MultiReplayTest.java index 362c74016..9fdcd8106 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/groups/MultiReplayTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/groups/MultiReplayTest.java @@ -9,8 +9,13 @@ import java.util.Arrays; import java.util.List; import java.util.Random; -import java.util.concurrent.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -66,6 +71,17 @@ void basicReplayAll() { sub.assertCompleted(); } + @Test + void shouldCompleteWhenRequestEqualsMax() { + Multi upstream = Multi.createFrom().range(1, 10); + Multi replay = Multi.createBy().replaying().upTo(9).ofMulti(upstream); + + AssertSubscriber sub = replay.subscribe().withSubscriber(AssertSubscriber.create()); + sub.request(9); + assertThat(sub.getItems()).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9); + sub.assertCompleted(); + } + @Test void basicReplayLatest3() { ExecutorService pool = Executors.newFixedThreadPool(1); diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperatorTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperatorTest.java new file mode 100644 index 000000000..1ebe24f49 --- /dev/null +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperatorTest.java @@ -0,0 +1,27 @@ +package io.smallrye.mutiny.operators.multi.replay; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.test.AssertSubscriber; + +public class ReplayOperatorTest { + + @Test + public void shouldRemoveSubscriptionAfterCompletion() { + // given + var upstream = Multi.createFrom().range(0, 3); + var operator = new ReplayOperator<>(upstream, 3); + + // when + var subscriber = operator.subscribe().withSubscriber(AssertSubscriber.create(3)); + var subscriber2 = operator.subscribe().withSubscriber(AssertSubscriber.create(3)); + + // then + subscriber.assertItems(0, 1, 2).assertCompleted(); + subscriber2.assertItems(0, 1, 2).assertCompleted(); + assertEquals(0, operator.subscriptions.size()); + } +}