diff --git a/.gitignore b/.gitignore
index a1c2a23..2c77a63 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,3 +21,9 @@
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
+
+.project
+.classpath
+.settings
+/target/
+bin
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..6b3e9ba
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,8 @@
+language: java
+jdk:
+ - openjdk8
+ - openjdk11
+os: linux
+cache:
+ directories:
+ - $HOME/.m2
diff --git a/README.md b/README.md
index ee7a224..2eb0dfb 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,74 @@
# apache-client5-reactive
-Thin wrapper around Apache HttpAsyncClient 5.x to expose Reactive Streams interfaces.
+
+[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
+[![Build Status](https://travis-ci.org/saasquatch/apache-client5-reactive.svg?branch=master)](https://travis-ci.org/saasquatch/apache-client5-reactive)
+[![](https://jitpack.io/v/saasquatch/apache-client5-reactive.svg)](https://jitpack.io/#saasquatch/apache-client5-reactive)
+
+Thin wrapper around Apache HttpComponents HttpAsyncClient 5.x to expose Reactive Streams interfaces.
+
+## Sample usage
+
+For examples, see package [`com.saasquatch.client5reactive.examples`](https://github.com/saasquatch/apache-client5-reactive/tree/master/src/test/java/com/saasquatch/client5reactive/examples).
+
+## Adding it to your project
+
+### Add the repository
+
+Maven
+
+```xml
+
+
+ jitpack.io
+ https://jitpack.io
+
+
+```
+
+Gradle
+
+```gradle
+repositories {
+ maven { url 'https://jitpack.io' }
+}
+```
+
+### Add the dependency
+
+Maven
+
+```xml
+
+ com.github.saasquatch
+ apache-client5-reactive
+ 0.0.1
+
+```
+
+Gradle
+
+```gradle
+compile 'com.github.saasquatch:apache-client5-reactive:0.0.1'
+```
+
+## License
+
+Unless explicitly stated otherwise all files in this repository are licensed under the Apache License 2.0.
+
+License boilerplate:
+
+```
+Copyright 2020 ReferralSaaSquatch.com Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+```
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..99aa525
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,114 @@
+
+ 4.0.0
+ com.saasquatch
+ apache-client5-reactive
+ 0.0.1-SNAPSHOT
+ jar
+
+ apache-client5-reactive
+ https://github.com/saasquatch/apache-client5-reactive
+ 2020
+
+
+ Apache License 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.html
+ repo
+
+
+
+
+ UTF-8
+ 5.6.0
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ ${junit.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ ${junit.version}
+ test
+
+
+ org.apache.httpcomponents.client5
+ httpclient5
+ 5.0
+
+
+ org.apache.httpcomponents.core5
+ httpcore5-reactive
+ 5.0
+
+
+ io.reactivex.rxjava3
+ rxjava
+ 3.0.0
+
+
+ com.google.code.findbugs
+ jsr305
+ 3.0.2
+
+
+ org.slf4j
+ slf4j-simple
+ 1.7.30
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.1
+
+
+ 1.8
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 3.2.0
+
+
+ attach-sources
+
+ jar
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+ 3.1.1
+
+ none
+
+
+
+ attach-javadocs
+
+ jar
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 3.0.0-M4
+
+
+
+
+
diff --git a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java
new file mode 100644
index 0000000..c41b6d5
--- /dev/null
+++ b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java
@@ -0,0 +1,45 @@
+package com.saasquatch.client5reactive;
+
+import java.util.concurrent.CancellationException;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import io.reactivex.rxjava3.core.MaybeEmitter;
+
+/**
+ * Utilities for {@link FutureCallback}s. Not public.
+ *
+ * @author sli
+ */
+final class FutureCallbacks {
+
+ private FutureCallbacks() {}
+
+ public static FutureCallback maybeEmitter(MaybeEmitter emitter) {
+ return new FutureCallback() {
+
+ @Override
+ public void completed(T result) {
+ if (result == null) {
+ emitter.onComplete();
+ } else {
+ emitter.onSuccess(result);
+ }
+ }
+
+ @Override
+ public void failed(Exception ex) {
+ emitter.onError(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ emitter.onError(cancelledException());
+ }
+
+ };
+ }
+
+ private static Exception cancelledException() {
+ return new CancellationException("Future cancelled");
+ }
+
+}
diff --git a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java
new file mode 100644
index 0000000..7806360
--- /dev/null
+++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java
@@ -0,0 +1,133 @@
+package com.saasquatch.client5reactive;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Future;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.hc.client5.http.async.HttpAsyncClient;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
+import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.AsyncRequestProducer;
+import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.reactivestreams.Publisher;
+
+/**
+ * Thin wrapper around Apache {@link HttpAsyncClient} to expose
+ * Reactive Streams interfaces.
+ * The methods in this interface aim to mirror the ones in {@link HttpAsyncClient} and
+ * {@link CloseableHttpAsyncClient}.
+ *
+ * @author sli
+ * @see HttpReactiveClients
+ */
+public interface HttpReactiveClient {
+
+ /**
+ * Execute the given request. This method is equivalent to
+ * {@link HttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, HttpContext , FutureCallback)}.
+ * If the {@link Future} produced by the equivalent {@link HttpAsyncClient} method completes with
+ * {@code null}, then the returning {@link Publisher} of this method will complete with no
+ * element.
+ */
+ Publisher execute(@Nonnull AsyncRequestProducer requestProducer,
+ @Nonnull AsyncResponseConsumer responseConsumer,
+ @Nullable HandlerFactory pushHandlerFactory,
+ @Nullable HttpContext context);
+
+ /**
+ * Convenience method for
+ * {@link #execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, HttpContext)},
+ * equivalent to
+ * {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HttpContext, FutureCallback)}
+ */
+ default Publisher execute(@Nonnull AsyncRequestProducer requestProducer,
+ @Nonnull AsyncResponseConsumer responseConsumer, @Nullable HttpContext context) {
+ return execute(requestProducer, responseConsumer, null, context);
+ }
+
+ /**
+ * Convenience method for
+ * {@link #execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, HttpContext)},
+ * equivalent to
+ * {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, FutureCallback)}.
+ */
+ default Publisher execute(@Nonnull AsyncRequestProducer requestProducer,
+ @Nonnull AsyncResponseConsumer responseConsumer) {
+ return execute(requestProducer, responseConsumer, null);
+ }
+
+ /**
+ * Execute a simple in-memory request and get a simple in-memory response. This method is
+ * equivalent to
+ * {@link CloseableHttpAsyncClient#execute(SimpleHttpRequest, HttpContext, FutureCallback)}. The
+ * returning {@link Publisher} completes with exactly 1 element.
+ */
+ default Publisher execute(@Nonnull SimpleHttpRequest request,
+ @Nullable HttpContext context) {
+ return execute(SimpleRequestProducer.create(request), SimpleResponseConsumer.create(), context);
+ }
+
+ /**
+ * Convenience method for {@link #execute(SimpleHttpRequest, HttpContext)}, equivalent to
+ * {@link CloseableHttpAsyncClient#execute(SimpleHttpRequest, FutureCallback)}.
+ */
+ default Publisher execute(@Nonnull SimpleHttpRequest request) {
+ return execute(request, null);
+ }
+
+ /**
+ * Execute the given request and get a streaming response body as a {@link Publisher} of
+ * {@link ByteBuffer}s. The returning {@link Publisher} completes with exactly 1 element. The
+ * {@link Publisher} within the returning {@link Publisher} may contain 0 to n elements.
+ */
+ Publisher>> streamingExecute(
+ @Nonnull AsyncRequestProducer requestProducer,
+ @Nullable HandlerFactory pushHandlerFactory,
+ @Nullable HttpContext context);
+
+ /**
+ * Convenience method for
+ * {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)}
+ */
+ default Publisher>> streamingExecute(
+ @Nonnull AsyncRequestProducer requestProducer, @Nullable HttpContext context) {
+ return streamingExecute(requestProducer, null, context);
+ }
+
+ /**
+ * Convenience method for
+ * {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)}
+ */
+ default Publisher>> streamingExecute(
+ @Nonnull AsyncRequestProducer requestProducer) {
+ return streamingExecute(requestProducer, null);
+ }
+
+ /**
+ * Execute a simple in-memory request and get a streaming response. Convenience method for
+ * {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)}
+ */
+ default Publisher>> streamingExecute(
+ @Nonnull SimpleHttpRequest request, @Nullable HttpContext context) {
+ return streamingExecute(SimpleRequestProducer.create(request), context);
+ }
+
+ /**
+ * Convenience method for
+ * {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)}
+ */
+ default Publisher>> streamingExecute(
+ @Nonnull SimpleHttpRequest request) {
+ return streamingExecute(request, null);
+ }
+
+}
diff --git a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java
new file mode 100644
index 0000000..b824b2c
--- /dev/null
+++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java
@@ -0,0 +1,78 @@
+package com.saasquatch.client5reactive;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.hc.client5.http.async.HttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.MinimalHttpAsyncClient;
+import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.nio.AsyncPushConsumer;
+import org.apache.hc.core5.http.nio.AsyncRequestProducer;
+import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.HandlerFactory;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
+import org.reactivestreams.Publisher;
+import io.reactivex.rxjava3.core.Maybe;
+
+/**
+ * Concrete implementation of {@link HttpReactiveClient}.
+ *
+ * @author sli
+ */
+final class HttpReactiveClientImpl implements HttpReactiveClient {
+
+ private final HttpAsyncClient httpAsyncClient;
+
+ HttpReactiveClientImpl(HttpAsyncClient httpAsyncClient) {
+ this.httpAsyncClient = httpAsyncClient;
+ }
+
+ @Override
+ public Publisher execute(AsyncRequestProducer requestProducer,
+ AsyncResponseConsumer responseConsumer,
+ HandlerFactory pushHandlerFactory, HttpContext context) {
+ Objects.requireNonNull(requestProducer);
+ Objects.requireNonNull(responseConsumer);
+ return Maybe.create(emitter -> {
+ httpAsyncClient.execute(requestProducer, responseConsumer, pushHandlerFactory,
+ defaultHttpContext(context), FutureCallbacks.maybeEmitter(emitter));
+ }).toFlowable();
+ }
+
+ @Override
+ public Publisher>> streamingExecute(
+ AsyncRequestProducer requestProducer, HandlerFactory pushHandlerFactory,
+ HttpContext context) {
+ Objects.requireNonNull(requestProducer);
+ /*
+ * Semantically this should be a Single instead of a Maybe, but using Single here requires an
+ * additional implementation of FutureCallback, and since we are returning a Publisher, it
+ * doesn't really make a difference.
+ */
+ return Maybe.>>create(emitter -> {
+ final ReactiveResponseConsumer responseConsumer =
+ new ReactiveResponseConsumer(FutureCallbacks.maybeEmitter(emitter));
+ httpAsyncClient.execute(requestProducer, responseConsumer, pushHandlerFactory,
+ defaultHttpContext(context), null);
+ }).toFlowable();
+ }
+
+ /**
+ * Create a default {@link HttpContext} if the given one is null. In theory this method should not
+ * be needed, since according to the JavaDoc in {@link HttpAsyncClient}, {@link HttpContext} can
+ * be null. However, in Apache HttpClient version 5.0 there's a bug where some implementations of
+ * {@link HttpAsyncClient} like {@link MinimalHttpAsyncClient} reject null {@link HttpContext}. An
+ * issue has already been filed
+ * here. Once that's fixed,
+ * this method can be removed.
+ */
+ @Nonnull
+ private static HttpContext defaultHttpContext(@Nullable HttpContext context) {
+ return context == null ? HttpClientContext.create() : context;
+ }
+
+}
diff --git a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClients.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClients.java
new file mode 100644
index 0000000..7581db9
--- /dev/null
+++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClients.java
@@ -0,0 +1,32 @@
+package com.saasquatch.client5reactive;
+
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import org.apache.hc.client5.http.async.HttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+
+/**
+ * Factory methods for {@link HttpReactiveClient}.
+ *
+ * @author sli
+ * @see HttpAsyncClients
+ */
+public final class HttpReactiveClients {
+
+ private HttpReactiveClients() {}
+
+ /**
+ * Create a {@link HttpReactiveClient} from a given {@link HttpAsyncClient}. Note that the created
+ * {@link HttpReactiveClient} is simply a wrapper of the {@link HttpAsyncClient} and does not
+ * support state management, so you'll need to manage the state of the given
+ * {@link HttpAsyncClient} yourself by calling {@link CloseableHttpAsyncClient#start()},
+ * {@link CloseableHttpAsyncClient#close()}, etc.
+ */
+ @Nonnull
+ public static HttpReactiveClient create(@Nonnull HttpAsyncClient httpAsyncClient) {
+ Objects.requireNonNull(httpAsyncClient);
+ return new HttpReactiveClientImpl(httpAsyncClient);
+ }
+
+}
diff --git a/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java b/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java
new file mode 100644
index 0000000..f442249
--- /dev/null
+++ b/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java
@@ -0,0 +1,31 @@
+package com.saasquatch.client5reactive;
+
+import java.io.IOException;
+import java.util.concurrent.CancellationException;
+import org.apache.hc.core5.concurrent.BasicFuture;
+import org.junit.jupiter.api.Test;
+import io.reactivex.rxjava3.core.Maybe;
+
+public class FutureCallbacksTests {
+
+ @Test
+ public void testMaybeEmitter() {
+ Maybe.create(emitter -> {
+ final BasicFuture future = new BasicFuture<>(FutureCallbacks.maybeEmitter(emitter));
+ future.completed(1);
+ }).test().assertResult(1);
+ Maybe.create(emitter -> {
+ final BasicFuture future = new BasicFuture<>(FutureCallbacks.maybeEmitter(emitter));
+ future.completed(null);
+ }).test().assertComplete();
+ Maybe.create(emitter -> {
+ final BasicFuture future = new BasicFuture<>(FutureCallbacks.maybeEmitter(emitter));
+ future.failed(new IOException());
+ }).test().assertError(IOException.class);
+ Maybe.create(emitter -> {
+ final BasicFuture future = new BasicFuture<>(FutureCallbacks.maybeEmitter(emitter));
+ future.cancel();
+ }).test().assertError(CancellationException.class);
+ }
+
+}
diff --git a/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java b/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java
new file mode 100644
index 0000000..5911df3
--- /dev/null
+++ b/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java
@@ -0,0 +1,35 @@
+package com.saasquatch.client5reactive;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+import org.junit.jupiter.api.Test;
+import io.reactivex.rxjava3.core.Flowable;
+
+public class LifecycleTests {
+
+ private static final String EXAMPLE_URL = "https://www.example.com";
+
+ @Test
+ public void testNonStarted() throws Exception {
+ try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.createDefault()) {
+ // Not started
+ final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient);
+ Flowable.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get(EXAMPLE_URL))).test()
+ .assertError(IllegalStateException.class);
+ }
+ }
+
+ @Test
+ public void testClosed() throws Exception {
+ final HttpReactiveClient reactiveClient;
+ try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.createDefault()) {
+ asyncClient.start();
+ reactiveClient = HttpReactiveClients.create(asyncClient);
+ }
+ // Closed
+ Flowable.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get(EXAMPLE_URL))).test()
+ .assertError(IllegalStateException.class);
+ }
+
+}
diff --git a/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java
new file mode 100644
index 0000000..f61a40d
--- /dev/null
+++ b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java
@@ -0,0 +1,67 @@
+package com.saasquatch.client5reactive;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
+import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+import org.apache.hc.core5.http.nio.AsyncRequestProducer;
+import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.reactivestreams.Publisher;
+import io.reactivex.rxjava3.core.Flowable;
+
+public class NullabilityTests {
+
+ private static CloseableHttpAsyncClient asyncClient;
+ private static HttpReactiveClient reactiveClient;
+
+ @BeforeAll
+ public static void beforeAll() {
+ asyncClient = HttpAsyncClients.createDefault();
+ asyncClient.start();
+ reactiveClient = HttpReactiveClients.create(asyncClient);
+ }
+
+ @AfterAll
+ public static void afterAll() throws Exception {
+ asyncClient.close();
+ }
+
+ @Test
+ public void testNullability() {
+ assertThrows(NullPointerException.class,
+ () -> reactiveClient.execute((AsyncRequestProducer) null, null, null, null));
+ assertThrows(NullPointerException.class,
+ () -> reactiveClient.execute((AsyncRequestProducer) null, null, null));
+ assertThrows(NullPointerException.class,
+ () -> reactiveClient.execute((AsyncRequestProducer) null, null));
+ assertThrows(NullPointerException.class,
+ () -> reactiveClient.execute((SimpleHttpRequest) null, null));
+ assertThrows(NullPointerException.class,
+ () -> reactiveClient.execute((SimpleHttpRequest) null));
+ assertThrows(NullPointerException.class,
+ () -> reactiveClient.streamingExecute((AsyncRequestProducer) null, null, null));
+ assertThrows(NullPointerException.class,
+ () -> reactiveClient.streamingExecute((AsyncRequestProducer) null, null));
+ assertThrows(NullPointerException.class,
+ () -> reactiveClient.streamingExecute((AsyncRequestProducer) null));
+ assertThrows(NullPointerException.class,
+ () -> reactiveClient.streamingExecute((SimpleHttpRequest) null, null));
+ assertThrows(NullPointerException.class,
+ () -> reactiveClient.streamingExecute((SimpleHttpRequest) null));
+ }
+
+ @Test
+ public void testVoidPublisher() {
+ final Publisher voidResultPublisher = reactiveClient.execute(
+ SimpleRequestProducer.create(SimpleHttpRequests.get("https://example.com")),
+ new ReactiveResponseConsumer());
+ assertDoesNotThrow(() -> Flowable.fromPublisher(voidResultPublisher).blockingSubscribe());
+ }
+
+}
diff --git a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java
new file mode 100644
index 0000000..59ac154
--- /dev/null
+++ b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java
@@ -0,0 +1,103 @@
+package com.saasquatch.client5reactive;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
+import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
+import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+import org.apache.hc.client5.http.impl.async.MinimalHttpAsyncClient;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.protocol.BasicHttpContext;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.reactivestreams.Publisher;
+import io.reactivex.rxjava3.core.Flowable;
+
+public class StreamingTests {
+
+ private static final String FLOWABLE_SOURCE_URL =
+ "https://cdn.jsdelivr.net/gh/ReactiveX/RxJava@81f0569a8b9b7d27059f127b90fd7335118b2ee4/src/main/java/io/reactivex/rxjava3/core/Flowable.java";
+ private static CloseableHttpAsyncClient asyncClient;
+ private static HttpReactiveClient reactiveClient;
+ private static byte[] flowableSourceBytes;
+
+ @BeforeAll
+ public static void beforeAll() throws Exception {
+ asyncClient = HttpAsyncClients.createDefault();
+ asyncClient.start();
+ reactiveClient = HttpReactiveClients.create(asyncClient);
+ flowableSourceBytes =
+ asyncClient.execute(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL), null).get().getBodyBytes();
+ }
+
+ @AfterAll
+ public static void afterAll() throws Exception {
+ asyncClient.close();
+ }
+
+ @Test
+ public void testVanillaExecuteWorks() {
+ final byte[] bodyBytes = Flowable.fromPublisher(reactiveClient.execute(
+ SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL)),
+ SimpleResponseConsumer.create())).blockingSingle().getBodyBytes();
+ assertArrayEquals(flowableSourceBytes, bodyBytes);
+ }
+
+ @Test
+ public void testVanillaStreamingWorks() {
+ final byte[] bodyBytes = Flowable
+ .fromPublisher(reactiveClient.streamingExecute(
+ SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL))))
+ .concatMap(Message::getBody).to(this::toByteArray);
+ assertArrayEquals(flowableSourceBytes, bodyBytes);
+ }
+
+ @Test
+ public void testBasicStreamingWorks() {
+ final byte[] bodyBytes = Flowable
+ .fromPublisher(reactiveClient.streamingExecute(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL)))
+ .concatMap(Message::getBody).to(this::toByteArray);
+ assertArrayEquals(flowableSourceBytes, bodyBytes);
+ }
+
+ @Test
+ public void testWithMinimalClient() throws Exception {
+ try (MinimalHttpAsyncClient asyncClient = HttpAsyncClients.createMinimal()) {
+ asyncClient.start();
+ final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient);
+ {
+ final byte[] bodyBytes = Flowable
+ .fromPublisher(
+ reactiveClient.streamingExecute(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL), null))
+ .concatMap(Message::getBody).to(this::toByteArray);
+ assertArrayEquals(flowableSourceBytes, bodyBytes);
+ }
+ {
+ final byte[] bodyBytes = Flowable
+ .fromPublisher(reactiveClient.streamingExecute(
+ SimpleHttpRequests.get(FLOWABLE_SOURCE_URL), new BasicHttpContext()))
+ .concatMap(Message::getBody).to(this::toByteArray);
+ assertArrayEquals(flowableSourceBytes, bodyBytes);
+ }
+ }
+ }
+
+ private byte[] toByteArray(Publisher pub) {
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+ WritableByteChannel channel = Channels.newChannel(out);) {
+ Flowable.fromPublisher(pub).blockingForEach(channel::write);
+ return out.toByteArray();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+}
diff --git a/src/test/java/com/saasquatch/client5reactive/examples/Example.java b/src/test/java/com/saasquatch/client5reactive/examples/Example.java
new file mode 100644
index 0000000..10dae99
--- /dev/null
+++ b/src/test/java/com/saasquatch/client5reactive/examples/Example.java
@@ -0,0 +1,45 @@
+package com.saasquatch.client5reactive.examples;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import java.nio.ByteBuffer;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+import com.saasquatch.client5reactive.HttpReactiveClient;
+import com.saasquatch.client5reactive.HttpReactiveClients;
+import io.reactivex.rxjava3.core.Single;
+
+public class Example {
+
+ private static final String EXAMPLE_URL = "https://www.example.com";
+
+ public static void main(String[] args) throws Exception {
+ try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.createDefault()) {
+ asyncClient.start();
+ final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient);
+ Single.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get(EXAMPLE_URL)))
+ .doOnSuccess(response -> {
+ System.out.println(response.getCode());
+ System.out.println(response.getBodyText());
+ })
+ .blockingSubscribe();
+ System.out.println("----------");
+ Single.fromPublisher(reactiveClient.streamingExecute(SimpleHttpRequests.get(EXAMPLE_URL)))
+ .flatMapPublisher(message -> {
+ System.out.println(message.getHead().getCode());
+ return message.getBody();
+ })
+ .toList()
+ .map(byteBuffers -> {
+ final int totalLength = byteBuffers.stream().mapToInt(ByteBuffer::remaining).sum();
+ final ByteBuffer combined = ByteBuffer.allocate(totalLength);
+ byteBuffers.forEach(combined::put);
+ combined.flip();
+ return UTF_8.decode(combined).toString();
+ })
+ .doOnSuccess(System.out::println)
+ .blockingSubscribe();
+ }
+ }
+
+}