diff --git a/.build/update-workshop-target-version.sh b/.build/update-workshop-target-version.sh new file mode 100755 index 000000000..70c1903cd --- /dev/null +++ b/.build/update-workshop-target-version.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash +./mvnw -Pupdate-workshop-examples -f workshop-examples compile -DworkshopVersion=$1 +find workshop-examples -name '*.java' | xargs chmod +x \ No newline at end of file diff --git a/README.md b/README.md index a297036e1..34b869486 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,12 @@ Mutiny is based on the [Reactive Streams protocol](https://www.reactive-streams. In addition, Mutiny offers converters to interact with other popular libraries and [Kotlin](https://kotlinlang.org/). +## 👓 Mutiny workshop examples + +You can learn about Mutiny from the [documentation and website](https://smallrye.io/smallrye-mutiny). + +This repository also contains the [Mutiny workshop examples](workshop-examples) that cover the common concerns through self-contained executable [JBang](https://www.jbang.dev/) scripts. + ## 📦 Build instructions Mutiny is built with Apache Maven, so all you need is: diff --git a/documentation/docs/tutorials/mutiny-workshop.md b/documentation/docs/tutorials/mutiny-workshop.md new file mode 100644 index 000000000..e8c007cc3 --- /dev/null +++ b/documentation/docs/tutorials/mutiny-workshop.md @@ -0,0 +1,17 @@ +--- +tags: +- tutorial +- beginner +--- + +# Go further with the Mutiny workshop! + +One great option to teach yourself Mutiny is to go through the [Mutiny workshop examples](https://github.com/smallrye/smallrye-mutiny/tree/main/workshop-examples). + +These self-contained [JBang](https://jbang.dev/) scripts cover the main parts of the Mutiny APIs. + +It's a fun and easy way to discover Mutiny! + +Check out [https://github.com/smallrye/smallrye-mutiny/tree/main/workshop-examples](https://github.com/smallrye/smallrye-mutiny/tree/main/workshop-examples) to learn more. + +![Running a workshop sample](running-workshop-sample.png){ width="400" } diff --git a/documentation/docs/tutorials/running-workshop-sample.png b/documentation/docs/tutorials/running-workshop-sample.png new file mode 100644 index 000000000..6eec15382 Binary files /dev/null and b/documentation/docs/tutorials/running-workshop-sample.png differ diff --git a/documentation/mkdocs.yml b/documentation/mkdocs.yml index 957d104c9..65d7a1317 100644 --- a/documentation/mkdocs.yml +++ b/documentation/mkdocs.yml @@ -15,6 +15,7 @@ nav: - 'tutorials/transforming-items-asynchronously.md' - 'tutorials/handling-failures.md' - 'tutorials/retrying.md' + - 'tutorials/mutiny-workshop.md' - 'Guides': - 'guides/imperative-to-reactive.md' - 'guides/reactive-to-imperative.md' diff --git a/pom.xml b/pom.xml index 5c511a47c..496f571f7 100644 --- a/pom.xml +++ b/pom.xml @@ -73,6 +73,7 @@ kotlin bom math + workshop-examples @@ -115,6 +116,7 @@ 1.7.20 1.2.3 + 1.1.0 @@ -368,6 +370,11 @@ + + io.github.floverfelt + find-and-replace-maven-plugin + ${find-and-replace-maven-plugin.version} + diff --git a/workshop-examples/README.md b/workshop-examples/README.md new file mode 100644 index 000000000..7817e6939 --- /dev/null +++ b/workshop-examples/README.md @@ -0,0 +1,29 @@ +# Mutiny workshop examples + +This project contains learning examples for Mutiny, covering a wide range of topics such as the basics, transformations, error recovery and more. + +## Structure + +The examples are found in `src/main/java` and use a non-conventional file naming strategy. + +For instance you will find the failure handling examples in `_04_failures` where `_04_Uni_Failure_Retry.java` holds one such example. + +This naming scheme is not usual for Java programs, but it makes for a great way to organize files 😄 + +## Running the examples + +Each file is a self-contained class with a `main` method. + +A friendly option is to open the Maven project in your favorite IDE, then run the `main` method of each example of interest. + +The other option is to use [JBang](https://www.jbang.dev/) as each example is a valid JBang script: + +```shell +jbang src/main/java/_05_backpressure/_03_Visual_Drop.java +``` + +What's more each file is Unix-executable, so you can also run as: + +```shell +./src/main/java/_05_backpressure/_03_Visual_Drop.java +``` diff --git a/workshop-examples/pom.xml b/workshop-examples/pom.xml new file mode 100644 index 000000000..982a86a40 --- /dev/null +++ b/workshop-examples/pom.xml @@ -0,0 +1,66 @@ + + + + 4.0.0 + + + io.smallrye.reactive + mutiny-project + 999-SNAPSHOT + + + SmallRye Mutiny - Workshop examples + Workshop examples + mutiny-workshop-examples + + + + io.smallrye.reactive + mutiny + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + true + + + + + + + + update-workshop-examples + + + + io.github.floverfelt + find-and-replace-maven-plugin + + + exec + process-sources + + find-and-replace + + + file-contents + //DEPS io.smallrye.reactive:mutiny:.* + //DEPS io.smallrye.reactive:mutiny:${workshopVersion} + true + .java + + + + + + + + + + diff --git a/workshop-examples/src/main/java/_01_basics/_01_Uni.java b/workshop-examples/src/main/java/_01_basics/_01_Uni.java new file mode 100755 index 000000000..b30bebd0e --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_01_Uni.java @@ -0,0 +1,16 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import io.smallrye.mutiny.Uni; + +public class _01_Uni { + + public static void main(String[] args) { + System.out.println("⚡️ Hello world"); + + Uni uni = Uni.createFrom().item("Hello, world!"); + + uni.subscribe().with(System.out::println); + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_02_Uni_UniSubscriber.java b/workshop-examples/src/main/java/_01_basics/_02_Uni_UniSubscriber.java new file mode 100755 index 000000000..2f408f838 --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_02_Uni_UniSubscriber.java @@ -0,0 +1,33 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniSubscriber; +import io.smallrye.mutiny.subscription.UniSubscription; + +public class _02_Uni_UniSubscriber { + + public static void main(String[] args) { + System.out.println("⚡️ Hello world with UniSubscriber"); + + Uni uni = Uni.createFrom().item("Hello, world!"); + + uni.subscribe().withSubscriber(new UniSubscriber() { + @Override + public void onSubscribe(UniSubscription subscription) { + System.out.println("onSubscribe"); + } + + @Override + public void onItem(String item) { + System.out.println("onItem: " + item); + } + + @Override + public void onFailure(Throwable failure) { + System.out.println("onFailure: " + failure.getMessage()); + } + }); + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_03_Uni_From_Supplier.java b/workshop-examples/src/main/java/_01_basics/_03_Uni_From_Supplier.java new file mode 100755 index 000000000..5a967fe54 --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_03_Uni_From_Supplier.java @@ -0,0 +1,22 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import java.util.Random; + +import io.smallrye.mutiny.Uni; + +public class _03_Uni_From_Supplier { + + public static void main(String[] args) { + System.out.println("️⚡️ Uni from supplier"); + + Random random = new Random(); + + Uni uniFromSupplier = Uni.createFrom().item(random::nextInt); + + for (var i = 0; i < 5; i++) { + uniFromSupplier.subscribe().with(System.out::println); + } + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_04_Uni_From_Supplier_And_State.java b/workshop-examples/src/main/java/_01_basics/_04_Uni_From_Supplier_And_State.java new file mode 100755 index 000000000..a72a5e4f7 --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_04_Uni_From_Supplier_And_State.java @@ -0,0 +1,20 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import java.util.concurrent.atomic.AtomicInteger; + +import io.smallrye.mutiny.Uni; + +public class _04_Uni_From_Supplier_And_State { + + public static void main(String[] args) { + System.out.println("️⚡️ Uni from supplier with state"); + + Uni uniFromSupplierAndState = Uni.createFrom().item(AtomicInteger::new, i -> i.addAndGet(10)); + + for (var i = 0; i < 5; i++) { + uniFromSupplierAndState.subscribe().with(System.out::println); + } + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_05_Uni_Deferred.java b/workshop-examples/src/main/java/_01_basics/_05_Uni_Deferred.java new file mode 100755 index 000000000..9b5720f8d --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_05_Uni_Deferred.java @@ -0,0 +1,22 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import java.util.concurrent.atomic.AtomicLong; + +import io.smallrye.mutiny.Uni; + +public class _05_Uni_Deferred { + + public static void main(String[] args) { + System.out.println("️⚡️ Deferred Uni"); + + AtomicLong ids = new AtomicLong(); + + Uni deferredUni = Uni.createFrom().deferred(() -> Uni.createFrom().item(ids::incrementAndGet)); + + for (var i = 0; i < 5; i++) { + deferredUni.subscribe().with(System.out::println); + } + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_06_Uni_From_Emitter.java b/workshop-examples/src/main/java/_01_basics/_06_Uni_From_Emitter.java new file mode 100755 index 000000000..52e706188 --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_06_Uni_From_Emitter.java @@ -0,0 +1,29 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ForkJoinPool; + +import io.smallrye.mutiny.Uni; + +public class _06_Uni_From_Emitter { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Uni from emitter"); + + ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); + CountDownLatch emitterLatch = new CountDownLatch(1); + + Uni uniFromEmitter = Uni.createFrom().emitter(emitter -> { + forkJoinPool.submit(() -> { + emitter.complete("Hello"); + emitterLatch.countDown(); + }); + }); + + uniFromEmitter.subscribe().with(System.out::println); + + emitterLatch.await(); + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_07_Unit_From_Emitter_And_State.java b/workshop-examples/src/main/java/_01_basics/_07_Unit_From_Emitter_And_State.java new file mode 100755 index 000000000..6d974866a --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_07_Unit_From_Emitter_And_State.java @@ -0,0 +1,21 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import java.util.concurrent.atomic.AtomicInteger; + +import io.smallrye.mutiny.Uni; + +public class _07_Unit_From_Emitter_And_State { + + public static void main(String[] args) { + System.out.println("⚡️ Uni from emitter and state"); + + Uni uniFromEmitterAndState = Uni.createFrom() + .emitter(AtomicInteger::new, (i, e) -> e.complete(i.addAndGet(10))); + + for (var i = 0; i < 5; i++) { + uniFromEmitterAndState.subscribe().with(System.out::println); + } + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_08_Uni_From_Failure.java b/workshop-examples/src/main/java/_01_basics/_08_Uni_From_Failure.java new file mode 100755 index 000000000..8c8fba909 --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_08_Uni_From_Failure.java @@ -0,0 +1,20 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import java.io.IOException; + +import io.smallrye.mutiny.Uni; + +public class _08_Uni_From_Failure { + + public static void main(String[] args) { + System.out.println("⚡️ Uni from failure"); + + Uni.createFrom().failure(new IOException("Boom")) + .subscribe().with(System.out::println, failure -> System.out.println(failure.getMessage())); + + Uni.createFrom().failure(() -> new IOException("Badaboom")) + .subscribe().with(System.out::println, failure -> System.out.println(failure.getMessage())); + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_09_Uni_From_CompletionStage.java b/workshop-examples/src/main/java/_01_basics/_09_Uni_From_CompletionStage.java new file mode 100755 index 000000000..b17f1bb6d --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_09_Uni_From_CompletionStage.java @@ -0,0 +1,26 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import static java.util.concurrent.CompletableFuture.delayedExecutor; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import io.smallrye.mutiny.Uni; + +public class _09_Uni_From_CompletionStage { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Uni from CompletionStage"); + + var cs = CompletableFuture + .supplyAsync(() -> "Hello!", delayedExecutor(1, TimeUnit.SECONDS)) + .thenApply(String::toUpperCase); + + Uni.createFrom().completionStage(cs) + .subscribe().with(System.out::println, failure -> System.out.println(failure.getMessage())); + + Thread.sleep(2000); + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_10_Uni_Misc.java b/workshop-examples/src/main/java/_01_basics/_10_Uni_Misc.java new file mode 100755 index 000000000..4a57ca8f0 --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_10_Uni_Misc.java @@ -0,0 +1,30 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import java.util.Optional; + +import io.smallrye.mutiny.Uni; + +public class _10_Uni_Misc { + + public static void main(String[] args) { + System.out.println("⚡️ Misc"); + + Uni.createFrom().nothing() + .subscribe().with(System.out::println, failure -> System.out.println(failure.getMessage())); + + Uni.createFrom().voidItem() + .subscribe().with(System.out::println, failure -> System.out.println(failure.getMessage())); + + Uni.createFrom().nullItem() + .subscribe().with(System.out::println, failure -> System.out.println(failure.getMessage())); + + Uni.createFrom().optional(Optional.of("Hello")) + .subscribe().with(System.out::println, failure -> System.out.println(failure.getMessage())); + + Uni.createFrom().converter(i -> Uni.createFrom().item("[" + i + "]"), 10) + .subscribe().with(System.out::println, failure -> System.out.println(failure.getMessage())); + + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_11_Uni_Delay.java b/workshop-examples/src/main/java/_01_basics/_11_Uni_Delay.java new file mode 100755 index 000000000..1f8a06552 --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_11_Uni_Delay.java @@ -0,0 +1,32 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import static java.util.concurrent.CompletableFuture.delayedExecutor; +import static java.util.concurrent.CompletableFuture.supplyAsync; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import io.smallrye.mutiny.Uni; + +public class _11_Uni_Delay { + + public static void main(String[] args) { + System.out.println("⚡️ Uni delay"); + + Uni.createFrom().item(666) + .onItem().delayIt().by(Duration.ofSeconds(1)) + .subscribe().with(System.out::println); + + System.out.println("⏰"); + + Uni.createFrom().item(666) + .onItem().delayIt() + .until(n -> Uni.createFrom().completionStage( + supplyAsync( + () -> "Ok", + delayedExecutor(5, TimeUnit.SECONDS)))) + .subscribe().with(System.out::println); + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_12_Uni_Disjoint.java b/workshop-examples/src/main/java/_01_basics/_12_Uni_Disjoint.java new file mode 100755 index 000000000..49a9f090e --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_12_Uni_Disjoint.java @@ -0,0 +1,18 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import java.util.List; + +import io.smallrye.mutiny.Uni; + +public class _12_Uni_Disjoint { + + public static void main(String[] args) { + System.out.println("⚡️ Uni disjoint"); + + Uni.createFrom().item(List.of(1, 2, 3, 4, 5)) + .onItem().disjoint() + .subscribe().with(System.out::println); + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_13_Multi.java b/workshop-examples/src/main/java/_01_basics/_13_Multi.java new file mode 100755 index 000000000..2a9640d67 --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_13_Multi.java @@ -0,0 +1,47 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import io.smallrye.mutiny.Multi; + +public class _13_Multi { + + public static void main(String[] args) { + System.out.println("⚡️ Hello world"); + + // -------------------------------------------------------------------------------------------------- // + + Multi.createFrom().items(1, 2, 3) + .subscribe().with( + subscription -> { + System.out.println("Subscription: " + subscription); + subscription.request(10); + }, + item -> System.out.println("Item: " + item), + failure -> System.out.println("Failure: " + failure.getMessage()), + () -> System.out.println("Completed")); + + // -------------------------------------------------------------------------------------------------- // + + System.out.println("----"); + + Multi.createFrom().range(10, 15) + .subscribe().with(System.out::println); + + var randomNumbers = Stream + .generate(ThreadLocalRandom.current()::nextInt) + .limit(5) + .collect(Collectors.toList()); + + // -------------------------------------------------------------------------------------------------- // + + System.out.println("----"); + + Multi.createFrom().iterable(randomNumbers) + .subscribe().with(System.out::println); + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_14_Multi_Subscriber.java b/workshop-examples/src/main/java/_01_basics/_14_Multi_Subscriber.java new file mode 100755 index 000000000..17ee6144f --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_14_Multi_Subscriber.java @@ -0,0 +1,42 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; + +import io.smallrye.mutiny.Multi; + +public class _14_Multi_Subscriber { + + public static void main(String[] args) { + System.out.println("⚡️ Hello world with subscriber"); + + Multi.createFrom().items(1, 2, 3).subscribe().withSubscriber(new Subscriber() { + private Subscription subscription; + + @Override + public void onSubscribe(Subscription subscription) { + System.out.println("onSubscribe()"); + this.subscription = subscription; + this.subscription.request(1); + } + + @Override + public void onNext(Integer integer) { + System.out.println("onNext: " + integer); + this.subscription.request(1); + } + + @Override + public void onError(Throwable t) { + System.out.println("onError: " + t.getMessage()); + } + + @Override + public void onComplete() { + System.out.println("onComplete()"); + } + }); + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_15_Multi_From_Emitter.java b/workshop-examples/src/main/java/_01_basics/_15_Multi_From_Emitter.java new file mode 100755 index 000000000..ff8116070 --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_15_Multi_From_Emitter.java @@ -0,0 +1,37 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import io.smallrye.mutiny.Multi; + +public class _15_Multi_From_Emitter { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Multi from emitter"); + + ScheduledExecutorService service = Executors.newScheduledThreadPool(1); + + AtomicReference> ref = new AtomicReference<>(); + AtomicInteger counter = new AtomicInteger(); + CountDownLatch latch = new CountDownLatch(1); + + Multi.createFrom().emitter(emitter -> { + ref.set(service.scheduleAtFixedRate(() -> { + emitter.emit("tick"); + if (counter.getAndIncrement() == 5) { + ref.get().cancel(true); + emitter.complete(); + latch.countDown(); + } + }, 0, 500, TimeUnit.MILLISECONDS)); + }) + .subscribe().with(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done!")); + + latch.await(); + service.shutdown(); + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_16_Multi_Control_Subscription.java b/workshop-examples/src/main/java/_01_basics/_16_Multi_Control_Subscription.java new file mode 100755 index 000000000..24cc70121 --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_16_Multi_Control_Subscription.java @@ -0,0 +1,52 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.Flow.Subscription; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +public class _16_Multi_Control_Subscription { + + public static void main(String[] args) { + System.out.println("⚡️ Multi and subscription"); + + Multi.createFrom() + .ticks().every(Duration.of(1, ChronoUnit.SECONDS)) + .subscribe().withSubscriber(new MultiSubscriber() { + + private Subscription subscription; + private int counter = 0; + + @Override + public void onItem(Long tick) { + System.out.println("Tick: " + tick); + if (counter++ == 10) { + subscription.cancel(); + } else { + subscription.request(1); + } + } + + @Override + public void onFailure(Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void onCompletion() { + System.out.println("Done!"); + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } + }); + + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_17_Multi_By_Repeating.java b/workshop-examples/src/main/java/_01_basics/_17_Multi_By_Repeating.java new file mode 100755 index 000000000..dac2d5441 --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_17_Multi_By_Repeating.java @@ -0,0 +1,59 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +public class _17_Multi_By_Repeating { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Multi by repeating"); + + Multi.createBy() + .repeating() + .supplier(Service::fetchValue) + .until(n -> n > 1_000_000L) + .subscribe().with(System.out::println); + + System.out.println("\n----\n"); + + CountDownLatch latch = new CountDownLatch(1); + + Multi.createBy() + .repeating() + .uni(Service::asyncFetchValue) + .atMost(10) + .subscribe().with(System.out::println, Throwable::printStackTrace, latch::countDown); + + latch.await(); + + System.out.println("\n----\n"); + + Multi.createBy() + .repeating() + .completionStage(Service::queryDb) + .whilst(n -> n < 1_000_000L) + .subscribe().with(System.out::println); + } + + static class Service { + + static long fetchValue() { + return ThreadLocalRandom.current().nextLong(1_001_000L); + } + + static Uni asyncFetchValue() { + return Uni.createFrom().completionStage(Service::queryDb); + } + + static CompletionStage queryDb() { + return CompletableFuture.supplyAsync(Service::fetchValue); + } + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_18_Multi_From_Resource.java b/workshop-examples/src/main/java/_01_basics/_18_Multi_From_Resource.java new file mode 100755 index 000000000..19a43aa1b --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_18_Multi_From_Resource.java @@ -0,0 +1,29 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import io.smallrye.mutiny.Multi; + +public class _18_Multi_From_Resource { + + public static void main(String[] args) { + System.out.println("⚡️ Multi from resource"); + + Multi.createFrom() + .resource(MyResource::new, MyResource::stream) + .withFinalizer(MyResource::close) + .subscribe().with(System.out::println); + } + + static class MyResource { + + public Multi stream() { + System.out.println("stream()"); + return Multi.createFrom().range(0, 10); + } + + public void close() { + System.out.println("close()"); + } + } +} diff --git a/workshop-examples/src/main/java/_01_basics/_19_Multi_From_Generator.java b/workshop-examples/src/main/java/_01_basics/_19_Multi_From_Generator.java new file mode 100755 index 000000000..b6fad449f --- /dev/null +++ b/workshop-examples/src/main/java/_01_basics/_19_Multi_From_Generator.java @@ -0,0 +1,43 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _01_basics; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +import io.smallrye.mutiny.Multi; + +public class _19_Multi_From_Generator { + + public static void main(String[] args) { + System.out.println("⚡️ Multi from generator"); + + Multi.createFrom() + .generator(MyState::new, (state, emitter) -> { + List list = state.produceorNull(); + if (list != null) { + emitter.emit(list); + } + return state; + }) + .select().first(10) + .subscribe().with(System.out::println); + } + + private static class MyState { + Random random = new Random(); + LinkedList list = new LinkedList<>(); + + List produceorNull() { + list.add(random.nextInt(10)); + if (list.size() > 3) { + list.remove(0); + return new ArrayList<>(list); + } else { + return null; + } + } + } +} diff --git a/workshop-examples/src/main/java/_02_groups/_01_Uni_Event_Groups.java b/workshop-examples/src/main/java/_02_groups/_01_Uni_Event_Groups.java new file mode 100755 index 000000000..34f1986e1 --- /dev/null +++ b/workshop-examples/src/main/java/_02_groups/_01_Uni_Event_Groups.java @@ -0,0 +1,27 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _02_groups; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; + +import io.smallrye.mutiny.Uni; + +public class _01_Uni_Event_Groups { + + public static void main(String[] args) { + System.out.println("⚡️ Uni inspect events"); + + var result = Uni.createFrom().item("Hello") + .onSubscription().invoke(sub -> System.out.println("onSubscribe " + sub)) + .onCancellation().invoke(() -> System.out.println("onCancellation")) + .onItem().invoke(item -> System.out.println("onItem " + item)) + .onFailure().invoke(failure -> System.out.println("onFailure " + failure)) + .onItemOrFailure().invoke((item, failure) -> System.out.println("onItemOrFailure " + item + ", " + failure)) + .onTermination().invoke((s, t, c) -> System.out.println("onTermination " + s + ", " + t + ", " + c)) + .await().atMost(Duration.of(5, ChronoUnit.SECONDS)); + + System.out.println("📦 uni result = " + result); + System.out.println("⚠️ This was a blocking operation"); + } +} diff --git a/workshop-examples/src/main/java/_02_groups/_02_Uni_Call.java b/workshop-examples/src/main/java/_02_groups/_02_Uni_Call.java new file mode 100755 index 000000000..bfe3deb7a --- /dev/null +++ b/workshop-examples/src/main/java/_02_groups/_02_Uni_Call.java @@ -0,0 +1,33 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _02_groups; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; + +import io.smallrye.mutiny.Uni; + +public class _02_Uni_Call { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Uni call()"); + + CountDownLatch latch = new CountDownLatch(1); + + Uni.createFrom().item(123) + .onItem().call(n -> asyncLog(">>> ", n)) + .onTermination().call(() -> asyncLog("--- ", "")) + .subscribe().with(x -> { + System.out.println(x); + latch.countDown(); + }); + + latch.await(); + } + + static Uni asyncLog(String prefix, Object value) { + var cs = CompletableFuture + .runAsync(() -> System.out.println(Thread.currentThread() + " :: " + prefix + value)); + return Uni.createFrom().completionStage(cs); + } +} diff --git a/workshop-examples/src/main/java/_02_groups/_03_Uni_Shortcuts.java b/workshop-examples/src/main/java/_02_groups/_03_Uni_Shortcuts.java new file mode 100755 index 000000000..a5cd3cc21 --- /dev/null +++ b/workshop-examples/src/main/java/_02_groups/_03_Uni_Shortcuts.java @@ -0,0 +1,21 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _02_groups; + +import io.smallrye.mutiny.Uni; + +public class _03_Uni_Shortcuts { + + public static void main(String[] args) { + System.out.println("⚡️ Uni invoke / call shortcuts"); + + Uni.createFrom().item(123) + .invoke(n -> System.out.println("n = " + n)) + .call(n -> Uni.createFrom() + .voidItem() + .invoke(() -> System.out.println("call(" + n + ")"))) + .eventually(() -> System.out.println("eventually()")) + .subscribe().with(System.out::println); + + } +} diff --git a/workshop-examples/src/main/java/_02_groups/_04_Multi_Event_Groups.java b/workshop-examples/src/main/java/_02_groups/_04_Multi_Event_Groups.java new file mode 100755 index 000000000..1dc3a11c7 --- /dev/null +++ b/workshop-examples/src/main/java/_02_groups/_04_Multi_Event_Groups.java @@ -0,0 +1,28 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _02_groups; + +import java.util.stream.Collectors; + +import io.smallrye.mutiny.Multi; + +public class _04_Multi_Event_Groups { + + public static void main(String[] args) { + System.out.println("⚡️ Multi inspect events"); + + var items = Multi.createFrom().range(1, 6) + .onSubscription().invoke(sub -> System.out.println("onSubscribe " + sub)) + .onRequest().invoke(count -> System.out.println("onRequest " + count)) + .onCancellation().invoke(() -> System.out.println("onCancellation")) + .onItem().invoke(item -> System.out.println("onItem " + item)) + .onFailure().invoke(failure -> System.out.println("onFailure " + failure)) + .onCompletion().invoke(() -> System.out.println("onCompletion")) + .onTermination().invoke((t, c) -> System.out.println("onTermination " + t + ", c")) + .subscribe() + .asStream().collect(Collectors.toList()); + + System.out.println("📦 multi items = " + items); + System.out.println("⚠️ This was a blocking operation"); + } +} diff --git a/workshop-examples/src/main/java/_02_groups/_05_Multi_Shortcuts.java b/workshop-examples/src/main/java/_02_groups/_05_Multi_Shortcuts.java new file mode 100755 index 000000000..51873478d --- /dev/null +++ b/workshop-examples/src/main/java/_02_groups/_05_Multi_Shortcuts.java @@ -0,0 +1,19 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _02_groups; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +public class _05_Multi_Shortcuts { + + public static void main(String[] args) { + System.out.println("⚡️ Multi invoke / call shortcuts"); + + Multi.createFrom().range(1, 10) + .invoke(n -> System.out.println("n = " + n)) + .call(n -> Uni.createFrom() + .voidItem().invoke(() -> System.out.println("call(" + n + ")"))) + .subscribe().with(System.out::println); + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_01_Uni_Transform.java b/workshop-examples/src/main/java/_03_composition_transformation/_01_Uni_Transform.java new file mode 100755 index 000000000..1aef9f951 --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_01_Uni_Transform.java @@ -0,0 +1,17 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import io.smallrye.mutiny.Uni; + +public class _01_Uni_Transform { + + public static void main(String[] args) { + System.out.println("⚡️ Uni transform a value with a function"); + + Uni.createFrom().item(123) + .onItem().transform(n -> n * 100) + .onItem().transform(Object::toString) + .subscribe().with(System.out::println); + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_02_Uni_TransformToUni.java b/workshop-examples/src/main/java/_03_composition_transformation/_02_Uni_TransformToUni.java new file mode 100755 index 000000000..cc5286eb4 --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_02_Uni_TransformToUni.java @@ -0,0 +1,21 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import io.smallrye.mutiny.Uni; + +public class _02_Uni_TransformToUni { + + public static void main(String[] args) { + System.out.println("⚡️ Uni transform a value with a Uni"); + + Uni.createFrom().item(123) + .onItem().transformToUni(n -> increase(n)) + .onItem().transformToUni((n, emitter) -> emitter.complete("[" + n + "]")) + .subscribe().with(System.out::println); + } + + static Uni increase(int n) { + return Uni.createFrom().item(n * 100); + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_03_Uni_Shortcuts.java b/workshop-examples/src/main/java/_03_composition_transformation/_03_Uni_Shortcuts.java new file mode 100755 index 000000000..8e1241f6e --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_03_Uni_Shortcuts.java @@ -0,0 +1,22 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import io.smallrye.mutiny.Uni; + +public class _03_Uni_Shortcuts { + + public static void main(String[] args) { + System.out.println("⚡️ Uni transform a value with shortcuts"); + + Uni.createFrom().item(123) + .replaceWith(Uni.createFrom().item(456)) + .chain(n -> increase(n)) // or flatMap + .map(n -> "[" + n + "]") + .subscribe().with(System.out::println); + } + + static Uni increase(int n) { + return Uni.createFrom().item(n * 100); + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_04_Uni_Stage.java b/workshop-examples/src/main/java/_03_composition_transformation/_04_Uni_Stage.java new file mode 100755 index 000000000..baa2f2243 --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_04_Uni_Stage.java @@ -0,0 +1,39 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import java.io.IOException; + +import io.smallrye.mutiny.Uni; + +public class _04_Uni_Stage { + + public static void main(String[] args) { + System.out.println("⚡️ Stages (with Uni)"); + + Uni.createFrom().item(123) + .stage(uni -> processEvents(uni)) + .stage(uni -> handleItem(uni)) + .stage(uni -> handleFailure(uni)) + .subscribe().with(System.out::println, Throwable::printStackTrace); + + } + + private static Uni handleFailure(Uni uni) { + return uni + .onFailure().invoke(failure -> System.out.println("There is a failure: " + failure.getMessage())) + .onFailure(IOException.class).recoverWithItem(-1); + } + + private static Uni handleItem(Uni uni) { + return uni + .onItem().ifNotNull().invoke(() -> System.out.println("The item is not null")) + .onItem().transform(n -> n * 10); + } + + private static Uni processEvents(Uni uni) { + return uni + .onSubscription().invoke(() -> System.out.println("onSubscribe")) + .onItem().invoke(n -> System.out.println("item = " + n)); + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_05_Uni_Combine.java b/workshop-examples/src/main/java/_03_composition_transformation/_05_Uni_Combine.java new file mode 100755 index 000000000..f2168ac9c --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_05_Uni_Combine.java @@ -0,0 +1,30 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import io.smallrye.mutiny.Uni; + +public class _05_Uni_Combine { + + public static void main(String[] args) { + System.out.println("⚡️ Uni combine"); + + var first = Uni.createFrom().item(1); + var second = Uni.createFrom().item(2); + var third = Uni.createFrom().item(3); + + Uni.combine() + .all().unis(first, second, third) + .asTuple() + .subscribe().with(System.out::println); + + Uni.combine() + .all().unis(first, second, third) + .combinedWith((a, b, c) -> a + b + c) + .subscribe().with(System.out::println); + + Uni.combine() + .any().of(first, second, third) + .subscribe().with(System.out::println); + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_06_Multi_Transform.java b/workshop-examples/src/main/java/_03_composition_transformation/_06_Multi_Transform.java new file mode 100755 index 000000000..a14fc5fe1 --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_06_Multi_Transform.java @@ -0,0 +1,18 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import io.smallrye.mutiny.Multi; + +public class _06_Multi_Transform { + + public static void main(String[] args) { + System.out.println("⚡️ Multi transformations"); + + Multi.createFrom().range(1, 100) + .select().where(n -> n % 2 == 0) + .select().last(5) + .onItem().transform(n -> "[" + n + "]") + .subscribe().with(System.out::println); + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_07_Multi_TransformToUni.java b/workshop-examples/src/main/java/_03_composition_transformation/_07_Multi_TransformToUni.java new file mode 100755 index 000000000..76a7f7c37 --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_07_Multi_TransformToUni.java @@ -0,0 +1,39 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import static java.util.concurrent.CompletableFuture.delayedExecutor; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +public class _07_Multi_TransformToUni { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Multi transformations with Uni"); + + CountDownLatch latch = new CountDownLatch(1); + + Multi.createFrom().range(1, 100) + .select().where(n -> n % 2 == 0) + .select().last(5) + .onItem().transformToUniAndMerge(n -> increase(n)) // try transformToUniAndConcatenate + .onItem().transform(n -> "[" + n + "]") + .onCompletion().invoke(latch::countDown) + .subscribe().with(System.out::println); + + latch.await(); + } + + static Uni increase(int n) { + var cs = CompletableFuture.supplyAsync( + () -> n * 100, + delayedExecutor(ThreadLocalRandom.current().nextInt(500), TimeUnit.MILLISECONDS)); + return Uni.createFrom().completionStage(cs); + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_08_Multi_TransformToMulti.java b/workshop-examples/src/main/java/_03_composition_transformation/_08_Multi_TransformToMulti.java new file mode 100755 index 000000000..f55db1690 --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_08_Multi_TransformToMulti.java @@ -0,0 +1,44 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import static java.util.concurrent.CompletableFuture.delayedExecutor; +import static java.util.concurrent.CompletableFuture.runAsync; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import io.smallrye.mutiny.Multi; + +public class _08_Multi_TransformToMulti { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Multi transformations to Multi"); + + CountDownLatch latch = new CountDownLatch(1); + + Multi.createFrom().range(1, 100) + .select().where(n -> n % 2 == 0) + .select().last(5) + .onItem().transformToMultiAndMerge(n -> query(n)) // try transformToMultiAndConcatenate + .onItem().transform(n -> "[" + n + "]") + .onCompletion().invoke(latch::countDown) + .subscribe().with(System.out::println); + + latch.await(); + } + + static Multi query(int n) { + return Multi.createFrom().emitter(emitter -> { + runAsync( + () -> { + emitter.emit(n); + emitter.emit(n * 10); + emitter.emit(n * 100); + emitter.complete(); + }, + delayedExecutor(ThreadLocalRandom.current().nextInt(500), TimeUnit.MILLISECONDS)); + }); + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_09_Multi_Shortcuts.java b/workshop-examples/src/main/java/_03_composition_transformation/_09_Multi_Shortcuts.java new file mode 100755 index 000000000..1450d9632 --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_09_Multi_Shortcuts.java @@ -0,0 +1,44 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import static java.util.concurrent.CompletableFuture.delayedExecutor; +import static java.util.concurrent.CompletableFuture.runAsync; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import io.smallrye.mutiny.Multi; + +public class _09_Multi_Shortcuts { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Multi transformations to Multi with shortcuts"); + + CountDownLatch latch = new CountDownLatch(1); + + Multi.createFrom().range(1, 100) + .filter(n -> n % 2 == 0) + .select().last(5) + .flatMap(n -> query(n)) // try concatMap + .map(n -> "[" + n + "]") + .onCompletion().invoke(latch::countDown) + .subscribe().with(System.out::println); + + latch.await(); + } + + static Multi query(int n) { + return Multi.createFrom().emitter(emitter -> { + runAsync( + () -> { + emitter.emit(n); + emitter.emit(n * 10); + emitter.emit(n * 100); + emitter.complete(); + }, + delayedExecutor(ThreadLocalRandom.current().nextInt(500), TimeUnit.MILLISECONDS)); + }); + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_10_Multi_Merge.java b/workshop-examples/src/main/java/_03_composition_transformation/_10_Multi_Merge.java new file mode 100755 index 000000000..2c1877698 --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_10_Multi_Merge.java @@ -0,0 +1,52 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import static java.util.concurrent.CompletableFuture.delayedExecutor; +import static java.util.concurrent.CompletableFuture.supplyAsync; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +public class _10_Multi_Merge { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Multi merge"); + + var firstGenerator = new Generator(0); + var secondGenerator = new Generator(100); + CountDownLatch latch = new CountDownLatch(1); + + var first = Multi.createBy().repeating() + .uni(firstGenerator::next).atMost(10); + + var second = Multi.createBy().repeating() + .uni(secondGenerator::next).atMost(10); + + Multi.createBy().merging().streams(first, second) + .onTermination().invoke(latch::countDown) + .subscribe().with(System.out::println); + + latch.await(); + } + + private static class Generator { + final AtomicLong aLong; + + Generator(long start) { + aLong = new AtomicLong(start); + } + + Uni next() { + return Uni.createFrom().completionStage( + supplyAsync( + aLong::getAndIncrement, + delayedExecutor(ThreadLocalRandom.current().nextInt(500), TimeUnit.MILLISECONDS))); + } + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_11_Multi_Concatenate.java b/workshop-examples/src/main/java/_03_composition_transformation/_11_Multi_Concatenate.java new file mode 100755 index 000000000..4f9e30c1d --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_11_Multi_Concatenate.java @@ -0,0 +1,52 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import static java.util.concurrent.CompletableFuture.delayedExecutor; +import static java.util.concurrent.CompletableFuture.supplyAsync; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +public class _11_Multi_Concatenate { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Multi concatenation"); + + var firstGenerator = new Generator(0); + var secondGenerator = new Generator(100); + CountDownLatch latch = new CountDownLatch(1); + + var first = Multi.createBy().repeating() + .uni(firstGenerator::next).atMost(10); + + var second = Multi.createBy().repeating() + .uni(secondGenerator::next).atMost(10); + + Multi.createBy().concatenating().streams(first, second) + .onTermination().invoke(latch::countDown) + .subscribe().with(System.out::println); + + latch.await(); + } + + private static class Generator { + final AtomicLong aLong; + + Generator(long start) { + aLong = new AtomicLong(start); + } + + Uni next() { + return Uni.createFrom().completionStage( + supplyAsync( + aLong::getAndIncrement, + delayedExecutor(ThreadLocalRandom.current().nextInt(500), TimeUnit.MILLISECONDS))); + } + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_12_Multi_Combine.java b/workshop-examples/src/main/java/_03_composition_transformation/_12_Multi_Combine.java new file mode 100755 index 000000000..74f9cde14 --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_12_Multi_Combine.java @@ -0,0 +1,53 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import static java.util.concurrent.CompletableFuture.delayedExecutor; +import static java.util.concurrent.CompletableFuture.supplyAsync; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +public class _12_Multi_Combine { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Multi combination"); + + var firstGenerator = new Generator(0); + var secondGenerator = new Generator(100); + CountDownLatch latch = new CountDownLatch(1); + + var first = Multi.createBy().repeating() + .uni(firstGenerator::next).atMost(10); + + var second = Multi.createBy().repeating() + .uni(secondGenerator::next).atMost(10); + + Multi.createBy().combining().streams(first, second) + .asTuple() // also try lastItems() + .onTermination().invoke(latch::countDown) + .subscribe().with(System.out::println); + + latch.await(); + } + + private static class Generator { + final AtomicLong aLong; + + Generator(long start) { + aLong = new AtomicLong(start); + } + + Uni next() { + return Uni.createFrom().completionStage( + supplyAsync( + aLong::getAndIncrement, + delayedExecutor(ThreadLocalRandom.current().nextInt(500), TimeUnit.MILLISECONDS))); + } + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_13_Multi_Broadcast.java b/workshop-examples/src/main/java/_03_composition_transformation/_13_Multi_Broadcast.java new file mode 100755 index 000000000..544a97dfb --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_13_Multi_Broadcast.java @@ -0,0 +1,38 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.smallrye.mutiny.Multi; + +public class _13_Multi_Broadcast { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Multi broadcast"); + + var counter = new AtomicInteger(); + var executor = Executors.newCachedThreadPool(); + + var multi = Multi.createBy() + .repeating().supplier(counter::getAndIncrement) + .atMost(10) + .broadcast().toAllSubscribers(); + + executor.submit(() -> multi + .onItem().transform(n -> "🚀 " + n) + .subscribe().with(System.out::println)); + + executor.submit(() -> multi + .onItem().transform(n -> "🧪 " + n) + .subscribe().with(System.out::println)); + + executor.submit(() -> multi + .onItem().transform(n -> "💡 " + n) + .subscribe().with(System.out::println)); + + executor.awaitTermination(10, TimeUnit.SECONDS); + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_14_Multi_Aggregates.java b/workshop-examples/src/main/java/_03_composition_transformation/_14_Multi_Aggregates.java new file mode 100755 index 000000000..f66e5a8fb --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_14_Multi_Aggregates.java @@ -0,0 +1,111 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +public class _14_Multi_Aggregates { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Multi aggregates"); + + // ------------------------------------------------------------------ // + + var persons = Multi.createBy().repeating() + .supplier(() -> generate()).atMost(100); + + // ------------------------------------------------------------------ // + + System.out.println(); + + persons + .onItem().scan(() -> 0, (count, next) -> count + 1) + .subscribe().with(count -> System.out.println("We have " + count + " persons")); + + // ------------------------------------------------------------------ // + + System.out.println(); + + persons + .collect().with(Collectors.counting()) + .subscribe().with(count -> System.out.println("We have " + count + " persons")); + + // ------------------------------------------------------------------ // + + System.out.println(); + + persons + .select().where(person -> person.city.equals("Nevers")) + .collect().asList() + .subscribe().with(list -> System.out.println("They live in Nevers: " + list)); + + // ------------------------------------------------------------------ // + + System.out.println(); + + persons + .select().where(person -> person.city.equals("Nevers")) + .collect().in( + StringBuilder::new, + (acc, next) -> acc.append("\n") + .append(" -> ") + .append(next.identifier)) + .subscribe().with(list -> System.out.println("They live in Nevers: " + list)); + + // ------------------------------------------------------------------ // + + System.out.println(); + + var agePerCity = persons + .group().by(person -> person.city) + .onItem().transformToUni(group -> { + String city = group.key(); + Uni avg = group.collect().with(Collectors.averagingInt(person -> person.age)); + return avg.onItem().transform(res -> "Average age in " + city + " is " + res); + }).merge(); + + agePerCity.subscribe().with(System.out::println); + } + + // ------------------------------------------------------------------ // + + static Person generate() { + var rand = ThreadLocalRandom.current(); + return new Person( + UUID.randomUUID().toString(), + rand.nextInt(18, 50), + cities.get(rand.nextInt(cities.size()))); + } + + static List cities = Arrays.asList("Lyon", "Tassin La Demi Lune", "Clermont-Ferrand", "Nevers"); + + // ------------------------------------------------------------------ // + + private static class Person { + final String identifier; + final int age; + final String city; + + Person(String identifier, int age, String city) { + this.identifier = identifier; + this.age = age; + this.city = city; + } + + @Override + public String toString() { + return "Person{" + + "identifier='" + identifier + '\'' + + ", age=" + age + + ", city='" + city + '\'' + + '}'; + } + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_15_Multi_Buckets.java b/workshop-examples/src/main/java/_03_composition_transformation/_15_Multi_Buckets.java new file mode 100755 index 000000000..79d65548d --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_15_Multi_Buckets.java @@ -0,0 +1,19 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import java.time.Duration; + +import io.smallrye.mutiny.Multi; + +public class _15_Multi_Buckets { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Multi buckets"); + + Multi.createFrom() + .ticks().every(Duration.ofMillis(200)) + .group().intoLists().of(5) + .subscribe().with(System.out::println); + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_16_Multi_Temporal_Buckets.java b/workshop-examples/src/main/java/_03_composition_transformation/_16_Multi_Temporal_Buckets.java new file mode 100755 index 000000000..5f7e7d86b --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_16_Multi_Temporal_Buckets.java @@ -0,0 +1,19 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import java.time.Duration; + +import io.smallrye.mutiny.Multi; + +public class _16_Multi_Temporal_Buckets { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Multi temporal buckets"); + + Multi.createFrom() + .ticks().every(Duration.ofMillis(200)) + .group().intoLists().every(Duration.ofSeconds(2)) + .subscribe().with(System.out::println); + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_17_Multi_Disjoint.java b/workshop-examples/src/main/java/_03_composition_transformation/_17_Multi_Disjoint.java new file mode 100755 index 000000000..9c32bb9df --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_17_Multi_Disjoint.java @@ -0,0 +1,19 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import io.smallrye.mutiny.Multi; + +public class _17_Multi_Disjoint { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Multi disjoint items"); + + Multi.createFrom().range(0, 10) + .onItem().transformToMultiAndMerge(n -> Multi.createFrom().items(n, n * 2, n, n * 5, n, n * 10)) + .group().intoLists().of(3) + .onItem().invoke(list -> System.out.println(">>> " + list)) + .onItem().disjoint() + .subscribe().with(System.out::println, Throwable::printStackTrace); + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_18_Multi_To_Uni_And_Back.java b/workshop-examples/src/main/java/_03_composition_transformation/_18_Multi_To_Uni_And_Back.java new file mode 100755 index 000000000..9b7999f30 --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_18_Multi_To_Uni_And_Back.java @@ -0,0 +1,21 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +public class _18_Multi_To_Uni_And_Back { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Multi <-> Uni"); + + Multi.createFrom().range(1, 10) + .toUni() + .subscribe().with(System.out::println); + + Uni.createFrom().item(123) + .toMulti() + .subscribe().with(System.out::println); + } +} diff --git a/workshop-examples/src/main/java/_03_composition_transformation/_19_Multi_Select.java b/workshop-examples/src/main/java/_03_composition_transformation/_19_Multi_Select.java new file mode 100755 index 000000000..d83890f1b --- /dev/null +++ b/workshop-examples/src/main/java/_03_composition_transformation/_19_Multi_Select.java @@ -0,0 +1,18 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _03_composition_transformation; + +import io.smallrye.mutiny.Multi; + +public class _19_Multi_Select { + + public static void main(String[] args) throws InterruptedException { + System.out.println("⚡️ Multi select"); + + Multi.createFrom().range(1, 100) + .skip().first(10) + .select().where(n -> n % 2 == 0) + .select().last(10) + .subscribe().with(System.out::println); + } +} diff --git a/workshop-examples/src/main/java/_04_failures/_01_Uni_Failure_Transform.java b/workshop-examples/src/main/java/_04_failures/_01_Uni_Failure_Transform.java new file mode 100755 index 000000000..d6e0c0c50 --- /dev/null +++ b/workshop-examples/src/main/java/_04_failures/_01_Uni_Failure_Transform.java @@ -0,0 +1,18 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _04_failures; + +import java.io.IOException; + +import io.smallrye.mutiny.Uni; + +public class _01_Uni_Failure_Transform { + + public static void main(String[] args) { + System.out.println("⚡️ Uni failure transformation"); + + Uni.createFrom().failure(new IOException("Boom")) + .onFailure(IOException.class).transform(RuntimeException::new) + .subscribe().with(System.out::println, Throwable::printStackTrace); + } +} diff --git a/workshop-examples/src/main/java/_04_failures/_02_Uni_Failure_Recover_With_Item.java b/workshop-examples/src/main/java/_04_failures/_02_Uni_Failure_Recover_With_Item.java new file mode 100755 index 000000000..379a0dca0 --- /dev/null +++ b/workshop-examples/src/main/java/_04_failures/_02_Uni_Failure_Recover_With_Item.java @@ -0,0 +1,18 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _04_failures; + +import java.io.IOException; + +import io.smallrye.mutiny.Uni; + +public class _02_Uni_Failure_Recover_With_Item { + + public static void main(String[] args) { + System.out.println("⚡️ Uni failure recover with item"); + + Uni.createFrom().failure(new IOException("Boom")) + .onFailure(IOException.class).recoverWithItem(Throwable::getMessage) + .subscribe().with(System.out::println, Throwable::printStackTrace); + } +} diff --git a/workshop-examples/src/main/java/_04_failures/_03_Uni_Failure_Recover_With_Uni.java b/workshop-examples/src/main/java/_04_failures/_03_Uni_Failure_Recover_With_Uni.java new file mode 100755 index 000000000..2825f7ca1 --- /dev/null +++ b/workshop-examples/src/main/java/_04_failures/_03_Uni_Failure_Recover_With_Uni.java @@ -0,0 +1,18 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _04_failures; + +import java.io.IOException; + +import io.smallrye.mutiny.Uni; + +public class _03_Uni_Failure_Recover_With_Uni { + + public static void main(String[] args) { + System.out.println("⚡️ Uni failure recover with Uni"); + + Uni.createFrom().failure(new IOException("Boom")) + .onFailure(IOException.class).recoverWithUni(t -> Uni.createFrom().item("N/A -> " + t.getMessage())) + .subscribe().with(System.out::println, Throwable::printStackTrace); + } +} diff --git a/workshop-examples/src/main/java/_04_failures/_04_Uni_Failure_Retry.java b/workshop-examples/src/main/java/_04_failures/_04_Uni_Failure_Retry.java new file mode 100755 index 000000000..772df6a0a --- /dev/null +++ b/workshop-examples/src/main/java/_04_failures/_04_Uni_Failure_Retry.java @@ -0,0 +1,28 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _04_failures; + +import java.util.concurrent.ThreadLocalRandom; + +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniEmitter; + +public class _04_Uni_Failure_Retry { + + public static void main(String[] args) { + System.out.println("⚡️ Uni failure retry"); + + Uni.createFrom().emitter(emitter -> generate(emitter)) + .onFailure().invoke(() -> System.out.println("Failed")) + .onFailure().retry().indefinitely() + .subscribe().with(System.out::println); + } + + private static void generate(UniEmitter emitter) { + if (ThreadLocalRandom.current().nextDouble(0.0d, 1.0d) < 0.05d) { + emitter.complete("Ok"); + } else { + emitter.fail(new RuntimeException("Boom")); + } + } +} diff --git a/workshop-examples/src/main/java/_04_failures/_05_Uni_Failure_Retry_Bounded.java b/workshop-examples/src/main/java/_04_failures/_05_Uni_Failure_Retry_Bounded.java new file mode 100755 index 000000000..9b0342330 --- /dev/null +++ b/workshop-examples/src/main/java/_04_failures/_05_Uni_Failure_Retry_Bounded.java @@ -0,0 +1,28 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _04_failures; + +import java.util.concurrent.ThreadLocalRandom; + +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniEmitter; + +public class _05_Uni_Failure_Retry_Bounded { + + public static void main(String[] args) { + System.out.println("⚡️ Uni failure retry"); + + Uni.createFrom().emitter(emitter -> generate(emitter)) + .onFailure().invoke(() -> System.out.println("Failed")) + .onFailure().retry().atMost(10) + .subscribe().with(System.out::println, Throwable::printStackTrace); + } + + private static void generate(UniEmitter emitter) { + if (ThreadLocalRandom.current().nextDouble(0.0d, 1.0d) < 0.05d) { + emitter.complete("Ok"); + } else { + emitter.fail(new RuntimeException("Boom")); + } + } +} diff --git a/workshop-examples/src/main/java/_04_failures/_06_Uni_Failure_Retry_ExpBackoff.java b/workshop-examples/src/main/java/_04_failures/_06_Uni_Failure_Retry_ExpBackoff.java new file mode 100755 index 000000000..6eabbc1bd --- /dev/null +++ b/workshop-examples/src/main/java/_04_failures/_06_Uni_Failure_Retry_ExpBackoff.java @@ -0,0 +1,31 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _04_failures; + +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniEmitter; + +public class _06_Uni_Failure_Retry_ExpBackoff { + + public static void main(String[] args) { + System.out.println("⚡️ Uni failure retry"); + + long start = System.currentTimeMillis(); + + Uni.createFrom().emitter(emitter -> generate(emitter)) + .onFailure().invoke(() -> System.out.println("Failed after " + (System.currentTimeMillis() - start) + "ms")) + .onFailure().retry().withBackOff(Duration.ofMillis(100), Duration.ofMillis(1000)).expireIn(10_000) + .subscribe().with(System.out::println, Throwable::printStackTrace); + } + + private static void generate(UniEmitter emitter) { + if (ThreadLocalRandom.current().nextDouble(0.0d, 1.0d) < 0.05d) { + emitter.complete("Ok"); + } else { + emitter.fail(new RuntimeException("Boom")); + } + } +} diff --git a/workshop-examples/src/main/java/_04_failures/_07_Multi_Failure_Recover_Completing.java b/workshop-examples/src/main/java/_04_failures/_07_Multi_Failure_Recover_Completing.java new file mode 100755 index 000000000..b4655a609 --- /dev/null +++ b/workshop-examples/src/main/java/_04_failures/_07_Multi_Failure_Recover_Completing.java @@ -0,0 +1,32 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _04_failures; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.subscription.MultiEmitter; + +public class _07_Multi_Failure_Recover_Completing { + + public static void main(String[] args) { + System.out.println("⚡️ Multi failure recover by completing"); + + Multi.createFrom().emitter(emitter -> generate(emitter)) + .onFailure().recoverWithCompletion() + .subscribe().with(System.out::println, Throwable::printStackTrace, () -> System.out.println("✅")); + } + + private static void generate(MultiEmitter emitter) { + ThreadLocalRandom random = ThreadLocalRandom.current(); + while (true) { + if (random.nextDouble(0.0d, 1.0d) < 0.05d) { + emitter.fail(new IOException("Boom")); + return; + } else { + emitter.emit(random.nextInt(0, 100)); + } + } + } +} diff --git a/workshop-examples/src/main/java/_04_failures/_08_Multi_Failure_Recover_With_Item.java b/workshop-examples/src/main/java/_04_failures/_08_Multi_Failure_Recover_With_Item.java new file mode 100755 index 000000000..15335b3b5 --- /dev/null +++ b/workshop-examples/src/main/java/_04_failures/_08_Multi_Failure_Recover_With_Item.java @@ -0,0 +1,32 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _04_failures; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.subscription.MultiEmitter; + +public class _08_Multi_Failure_Recover_With_Item { + + public static void main(String[] args) { + System.out.println("⚡️ Multi failure recover with item"); + + Multi.createFrom().emitter(emitter -> generate(emitter)) + .onFailure().recoverWithItem(() -> 666) + .subscribe().with(System.out::println, Throwable::printStackTrace, () -> System.out.println("✅")); + } + + private static void generate(MultiEmitter emitter) { + ThreadLocalRandom random = ThreadLocalRandom.current(); + while (true) { + if (random.nextDouble(0.0d, 1.0d) < 0.05d) { + emitter.fail(new IOException("Boom")); + return; + } else { + emitter.emit(random.nextInt(0, 100)); + } + } + } +} diff --git a/workshop-examples/src/main/java/_04_failures/_09_Multi_Failure_Recover_With_Multi.java b/workshop-examples/src/main/java/_04_failures/_09_Multi_Failure_Recover_With_Multi.java new file mode 100755 index 000000000..9c0b529bd --- /dev/null +++ b/workshop-examples/src/main/java/_04_failures/_09_Multi_Failure_Recover_With_Multi.java @@ -0,0 +1,32 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _04_failures; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.subscription.MultiEmitter; + +public class _09_Multi_Failure_Recover_With_Multi { + + public static void main(String[] args) { + System.out.println("⚡️ Multi failure recover with Multi"); + + Multi.createFrom().emitter(emitter -> generate(emitter)) + .onFailure().recoverWithMulti(() -> Multi.createFrom().items(666, 999)) + .subscribe().with(System.out::println, Throwable::printStackTrace, () -> System.out.println("✅")); + } + + private static void generate(MultiEmitter emitter) { + ThreadLocalRandom random = ThreadLocalRandom.current(); + while (true) { + if (random.nextDouble(0.0d, 1.0d) < 0.05d) { + emitter.fail(new IOException("Boom")); + return; + } else { + emitter.emit(random.nextInt(0, 100)); + } + } + } +} diff --git a/workshop-examples/src/main/java/_04_failures/_10_Multi_Failure_Retry.java b/workshop-examples/src/main/java/_04_failures/_10_Multi_Failure_Retry.java new file mode 100755 index 000000000..ad7bd3fa9 --- /dev/null +++ b/workshop-examples/src/main/java/_04_failures/_10_Multi_Failure_Retry.java @@ -0,0 +1,33 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _04_failures; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.subscription.MultiEmitter; + +public class _10_Multi_Failure_Retry { + + public static void main(String[] args) { + System.out.println("⚡️ Multi failure retry"); + + Multi.createFrom().emitter(emitter -> generate(emitter)) + .onFailure().invoke(() -> System.out.println("💥")) + .onFailure().retry().atMost(5) + .subscribe().with(System.out::println, Throwable::printStackTrace, () -> System.out.println("✅")); + } + + private static void generate(MultiEmitter emitter) { + ThreadLocalRandom random = ThreadLocalRandom.current(); + while (true) { + if (random.nextDouble(0.0d, 1.0d) < 0.05d) { + emitter.fail(new IOException("Boom")); + return; + } else { + emitter.emit(random.nextInt(0, 100)); + } + } + } +} diff --git a/workshop-examples/src/main/java/_04_failures/_11_Multi_Failure_Cancelling_Recovery.java b/workshop-examples/src/main/java/_04_failures/_11_Multi_Failure_Cancelling_Recovery.java new file mode 100755 index 000000000..d2b4e951b --- /dev/null +++ b/workshop-examples/src/main/java/_04_failures/_11_Multi_Failure_Cancelling_Recovery.java @@ -0,0 +1,23 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _04_failures; + +import io.smallrye.mutiny.Multi; + +public class _11_Multi_Failure_Cancelling_Recovery { + + public static void main(String[] args) { + System.out.println("⚡️ Multi failure that cancels after recovery"); + + Multi.createFrom().range(0, 10) + .onItem().invoke(n -> { + if (n == 6) { + throw new RuntimeException("Bada Boom"); + } else { + System.out.println(n + " 👍"); + } + }) + .onFailure().recoverWithItem(6) + .subscribe().with(System.out::println, Throwable::printStackTrace, () -> System.out.println("✅")); + } +} diff --git a/workshop-examples/src/main/java/_04_failures/_12_Multi_Failure_Guarded_Recovery.java b/workshop-examples/src/main/java/_04_failures/_12_Multi_Failure_Guarded_Recovery.java new file mode 100755 index 000000000..8c9d2d182 --- /dev/null +++ b/workshop-examples/src/main/java/_04_failures/_12_Multi_Failure_Guarded_Recovery.java @@ -0,0 +1,31 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _04_failures; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +public class _12_Multi_Failure_Guarded_Recovery { + + public static void main(String[] args) { + System.out.println("⚡️ Multi failure that does not cancel after recovery"); + + Multi.createFrom().range(0, 10) + .onItem().transformToUniAndConcatenate(i -> safeGuardedOperation(i)) + .onFailure().recoverWithItem(6) + .subscribe().with(System.out::println, Throwable::printStackTrace, () -> System.out.println("✅")); + } + + private static Uni safeGuardedOperation(Integer i) { + return Uni + .createFrom().item(i) + .onItem().invoke(n -> { + if (n == 6) { + throw new RuntimeException("Bada Boom"); + } else { + System.out.println(n + " 👍"); + } + }) + .onFailure().recoverWithItem(i); + } +} diff --git a/workshop-examples/src/main/java/_05_backpressure/_01_Drop.java b/workshop-examples/src/main/java/_05_backpressure/_01_Drop.java new file mode 100755 index 000000000..1477fb038 --- /dev/null +++ b/workshop-examples/src/main/java/_05_backpressure/_01_Drop.java @@ -0,0 +1,54 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _05_backpressure; + +import java.util.concurrent.Flow.Subscription; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.subscription.BackPressureStrategy; +import io.smallrye.mutiny.subscription.MultiEmitter; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +public class _01_Drop { + + public static void main(String[] args) { + System.out.println("⚡️ Back-pressure: drop"); + + Multi.createFrom().emitter(emitter -> emitTooFast(emitter), BackPressureStrategy.ERROR) + .onOverflow().invoke(s -> System.out.print("🚨 ")).drop() // Comment out for some fun + .subscribe().withSubscriber(new MultiSubscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(5); + } + + @Override + public void onItem(Object s) { + System.out.print(s + " "); + } + + @Override + public void onFailure(Throwable throwable) { + System.out.println("\n✋ " + throwable.getMessage()); + } + + @Override + public void onCompletion() { + System.out.println("\n✅"); + } + }); + } + + private static void emitTooFast(MultiEmitter emitter) { + new Thread(() -> { + while (true) { + emitter.emit("📦"); + try { + Thread.sleep(250); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }).start(); + } +} diff --git a/workshop-examples/src/main/java/_05_backpressure/_02_Buffer.java b/workshop-examples/src/main/java/_05_backpressure/_02_Buffer.java new file mode 100755 index 000000000..7bff73b3f --- /dev/null +++ b/workshop-examples/src/main/java/_05_backpressure/_02_Buffer.java @@ -0,0 +1,54 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _05_backpressure; + +import java.util.concurrent.Flow.Subscription; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.subscription.BackPressureStrategy; +import io.smallrye.mutiny.subscription.MultiEmitter; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +public class _02_Buffer { + + public static void main(String[] args) { + System.out.println("⚡️ Back-pressure: buffer"); + + Multi.createFrom().emitter(emitter -> emitTooFast(emitter), BackPressureStrategy.ERROR) + .onOverflow().buffer(32) + .subscribe().withSubscriber(new MultiSubscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(5); + } + + @Override + public void onItem(Object s) { + System.out.print(s + " "); + } + + @Override + public void onFailure(Throwable throwable) { + System.out.println("\n✋ " + throwable.getMessage()); + } + + @Override + public void onCompletion() { + System.out.println("\n✅"); + } + }); + } + + private static void emitTooFast(MultiEmitter emitter) { + new Thread(() -> { + while (true) { + emitter.emit("📦"); + try { + Thread.sleep(250); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }).start(); + } +} diff --git a/workshop-examples/src/main/java/_05_backpressure/_03_Visual_Drop.java b/workshop-examples/src/main/java/_05_backpressure/_03_Visual_Drop.java new file mode 100755 index 000000000..8c0269e33 --- /dev/null +++ b/workshop-examples/src/main/java/_05_backpressure/_03_Visual_Drop.java @@ -0,0 +1,72 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _05_backpressure; + +import java.util.concurrent.Flow.Subscription; +import java.util.function.Consumer; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.subscription.BackPressureStrategy; +import io.smallrye.mutiny.subscription.MultiEmitter; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +public class _03_Visual_Drop { + + public static void main(String[] args) { + System.out.println("⚡️ Back-pressure: drops visualised"); + + Multi.createFrom().emitter(emitter -> emitTooFast(emitter), BackPressureStrategy.ERROR) + .onItem().invoke((Consumer) System.out::println) + .onOverflow().dropPreviousItems() // also compare with .drop() + .subscribe().withSubscriber(new MultiSubscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(5); + periodicallyRequest(s); + } + + private void periodicallyRequest(Subscription s) { + new Thread(() -> { + while (true) { + try { + Thread.sleep(5_000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println(" 🤷 request"); + s.request(2); + } + }).start(); + } + + @Override + public void onItem(Object s) { + System.out.println(" ➡️ " + s); + } + + @Override + public void onFailure(Throwable throwable) { + System.out.println("✋ " + throwable.getMessage()); + } + + @Override + public void onCompletion() { + System.out.println("✅"); + } + }); + } + + private static void emitTooFast(MultiEmitter emitter) { + new Thread(() -> { + long n = 0; + while (true) { + emitter.emit("📦 " + ++n); + try { + Thread.sleep(250); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }).start(); + } +} diff --git a/workshop-examples/src/main/java/_05_backpressure/_04_Visual_Buffer.java b/workshop-examples/src/main/java/_05_backpressure/_04_Visual_Buffer.java new file mode 100755 index 000000000..f9c85bf7c --- /dev/null +++ b/workshop-examples/src/main/java/_05_backpressure/_04_Visual_Buffer.java @@ -0,0 +1,72 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _05_backpressure; + +import java.util.concurrent.Flow.Subscription; +import java.util.function.Consumer; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.subscription.BackPressureStrategy; +import io.smallrye.mutiny.subscription.MultiEmitter; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +public class _04_Visual_Buffer { + + public static void main(String[] args) { + System.out.println("⚡️ Back-pressure: buffering visualised"); + + Multi.createFrom().emitter(emitter -> emitTooFast(emitter), BackPressureStrategy.ERROR) + .onItem().invoke((Consumer) System.out::println) + .onOverflow().buffer(64) + .subscribe().withSubscriber(new MultiSubscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(5); + periodicallyRequest(s); + } + + private void periodicallyRequest(Subscription s) { + new Thread(() -> { + while (true) { + try { + Thread.sleep(5_000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println(" 🤷 request"); + s.request(2); + } + }).start(); + } + + @Override + public void onItem(Object s) { + System.out.println(" ➡️ " + s); + } + + @Override + public void onFailure(Throwable throwable) { + System.out.println("✋ " + throwable.getMessage()); + } + + @Override + public void onCompletion() { + System.out.println("✅"); + } + }); + } + + private static void emitTooFast(MultiEmitter emitter) { + new Thread(() -> { + long n = 0; + while (true) { + emitter.emit("📦 " + ++n); + try { + Thread.sleep(250); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }).start(); + } +} diff --git a/workshop-examples/src/main/java/_06_threading/_01_Threading_Subscription.java b/workshop-examples/src/main/java/_06_threading/_01_Threading_Subscription.java new file mode 100755 index 000000000..4e1be4807 --- /dev/null +++ b/workshop-examples/src/main/java/_06_threading/_01_Threading_Subscription.java @@ -0,0 +1,35 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _06_threading; + +import static java.util.concurrent.CompletableFuture.delayedExecutor; +import static java.util.concurrent.CompletableFuture.supplyAsync; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +public class _01_Threading_Subscription { + + public static void main(String[] args) { + System.out.println("⚡️ runSubscriptionOn (do not block the subscriber)"); + + var service = Executors.newFixedThreadPool(4); + + Multi.createBy().repeating().uni(() -> generate()).indefinitely() + .runSubscriptionOn(service) + .subscribe().with(item -> System.out.println(Thread.currentThread().getName() + " :: " + item)); + } + + static final AtomicInteger counter = new AtomicInteger(); + + static Uni generate() { + return Uni.createFrom().completionStage( + supplyAsync(counter::getAndIncrement, + delayedExecutor(ThreadLocalRandom.current().nextInt(1000), TimeUnit.MILLISECONDS))); + } +} diff --git a/workshop-examples/src/main/java/_06_threading/_02_Threading_Emit.java b/workshop-examples/src/main/java/_06_threading/_02_Threading_Emit.java new file mode 100755 index 000000000..b8ca9510b --- /dev/null +++ b/workshop-examples/src/main/java/_06_threading/_02_Threading_Emit.java @@ -0,0 +1,35 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _06_threading; + +import static java.util.concurrent.CompletableFuture.delayedExecutor; +import static java.util.concurrent.CompletableFuture.supplyAsync; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +public class _02_Threading_Emit { + + public static void main(String[] args) { + System.out.println("⚡️ emitOn (dispatch blocking event processing)"); + + var service = Executors.newFixedThreadPool(4); + + Multi.createBy().repeating().uni(() -> generate()).indefinitely() + .emitOn(service) + .subscribe().with(item -> System.out.println(Thread.currentThread().getName() + " :: " + item)); + } + + static final AtomicInteger counter = new AtomicInteger(); + + static Uni generate() { + return Uni.createFrom().completionStage( + supplyAsync(counter::getAndIncrement, + delayedExecutor(ThreadLocalRandom.current().nextInt(1000), TimeUnit.MILLISECONDS))); + } +} diff --git a/workshop-examples/src/main/java/_06_threading/_03_Infra_Executor.java b/workshop-examples/src/main/java/_06_threading/_03_Infra_Executor.java new file mode 100755 index 000000000..c7e24b3f0 --- /dev/null +++ b/workshop-examples/src/main/java/_06_threading/_03_Infra_Executor.java @@ -0,0 +1,32 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _06_threading; + +import static java.util.concurrent.CompletableFuture.supplyAsync; + +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; + +public class _03_Infra_Executor { + + public static void main(String[] args) { + System.out.println("⚡️ emitOn (dispatch blocking event processing to the Mutiny default worker pool)"); + + var service = Executors.newFixedThreadPool(4); + + Multi.createBy().repeating().uni(() -> generate()).indefinitely() + .emitOn(service) + .subscribe().with(item -> System.out.println(Thread.currentThread().getName() + " :: " + item)); + } + + static final AtomicInteger counter = new AtomicInteger(); + + static Uni generate() { + return Uni.createFrom().completionStage( + supplyAsync(counter::getAndIncrement, Infrastructure.getDefaultWorkerPool())); + } +} diff --git a/workshop-examples/src/main/java/_06_threading/_04_Threading_Blocking.java b/workshop-examples/src/main/java/_06_threading/_04_Threading_Blocking.java new file mode 100755 index 000000000..c713c78bc --- /dev/null +++ b/workshop-examples/src/main/java/_06_threading/_04_Threading_Blocking.java @@ -0,0 +1,28 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _06_threading; + +import java.util.stream.Collectors; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.BlockingIterable; + +public class _04_Threading_Blocking { + + public static void main(String[] args) { + System.out.println("⚡️ blocking"); + + BlockingIterable iterable = Multi.createFrom().range(0, 10) + .subscribe().asIterable(); + + var list = iterable.stream().collect(Collectors.toList()); + + System.out.println(list); + + Integer someInt = Uni.createFrom().item(123) + .await().indefinitely(); + + System.out.println(someInt); + } +} diff --git a/workshop-examples/src/main/java/_06_threading/_05_Threading_Blocking_Check.java b/workshop-examples/src/main/java/_06_threading/_05_Threading_Blocking_Check.java new file mode 100755 index 000000000..6b340a55d --- /dev/null +++ b/workshop-examples/src/main/java/_06_threading/_05_Threading_Blocking_Check.java @@ -0,0 +1,35 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _06_threading; + +import java.util.stream.Collectors; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.BlockingIterable; +import io.smallrye.mutiny.infrastructure.Infrastructure; + +public class _05_Threading_Blocking_Check { + + public static void main(String[] args) { + System.out.println("⚡️ blocking"); + + Infrastructure.setCanCallerThreadBeBlockedSupplier(() -> !Thread.currentThread().getName().contains("yolo")); + + new Thread(() -> { + BlockingIterable iterable = Multi.createFrom().range(0, 10) + .subscribe().asIterable(); + + var list = iterable.stream().collect(Collectors.toList()); + + System.out.println(list); + }, "yolo-1").start(); + + new Thread(() -> { + Integer someInt = Uni.createFrom().item(123) + .await().indefinitely(); + + System.out.println(someInt); + }, "yolo-2").start(); + } +} diff --git a/workshop-examples/src/main/java/_07_misc/_01_Multi_Custom_Operator.java b/workshop-examples/src/main/java/_07_misc/_01_Multi_Custom_Operator.java new file mode 100755 index 000000000..5fd5efc0f --- /dev/null +++ b/workshop-examples/src/main/java/_07_misc/_01_Multi_Custom_Operator.java @@ -0,0 +1,45 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _07_misc; + +import java.util.concurrent.ThreadLocalRandom; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.operators.multi.AbstractMultiOperator; +import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +public class _01_Multi_Custom_Operator { + + public static void main(String[] args) { + System.out.println("⚡️ Custom operator, randomly drop items"); + + Multi.createFrom().range(1, 20) + .plug(RandomDrop::new) + .subscribe().with(System.out::println); + } + + static class RandomDrop extends AbstractMultiOperator { + public RandomDrop(Multi upstream) { + super(upstream); + } + + @Override + public void subscribe(MultiSubscriber downstream) { + upstream.subscribe().withSubscriber(new DropProcessor(downstream)); + } + + private class DropProcessor extends MultiOperatorProcessor { + DropProcessor(MultiSubscriber downstream) { + super(downstream); + } + + @Override + public void onItem(T item) { + if (ThreadLocalRandom.current().nextBoolean()) { + super.onItem(item); + } + } + } + } +} diff --git a/workshop-examples/src/main/java/_07_misc/_02_Logging.java b/workshop-examples/src/main/java/_07_misc/_02_Logging.java new file mode 100755 index 000000000..be49e4650 --- /dev/null +++ b/workshop-examples/src/main/java/_07_misc/_02_Logging.java @@ -0,0 +1,25 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _07_misc; + +import io.smallrye.mutiny.Multi; + +public class _02_Logging { + + public static void main(String[] args) { + System.out.println("⚡️ Logging"); + + Multi multi = Multi.createFrom().range(1, 3) + .log("source") + .onItem().transform(i -> ">>> " + i) + .log("transformed"); + + System.out.println(); + System.out.println("🚀 First subscriber"); + multi.subscribe().with(System.out::println); + + System.out.println(); + System.out.println("🚀 Second subscriber"); + multi.subscribe().with(System.out::println); + } +} diff --git a/workshop-examples/src/main/java/_07_misc/_03_Context.java b/workshop-examples/src/main/java/_07_misc/_03_Context.java new file mode 100755 index 000000000..8ad80250a --- /dev/null +++ b/workshop-examples/src/main/java/_07_misc/_03_Context.java @@ -0,0 +1,27 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS io.smallrye.reactive:mutiny:2.4.0 +package _07_misc; + +import java.util.List; + +import io.smallrye.mutiny.Context; +import io.smallrye.mutiny.Multi; + +public class _03_Context { + + public static void main(String[] args) { + System.out.println("⚡️ Using subscription-bound contexts"); + + Context context = Context.of("foo", 123, "bar", "abc-123-def"); + + List list = Multi.createFrom().range(1, 10) + .withContext((multi, ctx) -> multi.onItem().invoke(n -> ctx.put("n", n))) + .select().where(n -> n % 2 == 0) + .withContext( + (multi, ctx) -> multi.onItem().transform(n -> n + "::" + ctx.get("n") + " @foo -> " + ctx.get("foo"))) + .collect().asList() + .awaitUsing(context).indefinitely(); + + System.out.println(list); + } +}