Skip to content

Commit

Permalink
docs: document how to express branching in a pipeline
Browse files Browse the repository at this point in the history
Fixes: #1367
Co-authored-by: Clement Escoffier <[email protected]>
  • Loading branch information
jponge and cescoffier committed Sep 17, 2023
1 parent ae6e097 commit 4753a01
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 0 deletions.
53 changes: 53 additions & 0 deletions documentation/docs/guides/branching.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
---
tags:
- guide
- intermediate
---

# How to do branching in a reactive pipeline?

Mutiny and similar reactive programming libraries do not have _branching_ operators similar to `if / else` and `switch/case` statements in Java.

This does not mean that we can't express _branching_ in a reactive pipeline, and the most classic way is to use a transformation to a `Uni` (also called `flatMap` in functional programming).

## Expressing branches as Uni operations

Suppose that we have a pipeline where a `Uni` is created from a random value, and suppose that we want to have a different processing pipeline depending on whether the value is odd or even.
Let's have these 2 `Uni`-returning methods to model different behaviors:

```java linenums="1"
{{ insert('java/guides/BranchingTest.java', 'branches') }}
```

We can use the `transformToUni` operator to plug either method depending on the random number:

```java linenums="1"
{{ insert('java/guides/BranchingTest.java', 'pipeline') }}
```

Having such a mapping function is a common pattern: it has conditional logic and each branch returns a `Uni` that represents the "sub-pipeline" of what each branch shall do.

Note that such constructs are primarily relevant when asynchronous I/O are involved and that such asynchronous I/O operations are typically `Uni`-returning methods such as those found in the [Mutiny Vert.x bindings](https://smallrye.io/smallrye-mutiny-vertx-bindings/).

!!! tip

There are other ways to express the "result" of a branch.
You could wrap results in a custom type or a container like `java.util.Optional`.

You could also return a failed `Uni`, and later react by continuing with another `Uni`, another value, or retrying (which would model a loop!).

## Branching in a Multi

The case of `Multi` is even more interesting because a `null`-completed `Uni` is discarded from the stream by any of the `transformToUni{...}` methods:

```java linenums="1"
{{ insert('java/guides/BranchingTest.java', 'multi-pipeline') }}
```

where `drop()` is as follows:

```java linenums="1"
{{ insert('java/guides/BranchingTest.java', 'drop') }}
```

Any negative value is discarded in this `Multi` pipeline, while the positive even and odd numbers get forwarded to the subscriber.
1 change: 1 addition & 0 deletions documentation/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ nav:
- 'guides/replaying-multis.md'
- 'guides/controlling-demand.md'
- 'guides/multi-split.md'
- 'guides/branching.md'
- 'Reference':
- 'reference/migrating-to-mutiny-2.md'
- 'reference/why-is-asynchronous-important.md'
Expand Down
64 changes: 64 additions & 0 deletions documentation/src/test/java/guides/BranchingTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package guides;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.junit.jupiter.api.Test;

import java.util.Random;

public class BranchingTest {

@Test
public void uni() {
// <pipeline>
Random random = new Random();
Uni.createFrom().item(() -> random.nextInt(100))
.onItem().transformToUni(n -> {
if (n % 2 == 0) {
return evenOperation(n);
} else {
return oddOperation(n);
}
})
.subscribe().with(System.out::println);
// </pipeline>
}

// <branches>
Uni<String> evenOperation(int n) {
return Uni.createFrom().item("Even number: " + n)
.onItem().invoke(() -> System.out.println("(even branch)"));
}

Uni<String> oddOperation(int n) {
return Uni.createFrom().item("Odd number: " + n)
.onItem().invoke(() -> System.out.println("(odd branch)"));
}
// </branches>

@Test
public void multi() {
// <multi-pipeline>
Random random = new Random();
Multi.createBy().repeating().supplier(random::nextInt).atMost(20)
.onItem().transformToUniAndMerge(n -> {
System.out.println("----");
if (n < 0) {
return drop();
} else if (n % 2 == 0) {
return evenOperation(n);
} else {
return oddOperation(n);
}
})
.subscribe().with(str -> System.out.println("=> " + str));
// </multi-pipeline>
}

// <drop>
Uni<String> drop() {
return Uni.createFrom().<String>nullItem()
.onItem().invoke(() -> System.out.println("(dropping negative value)"));
}
// </drop>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS io.smallrye.reactive:mutiny:2.4.0
package _03_composition_transformation;

import io.smallrye.mutiny.Uni;

import java.util.Random;

public class _20_Uni_Branching {

public static void main(String[] args) throws InterruptedException {
System.out.println("⚡️ Uni and branching");

Random random = new Random();
Uni.createFrom().item(() -> random.nextInt(100))
.onItem().transformToUni(n -> {
if (n % 2 == 0) {
return evenOperation(n);
} else {
return oddOperation(n);
}
})
.invoke(str -> {
if (str.startsWith("Odd")) {
System.out.println("(looks like we have a odd number)");
}
})
.subscribe().with(System.out::println);
}

static Uni<String> evenOperation(int n) {
return Uni.createFrom().item("Even number: " + n)
.onItem().invoke(() -> System.out.println("(even branch)"));
}

static Uni<String> oddOperation(int n) {
return Uni.createFrom().item("Odd number: " + n)
.onItem().invoke(() -> System.out.println("(odd branch)"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS io.smallrye.reactive:mutiny:2.4.0
package _03_composition_transformation;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import java.util.Random;

public class _21_Multi_Branching {

public static void main(String[] args) throws InterruptedException {
System.out.println("⚡️ Uni and branching");

Random random = new Random();
Multi.createBy().repeating().supplier(random::nextInt).atMost(20)
.onItem().transformToUniAndMerge(n -> {
System.out.println("----");
if (n < 0) {
return drop();
} else if (n % 2 == 0) {
return evenOperation(n);
} else {
return oddOperation(n);
}
})
.subscribe().with(str -> System.out.println("=> " + str));
}

static Uni<String> drop() {
return Uni.createFrom().<String>nullItem()
.onItem().invoke(() -> System.out.println("(dropping negative value)"));
}

static Uni<String> evenOperation(int n) {
return Uni.createFrom().item("Even number: " + n)
.onItem().invoke(() -> System.out.println("(even branch)"));
}

static Uni<String> oddOperation(int n) {
return Uni.createFrom().item("Odd number: " + n)
.onItem().invoke(() -> System.out.println("(odd branch)"));
}
}

0 comments on commit 4753a01

Please sign in to comment.