Skip to content

Commit

Permalink
Merge pull request #1438 from smallrye/fix/1436
Browse files Browse the repository at this point in the history
fix: do not interrupt threads on cancellation in UniDelayOnItem
  • Loading branch information
cescoffier authored Nov 22, 2023
2 parents 0217652 + d883a75 commit b52c5a5
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public void request(long n) {
public void onCompletion() {
if (terminated.compareAndSet(RUNNING, SUCCEED)) {
if (task != null) {
task.cancel(true);
task.cancel(false);
task = null;
}
checkedComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void onCompletion() {
@Override
public void cancel() {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
scheduledFuture.cancel(false);
}
super.cancel();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ boolean replace(Future<?> task) {
Future current = container.get();
if (current == NONE) {
if (task != null) {
task.cancel(true);
task.cancel(false);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void cancel(ConnectableMultiConnection connection) {
}

ScheduledFuture<?> future = executor.schedule(connection, duration.toMillis(), TimeUnit.MILLISECONDS);
connection.setTimer(() -> future.cancel(true));
connection.setTimer(() -> future.cancel(false));
}

void terminated(ConnectableMultiConnection connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void cancel() {
if (!isCancelled()) {
super.cancel();
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
scheduledFuture.cancel(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -558,4 +559,21 @@ public void rejectNullExecutors() {
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("`executor` must not be `null`");
}

@Test
void avoidSpuriousInterruptsWithBackoff() {
AtomicBoolean flip = new AtomicBoolean();
AtomicBoolean interrupted = new AtomicBoolean();
String result = Uni.createFrom().item(() -> {
if (flip.getAndSet(true)) {
return "yolo";
}
throw new RuntimeException("boom");
})
.onFailure().retry().withBackOff(Duration.ofMillis(100), Duration.ofSeconds(5)).atMost(1)
.onItem().invoke(() -> interrupted.set(Thread.currentThread().isInterrupted()))
.await().atMost(Duration.ofSeconds(1));
assertThat(result).isEqualTo("yolo");
assertThat(interrupted).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testAsUniWithNever() {
.onItem().ignoreAsUni().subscribeAsCompletionStage();

assertThat(future).isNotCompleted();
future.cancel(true);
future.cancel(false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ public void testWithInterruptedFuture() {
UniAssertSubscriber<String> subscriber = UniAssertSubscriber.create();
CompletableFuture<String> cs = new CompletableFuture<>();
Uni.createFrom().future(cs).subscribe().withSubscriber(subscriber);
cs.cancel(true);
cs.cancel(false);
subscriber
.awaitFailure()
.assertFailedWith(CancellationException.class, null);
Expand Down

0 comments on commit b52c5a5

Please sign in to comment.