Skip to content

Commit

Permalink
Merge pull request #1536 from jponge/fix/BroadcastProcessor-leak
Browse files Browse the repository at this point in the history
fix(BroadcastProcessor): prevent memory leaks by clearing active subscriptions
  • Loading branch information
jponge authored Mar 6, 2024
2 parents 475e19d + 6a2c6f0 commit 52050e8
Showing 1 changed file with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,35 +133,40 @@ public void onSubscribe(Subscription subscription) {
@Override
public void onNext(T item) {
ParameterValidation.nonNullNpe(item, "item");
for (BroadcastSubscription<T> s : subscribers.get()) {
s.onNext(item);
List<BroadcastSubscription<T>> subscriptions = subscribers.get();
if (subscriptions != TERMINATED) {
for (BroadcastSubscription<T> s : subscriptions) {
s.onNext(item);
}
}
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void onError(Throwable failure) {
ParameterValidation.nonNullNpe(failure, "failure");
if (subscribers.get() == TERMINATED) {
List<BroadcastSubscription<?>> subscriptions = subscribers.getAndSet((List) TERMINATED);
if (subscriptions == TERMINATED) {
return;
}
this.failure = failure;
List<BroadcastSubscription<?>> andSet = subscribers.getAndSet((List) TERMINATED);
for (BroadcastSubscription<?> s : andSet) {
for (BroadcastSubscription<?> s : subscriptions) {
s.onError(failure);
}
subscriptions.clear();
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void onComplete() {
if (subscribers.get() == TERMINATED) {
List<BroadcastSubscription<?>> subscriptions = subscribers.getAndSet((List) TERMINATED);
if (subscriptions == TERMINATED) {
return;
}
List<BroadcastSubscription<?>> andSet = subscribers.getAndSet((List) TERMINATED);
for (BroadcastSubscription<?> s : andSet) {
for (BroadcastSubscription<?> s : subscriptions) {
s.onComplete();
}
subscriptions.clear();
}

/**
Expand Down

0 comments on commit 52050e8

Please sign in to comment.