From 7bd37e4af66f8c0657f45490be2a6a1eb843b738 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 4 Mar 2024 19:19:33 +0100 Subject: [PATCH] fix(BroadcastProcessor): prevent memory leaks by clearing active subscriptions Refs: #1532 (cherry picked from commit 6a2c6f0275bd9ba3aa04014fb66b25ccf1f9c437) --- .../multi/processors/BroadcastProcessor.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/processors/BroadcastProcessor.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/processors/BroadcastProcessor.java index 0966e6be1..794094ffd 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/processors/BroadcastProcessor.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/processors/BroadcastProcessor.java @@ -133,8 +133,11 @@ public void onSubscribe(Subscription subscription) { @Override public void onNext(T item) { ParameterValidation.nonNullNpe(item, "item"); - for (BroadcastSubscription s : subscribers.get()) { - s.onNext(item); + List> subscriptions = subscribers.get(); + if (subscriptions != TERMINATED) { + for (BroadcastSubscription s : subscriptions) { + s.onNext(item); + } } } @@ -142,26 +145,28 @@ public void onNext(T item) { @Override public void onError(Throwable failure) { ParameterValidation.nonNullNpe(failure, "failure"); - if (subscribers.get() == TERMINATED) { + List> subscriptions = subscribers.getAndSet((List) TERMINATED); + if (subscriptions == TERMINATED) { return; } this.failure = failure; - List> andSet = subscribers.getAndSet((List) TERMINATED); - for (BroadcastSubscription s : andSet) { + for (BroadcastSubscription s : subscriptions) { s.onError(failure); } + subscriptions.clear(); } @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public void onComplete() { - if (subscribers.get() == TERMINATED) { + List> subscriptions = subscribers.getAndSet((List) TERMINATED); + if (subscriptions == TERMINATED) { return; } - List> andSet = subscribers.getAndSet((List) TERMINATED); - for (BroadcastSubscription s : andSet) { + for (BroadcastSubscription s : subscriptions) { s.onComplete(); } + subscriptions.clear(); } /**