Skip to content

Commit

Permalink
Merge pull request #471 from smallrye/bug/uni-cancelled-future
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier authored Feb 10, 2021
2 parents 1e49045 + 741904c commit 7256672
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import io.smallrye.mutiny.infrastructure.Infrastructure;
Expand Down Expand Up @@ -66,16 +67,26 @@ private void dispatchImmediateResult(Future<? extends T> future, UniSubscriber<?
}

private void dispatchDeferredResult(Future<? extends T> future, UniSubscriber<? super T> downstream) {
downstream.onSubscribe(() -> future.cancel(false));
AtomicBoolean cancelled = new AtomicBoolean(false);
downstream.onSubscribe(() -> {
cancelled.set(true);
future.cancel(false);
});
// Because future.get is blocking, we must use a separated thread.
Infrastructure.getDefaultExecutor().execute(() -> {
try {
T item = future.get();
downstream.onItem(item);
if (!cancelled.get()) {
downstream.onItem(item);
}
} catch (ExecutionException e) {
downstream.onFailure(e.getCause());
if (!cancelled.get()) {
downstream.onFailure(e.getCause());
}
} catch (Exception e) {
downstream.onFailure(e);
if (!cancelled.get()) {
downstream.onFailure(e);
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -221,7 +222,7 @@ public void testThatSubscriberCanCancelBeforeEmission() {
subscriber.assertNotTerminated();
}

@Test
@RepeatedTest(10)
public void testThatSubscriberCanCancelBeforeEmissionWithSupplier() {
UniAssertSubscriber<Integer> subscriber = UniAssertSubscriber.create();
CompletableFuture<Integer> cs = new CompletableFuture<>();
Expand Down

0 comments on commit 7256672

Please sign in to comment.