Skip to content

Commit

Permalink
fix(BroadcastProcessor): prevent memory leaks by clearing active subs…
Browse files Browse the repository at this point in the history
…criptions

Refs: #1532
  • Loading branch information
jponge committed Mar 4, 2024
1 parent 8559f08 commit 6a2c6f0
Showing 1 changed file with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,35 +133,40 @@ public void onSubscribe(Subscription subscription) {
@Override
public void onNext(T item) {
ParameterValidation.nonNullNpe(item, "item");
for (BroadcastSubscription<T> s : subscribers.get()) {
s.onNext(item);
List<BroadcastSubscription<T>> subscriptions = subscribers.get();
if (subscriptions != TERMINATED) {
for (BroadcastSubscription<T> s : subscriptions) {
s.onNext(item);
}
}
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void onError(Throwable failure) {
ParameterValidation.nonNullNpe(failure, "failure");
if (subscribers.get() == TERMINATED) {
List<BroadcastSubscription<?>> subscriptions = subscribers.getAndSet((List) TERMINATED);
if (subscriptions == TERMINATED) {
return;
}
this.failure = failure;
List<BroadcastSubscription<?>> andSet = subscribers.getAndSet((List) TERMINATED);
for (BroadcastSubscription<?> s : andSet) {
for (BroadcastSubscription<?> s : subscriptions) {
s.onError(failure);
}
subscriptions.clear();
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void onComplete() {
if (subscribers.get() == TERMINATED) {
List<BroadcastSubscription<?>> subscriptions = subscribers.getAndSet((List) TERMINATED);
if (subscriptions == TERMINATED) {
return;
}
List<BroadcastSubscription<?>> andSet = subscribers.getAndSet((List) TERMINATED);
for (BroadcastSubscription<?> s : andSet) {
for (BroadcastSubscription<?> s : subscriptions) {
s.onComplete();
}
subscriptions.clear();
}

/**
Expand Down

0 comments on commit 6a2c6f0

Please sign in to comment.