Skip to content

Commit

Permalink
Merge pull request #1595 from smallrye/feat/chunk-to-sentence-examples
Browse files Browse the repository at this point in the history
docs: text chunks stream to sentence stream examples
  • Loading branch information
jponge authored May 13, 2024
2 parents 07d54d5 + 8b2f695 commit 9e50899
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS io.smallrye.reactive:mutiny:2.6.0
package _03_composition_transformation;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

public class _22_Multi_Chunks_To_Sentence_Stream {

public static void main(String[] args) throws InterruptedException {
System.out.println("⚡️ Chunks of text to sentence stream");

List<String> chunks = List.of(
"Hel",
"lo ",
"world\n",
"Foo",
" B",
"ar ",
"Baz\n");

StringBuilder builder = new StringBuilder();
Multi.createFrom().iterable(chunks)
.onItem().transformToUniAndConcatenate(chunk -> {
builder.append(chunk);
String current = builder.toString();
if (current.endsWith("\n")) {
builder.setLength(0);
return Uni.createFrom().item(current.substring(0, current.length() - 1));
} else {
return Uni.createFrom().nullItem();
}
})
.onItem().transformToUniAndConcatenate(line -> sendText(line))
.subscribe().with(
line -> System.out.println(">>> " + line),
Throwable::printStackTrace,
pool::shutdownNow);

}

static final ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();

static Uni<String> sendText(String text) {
return Uni.createFrom().item(text)
.onItem().delayIt().onExecutor(pool).by(Duration.ofMillis(300))
.onItem().invoke(txt -> System.out.println("[sendText] " + txt));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS io.smallrye.reactive:mutiny:2.6.0
package _03_composition_transformation;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.AbstractMultiOperator;
import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor;
import io.smallrye.mutiny.subscription.MultiSubscriber;

public class _22_Multi_Chunks_To_Sentence_Stream_Custom_Operator {

public static void main(String[] args) throws InterruptedException {
System.out.println("⚡️ Chunks of text to sentence stream (with a custom operator)");

List<String> chunks = List.of(
"Hel",
"lo ",
"world\n",
"Foo",
" B",
"ar ",
"Baz\n");

Multi.createFrom().iterable(chunks)
.plug(TokenToSentence::new)
.onItem().transformToUniAndConcatenate(line -> sendText(line))
.subscribe().with(
line -> System.out.println(">>> " + line),
Throwable::printStackTrace,
pool::shutdownNow);

}

static class TokenToSentence extends AbstractMultiOperator<String, String> {

public TokenToSentence(Multi<? extends String> upstream) {
super(upstream);
}

@Override
public void subscribe(MultiSubscriber<? super String> downstream) {
upstream.subscribe().withSubscriber(new TokenToSentenceProcessor(downstream));
}

static private class TokenToSentenceProcessor extends MultiOperatorProcessor<String, String> {

private final StringBuilder builder = new StringBuilder();

public TokenToSentenceProcessor(MultiSubscriber<? super String> downstream) {
super(downstream);
}

@Override
public void onItem(String chunk) {
builder.append(chunk);
String current = builder.toString();
if (current.endsWith("\n")) {
builder.setLength(0);
super.onItem(current.substring(0, current.length() - 1));
} else {
upstream.request(1L);
}
}
}
}

static final ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();

static Uni<String> sendText(String text) {
return Uni.createFrom().item(text)
.onItem().delayIt().onExecutor(pool).by(Duration.ofMillis(300))
.onItem().invoke(txt -> System.out.println("[sendText] " + txt));
}
}

0 comments on commit 9e50899

Please sign in to comment.