diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiConcatTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiConcatTest.java index aca2eea89..ffa68b7c5 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiConcatTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiConcatTest.java @@ -2,7 +2,9 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.io.IOException; import java.util.Arrays; +import java.util.List; import org.junit.jupiter.api.Test; @@ -160,4 +162,23 @@ public void testWithFailureCollection() { .assertFailedWith(IllegalStateException.class, "boom"); } + + @Test + public void collectThreeFailures() { + List failures = List.of(new IOException("foo"), new IOException("bar"), new IOException("baz")); + AssertSubscriber sub = Multi.createBy().concatenating().collectFailures().streams( + Multi.createFrom().item("foo"), + Multi.createFrom().failure(failures.get(0)), + Multi.createFrom().item("bar"), + Multi.createFrom().failure(failures.get(1)), + Multi.createFrom().item("baz"), + Multi.createFrom().failure(failures.get(2))).subscribe() + .withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + sub.assertItems("foo", "bar", "baz"); + sub.assertFailedWith(CompositeException.class); + CompositeException compositeException = (CompositeException) sub.getFailure(); + assertThat(compositeException.getCauses()).hasSize(3); + assertThat(compositeException.getCauses()).containsExactlyElementsOf(failures); + } } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java index 18b6f74d0..5785c8793 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java @@ -1,6 +1,7 @@ package io.smallrye.mutiny.operators.multi; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import java.time.Duration; import java.util.concurrent.CompletableFuture; @@ -291,4 +292,34 @@ void testUpfrontCompletion() { sub.assertCompleted().assertHasNotReceivedAnyItem(); } + + @Test + void rejectBadRequests() { + Multi multi = upstream.onItem().transformToMultiAndConcatenate(n -> Multi.createFrom().item(1)); + + AssertSubscriber sub = multi.subscribe().withSubscriber(AssertSubscriber.create()); + sub.request(0L); + sub.assertHasNotReceivedAnyItem().assertFailedWith(IllegalArgumentException.class, + "Invalid request number, must be greater than 0"); + + sub = multi.subscribe().withSubscriber(AssertSubscriber.create()); + sub.request(-10L); + sub.assertHasNotReceivedAnyItem().assertFailedWith(IllegalArgumentException.class, + "Invalid request number, must be greater than 0"); + } + + @Test + void testCancellation() { + AtomicBoolean cancelled = new AtomicBoolean(); + AssertSubscriber sub = Multi.createFrom().ticks().every(Duration.ofMillis(100)) + .onCancellation().invoke(() -> cancelled.set(true)) + .onItem().transformToMultiAndConcatenate(tick -> Multi.createFrom().items(1, 2, 3)) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + await().atMost(Duration.ofSeconds(3)).until(() -> sub.getItems().size() > 12); + sub.cancel(); + await().atMost(Duration.ofSeconds(3)).until(cancelled::get); + assertThat(sub.isCancelled()).isTrue(); + assertThat(sub.hasCompleted()).isFalse(); + assertThat(sub.getItems()).contains(1, 2, 3); + } }