From 77bfb9ae5ce59c488e23bb1e481d24be45cbb6cd Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 27 Feb 2020 13:24:26 -0800 Subject: [PATCH 01/59] initial commit --- .gitignore | 4 + pom.xml | 122 ++++++++++++++++++ .../client5reactive/FutureCallbacks.java | 54 ++++++++ .../ReactiveHttpAsyncClient.java | 70 ++++++++++ .../ReactiveHttpAsyncClientImpl.java | 45 +++++++ .../ReactiveHttpAsyncClients.java | 13 ++ 6 files changed, 308 insertions(+) create mode 100644 pom.xml create mode 100644 src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java create mode 100644 src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java create mode 100644 src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java create mode 100644 src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java diff --git a/.gitignore b/.gitignore index a1c2a23..576e34e 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,7 @@ # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* + +.project +.classpath +.settings diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..0c244d0 --- /dev/null +++ b/pom.xml @@ -0,0 +1,122 @@ + + 4.0.0 + com.saasquatch + apache-client5-reactive + 0.0.1-SNAPSHOT + jar + + REPLACEME + REPLACEME + 2020 + + + Apache License 2.0 + http://www.apache.org/licenses/LICENSE-2.0.html + repo + + + + + UTF-8 + 5.6.0 + + + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + + + org.junit.jupiter + junit-jupiter-engine + ${junit.version} + test + + + org.apache.httpcomponents.client5 + httpclient5 + 5.0 + + + org.apache.httpcomponents.core5 + httpcore5-reactive + 5.0 + + + io.reactivex.rxjava3 + rxjava + 3.0.0 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-source-plugin + 3.2.0 + + + attach-sources + + jar + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.1.1 + + none + + + + attach-javadocs + + jar + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.0.0-M4 + + + org.jacoco + jacoco-maven-plugin + 0.8.5 + + + + prepare-agent + + + + report + test + + report + + + + + + + + diff --git a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java new file mode 100644 index 0000000..81559d8 --- /dev/null +++ b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java @@ -0,0 +1,54 @@ +package com.saasquatch.client5reactive; + +import java.util.NoSuchElementException; +import org.apache.hc.core5.concurrent.FutureCallback; +import io.reactivex.rxjava3.core.MaybeEmitter; +import io.reactivex.rxjava3.core.SingleEmitter; + +final class FutureCallbacks { + + FutureCallbacks() {} + + public static FutureCallback singleEmitter(SingleEmitter emitter) { + return new FutureCallback() { + @Override + public void completed(T result) { + emitter.onSuccess(result); + } + + @Override + public void failed(Exception ex) { + emitter.onError(ex); + } + + @Override + public void cancelled() { + emitter.onError(new NoSuchElementException()); + } + }; + } + + public static FutureCallback maybeEmitter(MaybeEmitter emitter) { + return new FutureCallback() { + @Override + public void completed(T result) { + if (result == null) { + emitter.onComplete(); + } else { + emitter.onSuccess(result); + } + } + + @Override + public void failed(Exception ex) { + emitter.onError(ex); + } + + @Override + public void cancelled() { + emitter.onError(new NoSuchElementException()); + } + }; + } + +} diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java new file mode 100644 index 0000000..d8da597 --- /dev/null +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java @@ -0,0 +1,70 @@ +package com.saasquatch.client5reactive; + +import java.nio.ByteBuffer; +import org.apache.hc.client5.http.async.HttpAsyncClient; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.http.nio.HandlerFactory; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.reactivestreams.Publisher; + +/** + * Thin wrapper around Apache {@link HttpAsyncClient} to expose Reactive Streams interfaces.
+ * The methods in this interface aim to mirror the ones in {@link HttpAsyncClient} and + * {@link CloseableHttpAsyncClient}. + * + * @author sli + */ +public interface ReactiveHttpAsyncClient { + + Publisher execute(AsyncRequestProducer requestProducer, + AsyncResponseConsumer responseConsumer, + HandlerFactory pushHandlerFactory, HttpContext context); + + default Publisher execute(AsyncRequestProducer requestProducer, + AsyncResponseConsumer responseConsumer, HttpContext context) { + return execute(requestProducer, responseConsumer, null, context); + } + + default Publisher execute(AsyncRequestProducer requestProducer, + AsyncResponseConsumer responseConsumer) { + return execute(requestProducer, responseConsumer, null, HttpClientContext.create()); + } + + default Publisher execute(SimpleHttpRequest request, HttpContext context) { + return execute(SimpleRequestProducer.create(request), SimpleResponseConsumer.create(), null, + context); + } + + default Publisher execute(SimpleHttpRequest request) { + return execute(request, HttpClientContext.create()); + } + + Publisher>> streamingExecute( + AsyncRequestProducer requestProducer, HttpContext context); + + default Publisher>> streamingExecute( + AsyncRequestProducer requestProducer) { + return streamingExecute(requestProducer, HttpClientContext.create()); + } + + default Publisher>> streamingExecute( + SimpleHttpRequest request, HttpContext context) { + return streamingExecute(SimpleRequestProducer.create(request), context); + } + + default Publisher>> streamingExecute( + SimpleHttpRequest request) { + return streamingExecute(request, HttpClientContext.create()); + } + +} diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java new file mode 100644 index 0000000..100d2fc --- /dev/null +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java @@ -0,0 +1,45 @@ +package com.saasquatch.client5reactive; + +import java.nio.ByteBuffer; +import org.apache.hc.client5.http.async.HttpAsyncClient; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.http.nio.HandlerFactory; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.reactive.ReactiveResponseConsumer; +import org.reactivestreams.Publisher; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Single; + +final class ReactiveHttpAsyncClientImpl implements ReactiveHttpAsyncClient { + + private final HttpAsyncClient httpAsyncClient; + + ReactiveHttpAsyncClientImpl(HttpAsyncClient httpAsyncClient) { + this.httpAsyncClient = httpAsyncClient; + } + + @Override + public Publisher execute(AsyncRequestProducer requestProducer, + AsyncResponseConsumer responseConsumer, + HandlerFactory pushHandlerFactory, HttpContext context) { + return Maybe.create(emitter -> { + httpAsyncClient.execute(requestProducer, responseConsumer, pushHandlerFactory, context, + FutureCallbacks.maybeEmitter(emitter)); + }).toFlowable(); + } + + @Override + public Publisher>> streamingExecute( + AsyncRequestProducer requestProducer, HttpContext context) { + return Single.>>create(emitter -> { + final ReactiveResponseConsumer responseConsumer = + new ReactiveResponseConsumer(FutureCallbacks.singleEmitter(emitter)); + httpAsyncClient.execute(requestProducer, responseConsumer, null, context, null); + }).toFlowable(); + } + +} diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java new file mode 100644 index 0000000..4f0d950 --- /dev/null +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java @@ -0,0 +1,13 @@ +package com.saasquatch.client5reactive; + +import java.util.Objects; +import org.apache.hc.client5.http.async.HttpAsyncClient; + +public class ReactiveHttpAsyncClients { + + public static ReactiveHttpAsyncClient create(HttpAsyncClient httpAsyncClient) { + Objects.requireNonNull(httpAsyncClient); + return new ReactiveHttpAsyncClientImpl(httpAsyncClient); + } + +} From 746170ff9d5da0f3ffd907e1f638ed39a155023c Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 27 Feb 2020 13:25:00 -0800 Subject: [PATCH 02/59] Update .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 576e34e..fcb1dab 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ hs_err_pid* .project .classpath .settings +/target/ From 2e936434f85bfcf467fe66bc1579f743bac65cd1 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 27 Feb 2020 13:43:26 -0800 Subject: [PATCH 03/59] javadoc --- .../client5reactive/FutureCallbacks.java | 5 ++++ .../ReactiveHttpAsyncClient.java | 26 +++++++++++++++++-- .../ReactiveHttpAsyncClientImpl.java | 5 ++++ .../ReactiveHttpAsyncClients.java | 17 +++++++++++- 4 files changed, 50 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java index 81559d8..a6601b7 100644 --- a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java +++ b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java @@ -5,6 +5,11 @@ import io.reactivex.rxjava3.core.MaybeEmitter; import io.reactivex.rxjava3.core.SingleEmitter; +/** + * Utilities for {@link FutureCallback}s. Not public. + * + * @author sli + */ final class FutureCallbacks { FutureCallbacks() {} diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java index d8da597..33de211 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java @@ -8,6 +8,7 @@ import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.Message; import org.apache.hc.core5.http.nio.AsyncPushConsumer; @@ -23,32 +24,53 @@ * {@link CloseableHttpAsyncClient}. * * @author sli + * @see ReactiveHttpAsyncClients */ public interface ReactiveHttpAsyncClient { + /** + * @see HttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, + * HttpContext, FutureCallback) + */ Publisher execute(AsyncRequestProducer requestProducer, AsyncResponseConsumer responseConsumer, HandlerFactory pushHandlerFactory, HttpContext context); + /** + * @see CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HttpContext, + * FutureCallback) + */ default Publisher execute(AsyncRequestProducer requestProducer, AsyncResponseConsumer responseConsumer, HttpContext context) { return execute(requestProducer, responseConsumer, null, context); } + /** + * @see CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, + * FutureCallback) + */ default Publisher execute(AsyncRequestProducer requestProducer, AsyncResponseConsumer responseConsumer) { return execute(requestProducer, responseConsumer, null, HttpClientContext.create()); } + /** + * @see CloseableHttpAsyncClient#execute(SimpleHttpRequest, HttpContext, FutureCallback) + */ default Publisher execute(SimpleHttpRequest request, HttpContext context) { - return execute(SimpleRequestProducer.create(request), SimpleResponseConsumer.create(), null, - context); + return execute(SimpleRequestProducer.create(request), SimpleResponseConsumer.create(), context); } + /** + * @see CloseableHttpAsyncClient#execute(SimpleHttpRequest, FutureCallback) + */ default Publisher execute(SimpleHttpRequest request) { return execute(request, HttpClientContext.create()); } + /** + * Execute the given {@link AsyncRequestProducer} and get a streaming response. + */ Publisher>> streamingExecute( AsyncRequestProducer requestProducer, HttpContext context); diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java index 100d2fc..8d3ebfd 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java @@ -14,6 +14,11 @@ import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Single; +/** + * Concrete implementation of {@link ReactiveHttpAsyncClient}. + * + * @author sli + */ final class ReactiveHttpAsyncClientImpl implements ReactiveHttpAsyncClient { private final HttpAsyncClient httpAsyncClient; diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java index 4f0d950..9e88e02 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java @@ -2,9 +2,24 @@ import java.util.Objects; import org.apache.hc.client5.http.async.HttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; -public class ReactiveHttpAsyncClients { +/** + * Factory methods for {@link ReactiveHttpAsyncClient}. + * + * @author sli + * @see HttpAsyncClients + */ +public final class ReactiveHttpAsyncClients { + private ReactiveHttpAsyncClients() {} + + /** + * Create a {@link ReactiveHttpAsyncClient} from a given {@link HttpAsyncClient}. Note that the + * created {@link ReactiveHttpAsyncClient} is simply a wrapper of the {@link HttpAsyncClient} and + * does not support lifecycle management, so you'll need to manage the lifecycle of the given + * {@link HttpAsyncClient} yourself. + */ public static ReactiveHttpAsyncClient create(HttpAsyncClient httpAsyncClient) { Objects.requireNonNull(httpAsyncClient); return new ReactiveHttpAsyncClientImpl(httpAsyncClient); From 17e0c46d0722621bf1659044157ff1abfc06b868 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 27 Feb 2020 13:59:11 -0800 Subject: [PATCH 04/59] Create TestFutureCallbacks.java --- .../client5reactive/TestFutureCallbacks.java | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 src/test/java/com/saasquatch/client5reactive/TestFutureCallbacks.java diff --git a/src/test/java/com/saasquatch/client5reactive/TestFutureCallbacks.java b/src/test/java/com/saasquatch/client5reactive/TestFutureCallbacks.java new file mode 100644 index 0000000..ef711bd --- /dev/null +++ b/src/test/java/com/saasquatch/client5reactive/TestFutureCallbacks.java @@ -0,0 +1,43 @@ +package com.saasquatch.client5reactive; + +import java.util.NoSuchElementException; +import org.apache.hc.core5.concurrent.BasicFuture; +import org.junit.jupiter.api.Test; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Single; + +public class TestFutureCallbacks { + + @Test + public void testSingle() { + Single.create(emitter -> { + final BasicFuture future = new BasicFuture<>(FutureCallbacks.singleEmitter(emitter)); + future.completed(1); + }).test().assertResult(1); + Single.create(emitter -> { + final BasicFuture future = new BasicFuture<>(FutureCallbacks.singleEmitter(emitter)); + future.completed(null); + }).test().assertError(NullPointerException.class); + Single.create(emitter -> { + final BasicFuture future = new BasicFuture<>(FutureCallbacks.singleEmitter(emitter)); + future.cancel(); + }).test().assertError(NoSuchElementException.class); + } + + @Test + public void testMaybe() { + Maybe.create(emitter -> { + final BasicFuture future = new BasicFuture<>(FutureCallbacks.maybeEmitter(emitter)); + future.completed(1); + }).test().assertResult(1); + Maybe.create(emitter -> { + final BasicFuture future = new BasicFuture<>(FutureCallbacks.maybeEmitter(emitter)); + future.completed(null); + }).test().assertComplete(); + Maybe.create(emitter -> { + final BasicFuture future = new BasicFuture<>(FutureCallbacks.maybeEmitter(emitter)); + future.cancel(); + }).test().assertError(NoSuchElementException.class); + } + +} From e64799dc0ad6f49c67e3e048c6db53557c0db635 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 27 Feb 2020 14:22:09 -0800 Subject: [PATCH 05/59] cleanup + javadoc --- .../client5reactive/ReactiveHttpAsyncClient.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java index 33de211..dd7ce1d 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java @@ -51,7 +51,7 @@ default Publisher execute(AsyncRequestProducer requestProducer, */ default Publisher execute(AsyncRequestProducer requestProducer, AsyncResponseConsumer responseConsumer) { - return execute(requestProducer, responseConsumer, null, HttpClientContext.create()); + return execute(requestProducer, responseConsumer, HttpClientContext.create()); } /** @@ -74,19 +74,28 @@ default Publisher execute(SimpleHttpRequest request) { Publisher>> streamingExecute( AsyncRequestProducer requestProducer, HttpContext context); + /** + * @see #streamingExecute(AsyncRequestProducer, HttpContext) + */ default Publisher>> streamingExecute( AsyncRequestProducer requestProducer) { return streamingExecute(requestProducer, HttpClientContext.create()); } + /** + * @see #streamingExecute(AsyncRequestProducer, HttpContext) + */ default Publisher>> streamingExecute( SimpleHttpRequest request, HttpContext context) { return streamingExecute(SimpleRequestProducer.create(request), context); } + /** + * @see #streamingExecute(AsyncRequestProducer, HttpContext) + */ default Publisher>> streamingExecute( SimpleHttpRequest request) { - return streamingExecute(request, HttpClientContext.create()); + return streamingExecute(SimpleRequestProducer.create(request)); } } From 5240429fbb2f471f88e7736423ac2bcf8e81519e Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 27 Feb 2020 15:37:47 -0800 Subject: [PATCH 06/59] updated pom meta --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 0c244d0..6f65db3 100644 --- a/pom.xml +++ b/pom.xml @@ -6,8 +6,8 @@ 0.0.1-SNAPSHOT jar - REPLACEME - REPLACEME + apache-client5-reactive + https://github.com/saasquatch/apache-client5-reactive 2020 From 1235ab1f1ba236d521b2bccfb7a3de39fca1dd2c Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 27 Feb 2020 15:37:59 -0800 Subject: [PATCH 07/59] cleanup + attempt to cancel future --- .../saasquatch/client5reactive/FutureCallbacks.java | 7 +++++++ .../client5reactive/ReactiveHttpAsyncClient.java | 9 ++++----- .../client5reactive/ReactiveHttpAsyncClientImpl.java | 10 +++++++--- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java index a6601b7..59c41e0 100644 --- a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java +++ b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java @@ -1,6 +1,7 @@ package com.saasquatch.client5reactive; import java.util.NoSuchElementException; +import java.util.concurrent.Future; import org.apache.hc.core5.concurrent.FutureCallback; import io.reactivex.rxjava3.core.MaybeEmitter; import io.reactivex.rxjava3.core.SingleEmitter; @@ -56,4 +57,10 @@ public void cancelled() { }; } + public static void futureCancellable(Future future) { + if (!future.isDone() && !future.isCancelled()) { + future.cancel(true); + } + } + } diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java index dd7ce1d..9a0c8f2 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java @@ -7,7 +7,6 @@ import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; -import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.Message; @@ -51,7 +50,7 @@ default Publisher execute(AsyncRequestProducer requestProducer, */ default Publisher execute(AsyncRequestProducer requestProducer, AsyncResponseConsumer responseConsumer) { - return execute(requestProducer, responseConsumer, HttpClientContext.create()); + return execute(requestProducer, responseConsumer, null); } /** @@ -65,7 +64,7 @@ default Publisher execute(SimpleHttpRequest request, HttpCon * @see CloseableHttpAsyncClient#execute(SimpleHttpRequest, FutureCallback) */ default Publisher execute(SimpleHttpRequest request) { - return execute(request, HttpClientContext.create()); + return execute(request, null); } /** @@ -79,7 +78,7 @@ Publisher>> streamingExecute( */ default Publisher>> streamingExecute( AsyncRequestProducer requestProducer) { - return streamingExecute(requestProducer, HttpClientContext.create()); + return streamingExecute(requestProducer, null); } /** @@ -95,7 +94,7 @@ default Publisher>> streamingExecute */ default Publisher>> streamingExecute( SimpleHttpRequest request) { - return streamingExecute(SimpleRequestProducer.create(request)); + return streamingExecute(request, null); } } diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java index 8d3ebfd..c667dd4 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java @@ -1,6 +1,7 @@ package com.saasquatch.client5reactive; import java.nio.ByteBuffer; +import java.util.concurrent.Future; import org.apache.hc.client5.http.async.HttpAsyncClient; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.Message; @@ -32,8 +33,9 @@ public Publisher execute(AsyncRequestProducer requestProducer, AsyncResponseConsumer responseConsumer, HandlerFactory pushHandlerFactory, HttpContext context) { return Maybe.create(emitter -> { - httpAsyncClient.execute(requestProducer, responseConsumer, pushHandlerFactory, context, - FutureCallbacks.maybeEmitter(emitter)); + final Future resultFuture = httpAsyncClient.execute(requestProducer, responseConsumer, + pushHandlerFactory, context, FutureCallbacks.maybeEmitter(emitter)); + emitter.setCancellable(() -> FutureCallbacks.futureCancellable(resultFuture)); }).toFlowable(); } @@ -43,7 +45,9 @@ public Publisher>> streamingExecute( return Single.>>create(emitter -> { final ReactiveResponseConsumer responseConsumer = new ReactiveResponseConsumer(FutureCallbacks.singleEmitter(emitter)); - httpAsyncClient.execute(requestProducer, responseConsumer, null, context, null); + final Future resultFuture = + httpAsyncClient.execute(requestProducer, responseConsumer, null, context, null); + emitter.setCancellable(() -> FutureCallbacks.futureCancellable(resultFuture)); }).toFlowable(); } From 204cff75a11f80b479addcbf7e3a569d8d1b1704 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Fri, 28 Feb 2020 11:49:56 -0800 Subject: [PATCH 08/59] added null annotations --- pom.xml | 5 ++++ .../ReactiveHttpAsyncClient.java | 30 +++++++++++-------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/pom.xml b/pom.xml index 6f65db3..7112765 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,11 @@ rxjava 3.0.0 + + com.google.code.findbugs + jsr305 + 3.0.2 + diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java index 9a0c8f2..db0e154 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java @@ -1,6 +1,8 @@ package com.saasquatch.client5reactive; import java.nio.ByteBuffer; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.apache.hc.client5.http.async.HttpAsyncClient; import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; @@ -31,16 +33,17 @@ public interface ReactiveHttpAsyncClient { * @see HttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, * HttpContext, FutureCallback) */ - Publisher execute(AsyncRequestProducer requestProducer, - AsyncResponseConsumer responseConsumer, - HandlerFactory pushHandlerFactory, HttpContext context); + Publisher execute(@Nonnull AsyncRequestProducer requestProducer, + @Nonnull AsyncResponseConsumer responseConsumer, + @Nullable HandlerFactory pushHandlerFactory, + @Nullable HttpContext context); /** * @see CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HttpContext, * FutureCallback) */ - default Publisher execute(AsyncRequestProducer requestProducer, - AsyncResponseConsumer responseConsumer, HttpContext context) { + default Publisher execute(@Nonnull AsyncRequestProducer requestProducer, + @Nonnull AsyncResponseConsumer responseConsumer, @Nullable HttpContext context) { return execute(requestProducer, responseConsumer, null, context); } @@ -48,22 +51,23 @@ default Publisher execute(AsyncRequestProducer requestProducer, * @see CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, * FutureCallback) */ - default Publisher execute(AsyncRequestProducer requestProducer, - AsyncResponseConsumer responseConsumer) { + default Publisher execute(@Nonnull AsyncRequestProducer requestProducer, + @Nonnull AsyncResponseConsumer responseConsumer) { return execute(requestProducer, responseConsumer, null); } /** * @see CloseableHttpAsyncClient#execute(SimpleHttpRequest, HttpContext, FutureCallback) */ - default Publisher execute(SimpleHttpRequest request, HttpContext context) { + default Publisher execute(@Nonnull SimpleHttpRequest request, + @Nullable HttpContext context) { return execute(SimpleRequestProducer.create(request), SimpleResponseConsumer.create(), context); } /** * @see CloseableHttpAsyncClient#execute(SimpleHttpRequest, FutureCallback) */ - default Publisher execute(SimpleHttpRequest request) { + default Publisher execute(@Nonnull SimpleHttpRequest request) { return execute(request, null); } @@ -71,13 +75,13 @@ default Publisher execute(SimpleHttpRequest request) { * Execute the given {@link AsyncRequestProducer} and get a streaming response. */ Publisher>> streamingExecute( - AsyncRequestProducer requestProducer, HttpContext context); + @Nonnull AsyncRequestProducer requestProducer, @Nullable HttpContext context); /** * @see #streamingExecute(AsyncRequestProducer, HttpContext) */ default Publisher>> streamingExecute( - AsyncRequestProducer requestProducer) { + @Nonnull AsyncRequestProducer requestProducer) { return streamingExecute(requestProducer, null); } @@ -85,7 +89,7 @@ default Publisher>> streamingExecute * @see #streamingExecute(AsyncRequestProducer, HttpContext) */ default Publisher>> streamingExecute( - SimpleHttpRequest request, HttpContext context) { + @Nonnull SimpleHttpRequest request, @Nullable HttpContext context) { return streamingExecute(SimpleRequestProducer.create(request), context); } @@ -93,7 +97,7 @@ default Publisher>> streamingExecute * @see #streamingExecute(AsyncRequestProducer, HttpContext) */ default Publisher>> streamingExecute( - SimpleHttpRequest request) { + @Nonnull SimpleHttpRequest request) { return streamingExecute(request, null); } From 061764ac9021bdae45f9f596f30b223e778eab15 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Fri, 28 Feb 2020 12:11:20 -0800 Subject: [PATCH 09/59] assemble time null checks --- .../client5reactive/ReactiveHttpAsyncClientImpl.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java index c667dd4..fe83afb 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java @@ -1,6 +1,7 @@ package com.saasquatch.client5reactive; import java.nio.ByteBuffer; +import java.util.Objects; import java.util.concurrent.Future; import org.apache.hc.client5.http.async.HttpAsyncClient; import org.apache.hc.core5.http.HttpResponse; @@ -32,6 +33,8 @@ final class ReactiveHttpAsyncClientImpl implements ReactiveHttpAsyncClient { public Publisher execute(AsyncRequestProducer requestProducer, AsyncResponseConsumer responseConsumer, HandlerFactory pushHandlerFactory, HttpContext context) { + Objects.requireNonNull(requestProducer); + Objects.requireNonNull(responseConsumer); return Maybe.create(emitter -> { final Future resultFuture = httpAsyncClient.execute(requestProducer, responseConsumer, pushHandlerFactory, context, FutureCallbacks.maybeEmitter(emitter)); @@ -42,6 +45,7 @@ public Publisher execute(AsyncRequestProducer requestProducer, @Override public Publisher>> streamingExecute( AsyncRequestProducer requestProducer, HttpContext context) { + Objects.requireNonNull(requestProducer); return Single.>>create(emitter -> { final ReactiveResponseConsumer responseConsumer = new ReactiveResponseConsumer(FutureCallbacks.singleEmitter(emitter)); From bf413f6c176f9b0f6c5d99d7ef83282d89a8a844 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Fri, 28 Feb 2020 12:11:42 -0800 Subject: [PATCH 10/59] nullability unit tests --- .../client5reactive/NullabilityTests.java | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 src/test/java/com/saasquatch/client5reactive/NullabilityTests.java diff --git a/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java new file mode 100644 index 0000000..25fb15a --- /dev/null +++ b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java @@ -0,0 +1,47 @@ +package com.saasquatch.client5reactive; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class NullabilityTests { + + private static CloseableHttpAsyncClient asyncClient; + private static ReactiveHttpAsyncClient reactiveClient; + + @BeforeAll + public static void beforeAll() { + asyncClient = HttpAsyncClients.createDefault(); + asyncClient.start(); + reactiveClient = ReactiveHttpAsyncClients.create(asyncClient); + } + + @AfterAll + public static void afterAll() throws Exception { + asyncClient.close(); + } + + @Test + public void testNullability() { + assertThrows(NullPointerException.class, + () -> reactiveClient.execute((AsyncRequestProducer) null, null, null, null)); + assertThrows(NullPointerException.class, + () -> reactiveClient.execute((AsyncRequestProducer) null, null, null)); + assertThrows(NullPointerException.class, + () -> reactiveClient.execute((AsyncRequestProducer) null, null)); + assertThrows(NullPointerException.class, + () -> reactiveClient.execute((SimpleHttpRequest) null, null)); + assertThrows(NullPointerException.class, + () -> reactiveClient.execute((SimpleHttpRequest) null)); + assertThrows(NullPointerException.class, + () -> reactiveClient.streamingExecute((AsyncRequestProducer) null, null)); + assertThrows(NullPointerException.class, + () -> reactiveClient.streamingExecute((AsyncRequestProducer) null)); + } + +} From 9df6ac0eb9d114e10ddc9cb89a10c5c8c796b3d6 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Fri, 28 Feb 2020 13:18:51 -0800 Subject: [PATCH 11/59] SL: more tests --- .../client5reactive/NullabilityTests.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java index 25fb15a..5bc55f4 100644 --- a/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java +++ b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java @@ -2,12 +2,17 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClients; import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.reactive.ReactiveResponseConsumer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; +import io.reactivex.rxjava3.core.Flowable; public class NullabilityTests { @@ -44,4 +49,12 @@ public void testNullability() { () -> reactiveClient.streamingExecute((AsyncRequestProducer) null)); } + @Test + public void testEmptyPublisher() { + final Publisher voidResultPublisher = reactiveClient.execute( + SimpleRequestProducer.create(SimpleHttpRequests.get("https://example.com")), + new ReactiveResponseConsumer()); + Flowable.fromPublisher(voidResultPublisher).test().assertEmpty(); + } + } From 2e272850b1073539351e8863529b2421eade8c8e Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Fri, 28 Feb 2020 13:21:00 -0800 Subject: [PATCH 12/59] SL: rename test file --- .../{TestFutureCallbacks.java => FutureCallbacksTests.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename src/test/java/com/saasquatch/client5reactive/{TestFutureCallbacks.java => FutureCallbacksTests.java} (97%) diff --git a/src/test/java/com/saasquatch/client5reactive/TestFutureCallbacks.java b/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java similarity index 97% rename from src/test/java/com/saasquatch/client5reactive/TestFutureCallbacks.java rename to src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java index ef711bd..28a7915 100644 --- a/src/test/java/com/saasquatch/client5reactive/TestFutureCallbacks.java +++ b/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java @@ -6,7 +6,7 @@ import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Single; -public class TestFutureCallbacks { +public class FutureCallbacksTests { @Test public void testSingle() { From 1bf9a537dc9a019d8f34b80c51bcbbf404e5fa22 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Fri, 28 Feb 2020 13:24:12 -0800 Subject: [PATCH 13/59] Create LifecycleTests.java --- .../client5reactive/LifecycleTests.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 src/test/java/com/saasquatch/client5reactive/LifecycleTests.java diff --git a/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java b/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java new file mode 100644 index 0000000..4dbbb92 --- /dev/null +++ b/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java @@ -0,0 +1,33 @@ +package com.saasquatch.client5reactive; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.junit.jupiter.api.Test; +import io.reactivex.rxjava3.core.Flowable; + +public class LifecycleTests { + + @Test + public void testNonStarted() throws Exception { + try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.createDefault()) { + // Not started + final ReactiveHttpAsyncClient reactiveClient = ReactiveHttpAsyncClients.create(asyncClient); + Flowable.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get("https://example.com"))) + .test().assertError(IllegalStateException.class); + } + } + + @Test + public void testClosed() throws Exception { + final ReactiveHttpAsyncClient reactiveClient; + try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.createDefault()) { + asyncClient.start(); + reactiveClient = ReactiveHttpAsyncClients.create(asyncClient); + } + // Closed + Flowable.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get("https://example.com"))) + .test().assertError(IllegalStateException.class); + } + +} From 350ca8913aff7e45fa2ce07144c3bc550673e1e0 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Fri, 28 Feb 2020 13:45:11 -0800 Subject: [PATCH 14/59] Test for streaming + remove cancellation --- pom.xml | 5 ++ .../ReactiveHttpAsyncClientImpl.java | 10 ++-- .../client5reactive/StreamingTests.java | 48 +++++++++++++++++++ 3 files changed, 56 insertions(+), 7 deletions(-) create mode 100644 src/test/java/com/saasquatch/client5reactive/StreamingTests.java diff --git a/pom.xml b/pom.xml index 7112765..360c50f 100644 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,11 @@ jsr305 3.0.2 + + com.google.guava + guava + 28.2-jre + diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java index fe83afb..1a2c401 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java @@ -2,7 +2,6 @@ import java.nio.ByteBuffer; import java.util.Objects; -import java.util.concurrent.Future; import org.apache.hc.client5.http.async.HttpAsyncClient; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.Message; @@ -36,9 +35,8 @@ public Publisher execute(AsyncRequestProducer requestProducer, Objects.requireNonNull(requestProducer); Objects.requireNonNull(responseConsumer); return Maybe.create(emitter -> { - final Future resultFuture = httpAsyncClient.execute(requestProducer, responseConsumer, - pushHandlerFactory, context, FutureCallbacks.maybeEmitter(emitter)); - emitter.setCancellable(() -> FutureCallbacks.futureCancellable(resultFuture)); + httpAsyncClient.execute(requestProducer, responseConsumer, pushHandlerFactory, context, + FutureCallbacks.maybeEmitter(emitter)); }).toFlowable(); } @@ -49,9 +47,7 @@ public Publisher>> streamingExecute( return Single.>>create(emitter -> { final ReactiveResponseConsumer responseConsumer = new ReactiveResponseConsumer(FutureCallbacks.singleEmitter(emitter)); - final Future resultFuture = - httpAsyncClient.execute(requestProducer, responseConsumer, null, context, null); - emitter.setCancellable(() -> FutureCallbacks.futureCancellable(resultFuture)); + httpAsyncClient.execute(requestProducer, responseConsumer, null, context, null); }).toFlowable(); } diff --git a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java new file mode 100644 index 0000000..4e354e3 --- /dev/null +++ b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java @@ -0,0 +1,48 @@ +package com.saasquatch.client5reactive; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import java.net.URL; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.core5.http.Message; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import com.google.common.io.ByteStreams; +import com.google.common.primitives.Bytes; +import io.reactivex.rxjava3.core.Flowable; + +public class StreamingTests { + + private static final String FLOWABLE_URL = + "https://cdn.jsdelivr.net/gh/ReactiveX/RxJava@81f0569a8b9b7d27059f127b90fd7335118b2ee4/src/main/java/io/reactivex/rxjava3/core/Flowable.java"; + private static CloseableHttpAsyncClient asyncClient; + private static ReactiveHttpAsyncClient reactiveClient; + + @BeforeAll + public static void beforeAll() { + asyncClient = HttpAsyncClients.createDefault(); + asyncClient.start(); + reactiveClient = ReactiveHttpAsyncClients.create(asyncClient); + } + + @AfterAll + public static void afterAll() throws Exception { + asyncClient.close(); + } + + @Test + public void testBasicStreamingWorks() throws Exception { + final byte[] bodyBytes1 = ByteStreams.toByteArray(new URL(FLOWABLE_URL).openStream()); + final byte[] bodyBytes2 = Flowable + .fromPublisher(reactiveClient.streamingExecute(SimpleHttpRequests.get(FLOWABLE_URL))) + .concatMap(Message::getBody).map(bb -> { + final byte[] arr = new byte[bb.remaining()]; + bb.get(arr); + return arr; + }).reduce(Bytes::concat).blockingGet(); + assertArrayEquals(bodyBytes1, bodyBytes2); + } + +} From 8728bdee057cdd8595b098aba6f0535b88f5a563 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Fri, 28 Feb 2020 13:57:33 -0800 Subject: [PATCH 15/59] test slf4j dependency to silence warning --- pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pom.xml b/pom.xml index 360c50f..32418f1 100644 --- a/pom.xml +++ b/pom.xml @@ -60,6 +60,12 @@ guava 28.2-jre + + org.slf4j + slf4j-simple + 1.7.30 + test + From 96ad6310c27e4c9b034d0903cfef2bc44937c136 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Fri, 28 Feb 2020 13:57:41 -0800 Subject: [PATCH 16/59] more steraming tests --- .../client5reactive/StreamingTests.java | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java index 4e354e3..209733a 100644 --- a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java +++ b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java @@ -3,6 +3,8 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import java.net.URL; import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClients; import org.apache.hc.core5.http.Message; @@ -19,12 +21,14 @@ public class StreamingTests { "https://cdn.jsdelivr.net/gh/ReactiveX/RxJava@81f0569a8b9b7d27059f127b90fd7335118b2ee4/src/main/java/io/reactivex/rxjava3/core/Flowable.java"; private static CloseableHttpAsyncClient asyncClient; private static ReactiveHttpAsyncClient reactiveClient; + private static byte[] flowableSourceBytes; @BeforeAll - public static void beforeAll() { + public static void beforeAll() throws Exception { asyncClient = HttpAsyncClients.createDefault(); asyncClient.start(); reactiveClient = ReactiveHttpAsyncClients.create(asyncClient); + flowableSourceBytes = ByteStreams.toByteArray(new URL(FLOWABLE_URL).openStream()); } @AfterAll @@ -32,17 +36,25 @@ public static void afterAll() throws Exception { asyncClient.close(); } + @Test + public void testVanillaExecuteWorks() throws Exception { + final byte[] bodyBytes = Flowable.fromPublisher( + reactiveClient.execute(SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_URL)), + SimpleResponseConsumer.create())) + .blockingSingle().getBodyBytes(); + assertArrayEquals(flowableSourceBytes, bodyBytes); + } + @Test public void testBasicStreamingWorks() throws Exception { - final byte[] bodyBytes1 = ByteStreams.toByteArray(new URL(FLOWABLE_URL).openStream()); - final byte[] bodyBytes2 = Flowable + final byte[] bodyBytes = Flowable .fromPublisher(reactiveClient.streamingExecute(SimpleHttpRequests.get(FLOWABLE_URL))) .concatMap(Message::getBody).map(bb -> { final byte[] arr = new byte[bb.remaining()]; bb.get(arr); return arr; }).reduce(Bytes::concat).blockingGet(); - assertArrayEquals(bodyBytes1, bodyBytes2); + assertArrayEquals(flowableSourceBytes, bodyBytes); } } From c1acbfa0aa0b8f53218472dd5da3719a13eb1dce Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Fri, 28 Feb 2020 14:02:21 -0800 Subject: [PATCH 17/59] Create .travis.yml --- .travis.yml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..5ac4518 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,7 @@ +language: java +jdk: + - openjdk8 +os: linux +cache: + directories: + - $HOME/.m2 From 61b45e334d2549488477230739026c6e0741323e Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Fri, 28 Feb 2020 14:02:54 -0800 Subject: [PATCH 18/59] Update pom.xml --- pom.xml | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/pom.xml b/pom.xml index 32418f1..aafc926 100644 --- a/pom.xml +++ b/pom.xml @@ -113,25 +113,6 @@ maven-surefire-plugin 3.0.0-M4 - - org.jacoco - jacoco-maven-plugin - 0.8.5 - - - - prepare-agent - - - - report - test - - report - - - - From f88ea8cc85d5a3d21383f5e54a1f8a628b12ec58 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Fri, 28 Feb 2020 14:03:38 -0800 Subject: [PATCH 19/59] unused method --- .../com/saasquatch/client5reactive/FutureCallbacks.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java index 59c41e0..a6601b7 100644 --- a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java +++ b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java @@ -1,7 +1,6 @@ package com.saasquatch.client5reactive; import java.util.NoSuchElementException; -import java.util.concurrent.Future; import org.apache.hc.core5.concurrent.FutureCallback; import io.reactivex.rxjava3.core.MaybeEmitter; import io.reactivex.rxjava3.core.SingleEmitter; @@ -57,10 +56,4 @@ public void cancelled() { }; } - public static void futureCancellable(Future future) { - if (!future.isDone() && !future.isCancelled()) { - future.cancel(true); - } - } - } From 734f0fe8ae5bcb4affe0cd2736f55dce5193d492 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Fri, 28 Feb 2020 14:04:36 -0800 Subject: [PATCH 20/59] failed callback test --- .../saasquatch/client5reactive/FutureCallbacksTests.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java b/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java index 28a7915..67abf2e 100644 --- a/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java +++ b/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java @@ -1,5 +1,6 @@ package com.saasquatch.client5reactive; +import java.io.IOException; import java.util.NoSuchElementException; import org.apache.hc.core5.concurrent.BasicFuture; import org.junit.jupiter.api.Test; @@ -18,6 +19,10 @@ public void testSingle() { final BasicFuture future = new BasicFuture<>(FutureCallbacks.singleEmitter(emitter)); future.completed(null); }).test().assertError(NullPointerException.class); + Single.create(emitter -> { + final BasicFuture future = new BasicFuture<>(FutureCallbacks.singleEmitter(emitter)); + future.failed(new IOException()); + }).test().assertError(IOException.class); Single.create(emitter -> { final BasicFuture future = new BasicFuture<>(FutureCallbacks.singleEmitter(emitter)); future.cancel(); @@ -34,6 +39,10 @@ public void testMaybe() { final BasicFuture future = new BasicFuture<>(FutureCallbacks.maybeEmitter(emitter)); future.completed(null); }).test().assertComplete(); + Maybe.create(emitter -> { + final BasicFuture future = new BasicFuture<>(FutureCallbacks.maybeEmitter(emitter)); + future.failed(new IOException()); + }).test().assertError(IOException.class); Maybe.create(emitter -> { final BasicFuture future = new BasicFuture<>(FutureCallbacks.maybeEmitter(emitter)); future.cancel(); From 8cc1e16cbf2124dea35fe75e53c4543d9d837115 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Fri, 28 Feb 2020 14:08:01 -0800 Subject: [PATCH 21/59] more streaming test --- .../client5reactive/FutureCallbacks.java | 2 +- .../client5reactive/StreamingTests.java | 28 ++++++++++++++----- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java index a6601b7..de99f15 100644 --- a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java +++ b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java @@ -12,7 +12,7 @@ */ final class FutureCallbacks { - FutureCallbacks() {} + private FutureCallbacks() {} public static FutureCallback singleEmitter(SingleEmitter emitter) { return new FutureCallback() { diff --git a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java index 209733a..da1dea4 100644 --- a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java +++ b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java @@ -2,6 +2,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import java.net.URL; +import java.nio.ByteBuffer; import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; @@ -37,7 +38,7 @@ public static void afterAll() throws Exception { } @Test - public void testVanillaExecuteWorks() throws Exception { + public void testVanillaExecuteWorks() { final byte[] bodyBytes = Flowable.fromPublisher( reactiveClient.execute(SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_URL)), SimpleResponseConsumer.create())) @@ -46,15 +47,28 @@ public void testVanillaExecuteWorks() throws Exception { } @Test - public void testBasicStreamingWorks() throws Exception { + public void testVanillaStreamingWorks() { + final byte[] bodyBytes = Flowable + .fromPublisher(reactiveClient + .streamingExecute(SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_URL)))) + .concatMap(Message::getBody).map(bb -> byteBufferToArray(bb)).reduce(Bytes::concat) + .blockingGet(); + assertArrayEquals(flowableSourceBytes, bodyBytes); + } + + @Test + public void testBasicStreamingWorks() { final byte[] bodyBytes = Flowable .fromPublisher(reactiveClient.streamingExecute(SimpleHttpRequests.get(FLOWABLE_URL))) - .concatMap(Message::getBody).map(bb -> { - final byte[] arr = new byte[bb.remaining()]; - bb.get(arr); - return arr; - }).reduce(Bytes::concat).blockingGet(); + .concatMap(Message::getBody).map(bb -> byteBufferToArray(bb)).reduce(Bytes::concat) + .blockingGet(); assertArrayEquals(flowableSourceBytes, bodyBytes); } + private static byte[] byteBufferToArray(ByteBuffer bb) { + final byte[] arr = new byte[bb.remaining()]; + bb.get(arr); + return arr; + } + } From fdb275294834e7c0c7c79eace6dcc7adb89a1a49 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Mon, 2 Mar 2020 09:45:29 -0800 Subject: [PATCH 22/59] Delete an overload --- .../client5reactive/ReactiveHttpAsyncClient.java | 8 -------- .../com/saasquatch/client5reactive/NullabilityTests.java | 2 -- .../com/saasquatch/client5reactive/StreamingTests.java | 4 ++-- 3 files changed, 2 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java index db0e154..d1863fe 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java @@ -77,14 +77,6 @@ default Publisher execute(@Nonnull SimpleHttpRequest request Publisher>> streamingExecute( @Nonnull AsyncRequestProducer requestProducer, @Nullable HttpContext context); - /** - * @see #streamingExecute(AsyncRequestProducer, HttpContext) - */ - default Publisher>> streamingExecute( - @Nonnull AsyncRequestProducer requestProducer) { - return streamingExecute(requestProducer, null); - } - /** * @see #streamingExecute(AsyncRequestProducer, HttpContext) */ diff --git a/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java index 5bc55f4..436dc90 100644 --- a/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java +++ b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java @@ -45,8 +45,6 @@ public void testNullability() { () -> reactiveClient.execute((SimpleHttpRequest) null)); assertThrows(NullPointerException.class, () -> reactiveClient.streamingExecute((AsyncRequestProducer) null, null)); - assertThrows(NullPointerException.class, - () -> reactiveClient.streamingExecute((AsyncRequestProducer) null)); } @Test diff --git a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java index da1dea4..9e9e2ce 100644 --- a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java +++ b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java @@ -49,8 +49,8 @@ public void testVanillaExecuteWorks() { @Test public void testVanillaStreamingWorks() { final byte[] bodyBytes = Flowable - .fromPublisher(reactiveClient - .streamingExecute(SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_URL)))) + .fromPublisher(reactiveClient.streamingExecute( + SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_URL)), null)) .concatMap(Message::getBody).map(bb -> byteBufferToArray(bb)).reduce(Bytes::concat) .blockingGet(); assertArrayEquals(flowableSourceBytes, bodyBytes); From 7d52bece56b09179842cfe656330d33d91ecb854 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Mon, 2 Mar 2020 09:52:48 -0800 Subject: [PATCH 23/59] cleanup --- .../ReactiveHttpAsyncClient.java | 19 +++++++++---------- .../ReactiveHttpAsyncClientImpl.java | 5 +++-- .../client5reactive/StreamingTests.java | 3 ++- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java index d1863fe..e61dd54 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java @@ -72,25 +72,24 @@ default Publisher execute(@Nonnull SimpleHttpRequest request } /** - * Execute the given {@link AsyncRequestProducer} and get a streaming response. + * Execute the given request and get a streaming response. */ Publisher>> streamingExecute( - @Nonnull AsyncRequestProducer requestProducer, @Nullable HttpContext context); + @Nonnull AsyncRequestProducer requestProducer, + @Nullable HandlerFactory pushHandlerFactory, + @Nullable HttpContext context); /** - * @see #streamingExecute(AsyncRequestProducer, HttpContext) + * @see #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext) */ default Publisher>> streamingExecute( - @Nonnull SimpleHttpRequest request, @Nullable HttpContext context) { - return streamingExecute(SimpleRequestProducer.create(request), context); + @Nonnull AsyncRequestProducer requestProducer, @Nullable HttpContext context) { + return streamingExecute(requestProducer, null, context); } - /** - * @see #streamingExecute(AsyncRequestProducer, HttpContext) - */ default Publisher>> streamingExecute( - @Nonnull SimpleHttpRequest request) { - return streamingExecute(request, null); + @Nonnull AsyncRequestProducer requestProducer) { + return streamingExecute(requestProducer, null); } } diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java index 1a2c401..f22e067 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java @@ -42,12 +42,13 @@ public Publisher execute(AsyncRequestProducer requestProducer, @Override public Publisher>> streamingExecute( - AsyncRequestProducer requestProducer, HttpContext context) { + AsyncRequestProducer requestProducer, HandlerFactory pushHandlerFactory, + HttpContext context) { Objects.requireNonNull(requestProducer); return Single.>>create(emitter -> { final ReactiveResponseConsumer responseConsumer = new ReactiveResponseConsumer(FutureCallbacks.singleEmitter(emitter)); - httpAsyncClient.execute(requestProducer, responseConsumer, null, context, null); + httpAsyncClient.execute(requestProducer, responseConsumer, pushHandlerFactory, context, null); }).toFlowable(); } diff --git a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java index 9e9e2ce..73107a3 100644 --- a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java +++ b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java @@ -59,7 +59,8 @@ public void testVanillaStreamingWorks() { @Test public void testBasicStreamingWorks() { final byte[] bodyBytes = Flowable - .fromPublisher(reactiveClient.streamingExecute(SimpleHttpRequests.get(FLOWABLE_URL))) + .fromPublisher(reactiveClient + .streamingExecute(SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_URL)))) .concatMap(Message::getBody).map(bb -> byteBufferToArray(bb)).reduce(Bytes::concat) .blockingGet(); assertArrayEquals(flowableSourceBytes, bodyBytes); From f8269d1a07a40fccf7d297e8460041e8ebbe8a63 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Mon, 2 Mar 2020 10:01:37 -0800 Subject: [PATCH 24/59] brought back some overloads --- .../ReactiveHttpAsyncClient.java | 25 ++++++++++++++++++- .../client5reactive/NullabilityTests.java | 8 ++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java index e61dd54..6b34ccc 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java @@ -80,16 +80,39 @@ Publisher>> streamingExecute( @Nullable HttpContext context); /** - * @see #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext) + * Convenience method for + * {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)} */ default Publisher>> streamingExecute( @Nonnull AsyncRequestProducer requestProducer, @Nullable HttpContext context) { return streamingExecute(requestProducer, null, context); } + /** + * Convenience method for + * {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)} + */ default Publisher>> streamingExecute( @Nonnull AsyncRequestProducer requestProducer) { return streamingExecute(requestProducer, null); } + /** + * Convenience method for + * {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)} + */ + default Publisher>> streamingExecute( + @Nonnull SimpleHttpRequest request, @Nullable HttpContext context) { + return streamingExecute(SimpleRequestProducer.create(request), context); + } + + /** + * Convenience method for + * {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)} + */ + default Publisher>> streamingExecute( + @Nonnull SimpleHttpRequest request) { + return streamingExecute(request, null); + } + } diff --git a/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java index 436dc90..84e8130 100644 --- a/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java +++ b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java @@ -43,8 +43,16 @@ public void testNullability() { () -> reactiveClient.execute((SimpleHttpRequest) null, null)); assertThrows(NullPointerException.class, () -> reactiveClient.execute((SimpleHttpRequest) null)); + assertThrows(NullPointerException.class, + () -> reactiveClient.streamingExecute((AsyncRequestProducer) null, null, null)); assertThrows(NullPointerException.class, () -> reactiveClient.streamingExecute((AsyncRequestProducer) null, null)); + assertThrows(NullPointerException.class, + () -> reactiveClient.streamingExecute((AsyncRequestProducer) null)); + assertThrows(NullPointerException.class, + () -> reactiveClient.streamingExecute((SimpleHttpRequest) null, null)); + assertThrows(NullPointerException.class, + () -> reactiveClient.streamingExecute((SimpleHttpRequest) null)); } @Test From 544ddabc68bf5bc620057bdca64e17b0f46f5b57 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Mon, 2 Mar 2020 10:03:07 -0800 Subject: [PATCH 25/59] speed up test --- .../com/saasquatch/client5reactive/NullabilityTests.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java index 84e8130..9e2d3da 100644 --- a/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java +++ b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java @@ -1,5 +1,6 @@ package com.saasquatch.client5reactive; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertThrows; import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; @@ -56,11 +57,11 @@ public void testNullability() { } @Test - public void testEmptyPublisher() { + public void testVoidPublisher() { final Publisher voidResultPublisher = reactiveClient.execute( SimpleRequestProducer.create(SimpleHttpRequests.get("https://example.com")), new ReactiveResponseConsumer()); - Flowable.fromPublisher(voidResultPublisher).test().assertEmpty(); + assertDoesNotThrow(() -> Flowable.fromPublisher(voidResultPublisher).blockingSubscribe()); } } From a780bf9280ed52be09f179971443e325d72461e1 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Mon, 2 Mar 2020 10:20:25 -0800 Subject: [PATCH 26/59] spacing update --- .../java/com/saasquatch/client5reactive/FutureCallbacks.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java index de99f15..fc1e4d5 100644 --- a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java +++ b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java @@ -16,6 +16,7 @@ private FutureCallbacks() {} public static FutureCallback singleEmitter(SingleEmitter emitter) { return new FutureCallback() { + @Override public void completed(T result) { emitter.onSuccess(result); @@ -30,11 +31,13 @@ public void failed(Exception ex) { public void cancelled() { emitter.onError(new NoSuchElementException()); } + }; } public static FutureCallback maybeEmitter(MaybeEmitter emitter) { return new FutureCallback() { + @Override public void completed(T result) { if (result == null) { @@ -53,6 +56,7 @@ public void failed(Exception ex) { public void cancelled() { emitter.onError(new NoSuchElementException()); } + }; } From 38ca45ea083ad9f53b5aa110c760e8016f23915a Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Mon, 2 Mar 2020 14:49:10 -0800 Subject: [PATCH 27/59] use CancellationException --- .../com/saasquatch/client5reactive/FutureCallbacks.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java index fc1e4d5..d6716a1 100644 --- a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java +++ b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java @@ -1,6 +1,6 @@ package com.saasquatch.client5reactive; -import java.util.NoSuchElementException; +import java.util.concurrent.CancellationException; import org.apache.hc.core5.concurrent.FutureCallback; import io.reactivex.rxjava3.core.MaybeEmitter; import io.reactivex.rxjava3.core.SingleEmitter; @@ -29,7 +29,7 @@ public void failed(Exception ex) { @Override public void cancelled() { - emitter.onError(new NoSuchElementException()); + emitter.onError(new CancellationException()); } }; @@ -54,7 +54,7 @@ public void failed(Exception ex) { @Override public void cancelled() { - emitter.onError(new NoSuchElementException()); + emitter.onError(new CancellationException()); } }; From 2248dcd7ff32000c433fee33ce794b83c75717cb Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Mon, 2 Mar 2020 14:54:50 -0800 Subject: [PATCH 28/59] update test to use CancellationException --- .../saasquatch/client5reactive/FutureCallbacksTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java b/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java index 67abf2e..d7e2d83 100644 --- a/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java +++ b/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java @@ -1,7 +1,7 @@ package com.saasquatch.client5reactive; import java.io.IOException; -import java.util.NoSuchElementException; +import java.util.concurrent.CancellationException; import org.apache.hc.core5.concurrent.BasicFuture; import org.junit.jupiter.api.Test; import io.reactivex.rxjava3.core.Maybe; @@ -26,7 +26,7 @@ public void testSingle() { Single.create(emitter -> { final BasicFuture future = new BasicFuture<>(FutureCallbacks.singleEmitter(emitter)); future.cancel(); - }).test().assertError(NoSuchElementException.class); + }).test().assertError(CancellationException.class); } @Test @@ -46,7 +46,7 @@ public void testMaybe() { Maybe.create(emitter -> { final BasicFuture future = new BasicFuture<>(FutureCallbacks.maybeEmitter(emitter)); future.cancel(); - }).test().assertError(NoSuchElementException.class); + }).test().assertError(CancellationException.class); } } From 589f2ca0ba9488d37b906c845d940d803788aff9 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Mon, 2 Mar 2020 15:14:03 -0800 Subject: [PATCH 29/59] gitignore bin --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index fcb1dab..2c77a63 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,4 @@ hs_err_pid* .classpath .settings /target/ +bin From 835772dc8f7fed1e68f1b8d183a5dab820500bd8 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Tue, 3 Mar 2020 15:25:23 -0800 Subject: [PATCH 30/59] Nonnull annotation --- .../saasquatch/client5reactive/ReactiveHttpAsyncClients.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java index 9e88e02..1af611a 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java @@ -1,6 +1,7 @@ package com.saasquatch.client5reactive; import java.util.Objects; +import javax.annotation.Nonnull; import org.apache.hc.client5.http.async.HttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClients; @@ -20,7 +21,7 @@ private ReactiveHttpAsyncClients() {} * does not support lifecycle management, so you'll need to manage the lifecycle of the given * {@link HttpAsyncClient} yourself. */ - public static ReactiveHttpAsyncClient create(HttpAsyncClient httpAsyncClient) { + public static ReactiveHttpAsyncClient create(@Nonnull HttpAsyncClient httpAsyncClient) { Objects.requireNonNull(httpAsyncClient); return new ReactiveHttpAsyncClientImpl(httpAsyncClient); } From 84748581706e219884f7328590a72902645e9014 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Tue, 3 Mar 2020 16:45:04 -0800 Subject: [PATCH 31/59] Update guava scope to test --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index aafc926..85b39d0 100644 --- a/pom.xml +++ b/pom.xml @@ -59,6 +59,7 @@ com.google.guava guava 28.2-jre + test org.slf4j From 3d3c63b98138691706ae49a49f007517ef010674 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Wed, 4 Mar 2020 09:50:54 -0800 Subject: [PATCH 32/59] Nonnull annotation --- .../com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java index 1af611a..dd7811a 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java @@ -21,6 +21,7 @@ private ReactiveHttpAsyncClients() {} * does not support lifecycle management, so you'll need to manage the lifecycle of the given * {@link HttpAsyncClient} yourself. */ + @Nonnull public static ReactiveHttpAsyncClient create(@Nonnull HttpAsyncClient httpAsyncClient) { Objects.requireNonNull(httpAsyncClient); return new ReactiveHttpAsyncClientImpl(httpAsyncClient); From 6cc26a1bb5c9587f11eb2d698fb36b6c5dff2f24 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Wed, 4 Mar 2020 10:26:10 -0800 Subject: [PATCH 33/59] javadoc --- .../client5reactive/ReactiveHttpAsyncClients.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java index dd7811a..129bb98 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java @@ -3,6 +3,7 @@ import java.util.Objects; import javax.annotation.Nonnull; import org.apache.hc.client5.http.async.HttpAsyncClient; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClients; /** @@ -18,8 +19,9 @@ private ReactiveHttpAsyncClients() {} /** * Create a {@link ReactiveHttpAsyncClient} from a given {@link HttpAsyncClient}. Note that the * created {@link ReactiveHttpAsyncClient} is simply a wrapper of the {@link HttpAsyncClient} and - * does not support lifecycle management, so you'll need to manage the lifecycle of the given - * {@link HttpAsyncClient} yourself. + * does not support state management, so you'll need to manage the state of the given + * {@link HttpAsyncClient} yourself by calling {@link CloseableHttpAsyncClient#start()}, + * {@link CloseableHttpAsyncClient#close()}, etc. */ @Nonnull public static ReactiveHttpAsyncClient create(@Nonnull HttpAsyncClient httpAsyncClient) { From 2841e14937d97030cffcc04f7e5167b636e41ce4 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Wed, 4 Mar 2020 11:54:41 -0800 Subject: [PATCH 34/59] remove Guava as dependency --- pom.xml | 6 --- .../client5reactive/StreamingTests.java | 45 ++++++++++--------- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/pom.xml b/pom.xml index 85b39d0..99aa525 100644 --- a/pom.xml +++ b/pom.xml @@ -55,12 +55,6 @@ jsr305 3.0.2 - - com.google.guava - guava - 28.2-jre - test - org.slf4j slf4j-simple diff --git a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java index 73107a3..e078df1 100644 --- a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java +++ b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java @@ -1,8 +1,12 @@ package com.saasquatch.client5reactive; import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import java.net.URL; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; @@ -12,13 +16,12 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import com.google.common.io.ByteStreams; -import com.google.common.primitives.Bytes; +import org.reactivestreams.Publisher; import io.reactivex.rxjava3.core.Flowable; public class StreamingTests { - private static final String FLOWABLE_URL = + private static final String FLOWABLE_SOURCE_URL = "https://cdn.jsdelivr.net/gh/ReactiveX/RxJava@81f0569a8b9b7d27059f127b90fd7335118b2ee4/src/main/java/io/reactivex/rxjava3/core/Flowable.java"; private static CloseableHttpAsyncClient asyncClient; private static ReactiveHttpAsyncClient reactiveClient; @@ -29,7 +32,8 @@ public static void beforeAll() throws Exception { asyncClient = HttpAsyncClients.createDefault(); asyncClient.start(); reactiveClient = ReactiveHttpAsyncClients.create(asyncClient); - flowableSourceBytes = ByteStreams.toByteArray(new URL(FLOWABLE_URL).openStream()); + flowableSourceBytes = + asyncClient.execute(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL), null).get().getBodyBytes(); } @AfterAll @@ -39,10 +43,9 @@ public static void afterAll() throws Exception { @Test public void testVanillaExecuteWorks() { - final byte[] bodyBytes = Flowable.fromPublisher( - reactiveClient.execute(SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_URL)), - SimpleResponseConsumer.create())) - .blockingSingle().getBodyBytes(); + final byte[] bodyBytes = Flowable.fromPublisher(reactiveClient.execute( + SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL)), + SimpleResponseConsumer.create())).blockingSingle().getBodyBytes(); assertArrayEquals(flowableSourceBytes, bodyBytes); } @@ -50,26 +53,28 @@ public void testVanillaExecuteWorks() { public void testVanillaStreamingWorks() { final byte[] bodyBytes = Flowable .fromPublisher(reactiveClient.streamingExecute( - SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_URL)), null)) - .concatMap(Message::getBody).map(bb -> byteBufferToArray(bb)).reduce(Bytes::concat) - .blockingGet(); + SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL)), null)) + .concatMap(Message::getBody).to(this::toByteArray); assertArrayEquals(flowableSourceBytes, bodyBytes); } @Test public void testBasicStreamingWorks() { final byte[] bodyBytes = Flowable - .fromPublisher(reactiveClient - .streamingExecute(SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_URL)))) - .concatMap(Message::getBody).map(bb -> byteBufferToArray(bb)).reduce(Bytes::concat) - .blockingGet(); + .fromPublisher(reactiveClient.streamingExecute( + SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL)))) + .concatMap(Message::getBody).to(this::toByteArray); assertArrayEquals(flowableSourceBytes, bodyBytes); } - private static byte[] byteBufferToArray(ByteBuffer bb) { - final byte[] arr = new byte[bb.remaining()]; - bb.get(arr); - return arr; + private byte[] toByteArray(Publisher pub) { + try (ByteArrayOutputStream out = new ByteArrayOutputStream(); + WritableByteChannel channel = Channels.newChannel(out);) { + Flowable.fromPublisher(pub).blockingForEach(channel::write); + return out.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } } From 8a8a33c9ffe7d90a21ce97fab7c94ac93878e1d3 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Wed, 4 Mar 2020 14:47:13 -0800 Subject: [PATCH 35/59] better exception for cancelled --- .../com/saasquatch/client5reactive/FutureCallbacks.java | 8 ++++++-- .../client5reactive/ReactiveHttpAsyncClient.java | 3 ++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java index d6716a1..7fef767 100644 --- a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java +++ b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java @@ -29,7 +29,7 @@ public void failed(Exception ex) { @Override public void cancelled() { - emitter.onError(new CancellationException()); + emitter.onError(cancelledException()); } }; @@ -54,10 +54,14 @@ public void failed(Exception ex) { @Override public void cancelled() { - emitter.onError(new CancellationException()); + emitter.onError(cancelledException()); } }; } + private static Exception cancelledException() { + return new CancellationException("Future cancelled"); + } + } diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java index 6b34ccc..4d67920 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java +++ b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java @@ -20,7 +20,8 @@ import org.reactivestreams.Publisher; /** - * Thin wrapper around Apache {@link HttpAsyncClient} to expose Reactive Streams interfaces.
+ * Thin wrapper around Apache {@link HttpAsyncClient} to expose + * Reactive Streams interfaces.
* The methods in this interface aim to mirror the ones in {@link HttpAsyncClient} and * {@link CloseableHttpAsyncClient}. * From c7a66a5091dc63e00c7dadae1d35a30021886398 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 09:17:38 -0800 Subject: [PATCH 36/59] rename --- ...yncClient.java => HttpReactiveClient.java} | 4 +-- ...tImpl.java => HttpReactiveClientImpl.java} | 6 ++-- .../client5reactive/HttpReactiveClients.java | 32 +++++++++++++++++++ .../ReactiveHttpAsyncClients.java | 32 ------------------- .../client5reactive/LifecycleTests.java | 6 ++-- .../client5reactive/NullabilityTests.java | 4 +-- .../client5reactive/StreamingTests.java | 4 +-- 7 files changed, 44 insertions(+), 44 deletions(-) rename src/main/java/com/saasquatch/client5reactive/{ReactiveHttpAsyncClient.java => HttpReactiveClient.java} (98%) rename src/main/java/com/saasquatch/client5reactive/{ReactiveHttpAsyncClientImpl.java => HttpReactiveClientImpl.java} (90%) create mode 100644 src/main/java/com/saasquatch/client5reactive/HttpReactiveClients.java delete mode 100644 src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java similarity index 98% rename from src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java rename to src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java index 4d67920..99dc83f 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClient.java +++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java @@ -26,9 +26,9 @@ * {@link CloseableHttpAsyncClient}. * * @author sli - * @see ReactiveHttpAsyncClients + * @see HttpReactiveClients */ -public interface ReactiveHttpAsyncClient { +public interface HttpReactiveClient { /** * @see HttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java similarity index 90% rename from src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java rename to src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java index f22e067..b68a7ef 100644 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClientImpl.java +++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java @@ -16,15 +16,15 @@ import io.reactivex.rxjava3.core.Single; /** - * Concrete implementation of {@link ReactiveHttpAsyncClient}. + * Concrete implementation of {@link HttpReactiveClient}. * * @author sli */ -final class ReactiveHttpAsyncClientImpl implements ReactiveHttpAsyncClient { +final class HttpReactiveClientImpl implements HttpReactiveClient { private final HttpAsyncClient httpAsyncClient; - ReactiveHttpAsyncClientImpl(HttpAsyncClient httpAsyncClient) { + HttpReactiveClientImpl(HttpAsyncClient httpAsyncClient) { this.httpAsyncClient = httpAsyncClient; } diff --git a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClients.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClients.java new file mode 100644 index 0000000..7581db9 --- /dev/null +++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClients.java @@ -0,0 +1,32 @@ +package com.saasquatch.client5reactive; + +import java.util.Objects; +import javax.annotation.Nonnull; +import org.apache.hc.client5.http.async.HttpAsyncClient; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; + +/** + * Factory methods for {@link HttpReactiveClient}. + * + * @author sli + * @see HttpAsyncClients + */ +public final class HttpReactiveClients { + + private HttpReactiveClients() {} + + /** + * Create a {@link HttpReactiveClient} from a given {@link HttpAsyncClient}. Note that the created + * {@link HttpReactiveClient} is simply a wrapper of the {@link HttpAsyncClient} and does not + * support state management, so you'll need to manage the state of the given + * {@link HttpAsyncClient} yourself by calling {@link CloseableHttpAsyncClient#start()}, + * {@link CloseableHttpAsyncClient#close()}, etc. + */ + @Nonnull + public static HttpReactiveClient create(@Nonnull HttpAsyncClient httpAsyncClient) { + Objects.requireNonNull(httpAsyncClient); + return new HttpReactiveClientImpl(httpAsyncClient); + } + +} diff --git a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java b/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java deleted file mode 100644 index 129bb98..0000000 --- a/src/main/java/com/saasquatch/client5reactive/ReactiveHttpAsyncClients.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.saasquatch.client5reactive; - -import java.util.Objects; -import javax.annotation.Nonnull; -import org.apache.hc.client5.http.async.HttpAsyncClient; -import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; -import org.apache.hc.client5.http.impl.async.HttpAsyncClients; - -/** - * Factory methods for {@link ReactiveHttpAsyncClient}. - * - * @author sli - * @see HttpAsyncClients - */ -public final class ReactiveHttpAsyncClients { - - private ReactiveHttpAsyncClients() {} - - /** - * Create a {@link ReactiveHttpAsyncClient} from a given {@link HttpAsyncClient}. Note that the - * created {@link ReactiveHttpAsyncClient} is simply a wrapper of the {@link HttpAsyncClient} and - * does not support state management, so you'll need to manage the state of the given - * {@link HttpAsyncClient} yourself by calling {@link CloseableHttpAsyncClient#start()}, - * {@link CloseableHttpAsyncClient#close()}, etc. - */ - @Nonnull - public static ReactiveHttpAsyncClient create(@Nonnull HttpAsyncClient httpAsyncClient) { - Objects.requireNonNull(httpAsyncClient); - return new ReactiveHttpAsyncClientImpl(httpAsyncClient); - } - -} diff --git a/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java b/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java index 4dbbb92..b052d95 100644 --- a/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java +++ b/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java @@ -12,7 +12,7 @@ public class LifecycleTests { public void testNonStarted() throws Exception { try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.createDefault()) { // Not started - final ReactiveHttpAsyncClient reactiveClient = ReactiveHttpAsyncClients.create(asyncClient); + final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient); Flowable.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get("https://example.com"))) .test().assertError(IllegalStateException.class); } @@ -20,10 +20,10 @@ public void testNonStarted() throws Exception { @Test public void testClosed() throws Exception { - final ReactiveHttpAsyncClient reactiveClient; + final HttpReactiveClient reactiveClient; try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.createDefault()) { asyncClient.start(); - reactiveClient = ReactiveHttpAsyncClients.create(asyncClient); + reactiveClient = HttpReactiveClients.create(asyncClient); } // Closed Flowable.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get("https://example.com"))) diff --git a/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java index 9e2d3da..f61a40d 100644 --- a/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java +++ b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java @@ -18,13 +18,13 @@ public class NullabilityTests { private static CloseableHttpAsyncClient asyncClient; - private static ReactiveHttpAsyncClient reactiveClient; + private static HttpReactiveClient reactiveClient; @BeforeAll public static void beforeAll() { asyncClient = HttpAsyncClients.createDefault(); asyncClient.start(); - reactiveClient = ReactiveHttpAsyncClients.create(asyncClient); + reactiveClient = HttpReactiveClients.create(asyncClient); } @AfterAll diff --git a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java index e078df1..7b2fd63 100644 --- a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java +++ b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java @@ -24,14 +24,14 @@ public class StreamingTests { private static final String FLOWABLE_SOURCE_URL = "https://cdn.jsdelivr.net/gh/ReactiveX/RxJava@81f0569a8b9b7d27059f127b90fd7335118b2ee4/src/main/java/io/reactivex/rxjava3/core/Flowable.java"; private static CloseableHttpAsyncClient asyncClient; - private static ReactiveHttpAsyncClient reactiveClient; + private static HttpReactiveClient reactiveClient; private static byte[] flowableSourceBytes; @BeforeAll public static void beforeAll() throws Exception { asyncClient = HttpAsyncClients.createDefault(); asyncClient.start(); - reactiveClient = ReactiveHttpAsyncClients.create(asyncClient); + reactiveClient = HttpReactiveClients.create(asyncClient); flowableSourceBytes = asyncClient.execute(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL), null).get().getBodyBytes(); } From 781b25f221de4dee8f91ee095f0fb36c64d4fcbe Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 09:34:16 -0800 Subject: [PATCH 37/59] badges --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index ee7a224..f31b172 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,6 @@ # apache-client5-reactive Thin wrapper around Apache HttpAsyncClient 5.x to expose Reactive Streams interfaces. + +[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) +[![Build Status](https://travis-ci.org/saasquatch/apache-client5-reactive.svg?branch=master)](https://travis-ci.org/saasquatch/apache-client5-reactive) +[![](https://jitpack.io/v/saasquatch/apache-client5-reactive.svg)](https://jitpack.io/#saasquatch/apache-client5-reactive) From 9ba8ef734b0be004d9506ac3ff2f8882abda5d06 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 09:34:43 -0800 Subject: [PATCH 38/59] license boilerplate --- README.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/README.md b/README.md index f31b172..bc5f981 100644 --- a/README.md +++ b/README.md @@ -4,3 +4,25 @@ Thin wrapper around Apache HttpAsyncClient 5.x to expose Reactive Streams interf [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![Build Status](https://travis-ci.org/saasquatch/apache-client5-reactive.svg?branch=master)](https://travis-ci.org/saasquatch/apache-client5-reactive) [![](https://jitpack.io/v/saasquatch/apache-client5-reactive.svg)](https://jitpack.io/#saasquatch/apache-client5-reactive) + +## License + +Unless explicitly stated otherwise all files in this repository are licensed under the Apache License 2.0. + +License boilerplate: + +``` +Copyright 2019 ReferralSaaSquatch.com Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +``` From 88fe73f6728b43b1d023c66e969d0cfd4f258853 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 09:38:33 -0800 Subject: [PATCH 39/59] basic docs --- README.md | 44 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index bc5f981..5bb3652 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,52 @@ # apache-client5-reactive -Thin wrapper around Apache HttpAsyncClient 5.x to expose Reactive Streams interfaces. [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![Build Status](https://travis-ci.org/saasquatch/apache-client5-reactive.svg?branch=master)](https://travis-ci.org/saasquatch/apache-client5-reactive) [![](https://jitpack.io/v/saasquatch/apache-client5-reactive.svg)](https://jitpack.io/#saasquatch/apache-client5-reactive) +Thin wrapper around Apache HttpAsyncClient 5.x to expose Reactive Streams interfaces. + +## Adding it to your project + +### Add the repository + +Maven + +```xml + + + jitpack.io + https://jitpack.io + + +``` + +Gradle + +```gradle +repositories { + maven { url 'https://jitpack.io' } +} +``` + +### Add the dependency + +Maven + +```xml + + com.github.saasquatch + apache-client5-reactive + REPLACEME + +``` + +Gradle + +```gradle +compile 'com.github.saasquatch:apache-client5-reactive:REPLACEME' +``` + ## License Unless explicitly stated otherwise all files in this repository are licensed under the Apache License 2.0. From 0057a43a92d2307110e24ad4a34f79cf4c5c8d61 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 10:01:57 -0800 Subject: [PATCH 40/59] added example --- README.md | 4 ++ .../client5reactive/LifecycleTests.java | 6 ++- .../client5reactive/examples/Example.java | 41 +++++++++++++++++++ 3 files changed, 49 insertions(+), 2 deletions(-) create mode 100644 src/test/java/com/saasquatch/client5reactive/examples/Example.java diff --git a/README.md b/README.md index 5bb3652..4a9a09c 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,10 @@ Thin wrapper around Apache HttpAsyncClient 5.x to expose Reactive Streams interf ## Adding it to your project +## Sample usage + +TODO + ### Add the repository Maven diff --git a/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java b/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java index b052d95..6cd9471 100644 --- a/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java +++ b/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java @@ -13,7 +13,8 @@ public void testNonStarted() throws Exception { try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.createDefault()) { // Not started final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient); - Flowable.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get("https://example.com"))) + Flowable + .fromPublisher(reactiveClient.execute(SimpleHttpRequests.get("https://www.example.com"))) .test().assertError(IllegalStateException.class); } } @@ -26,7 +27,8 @@ public void testClosed() throws Exception { reactiveClient = HttpReactiveClients.create(asyncClient); } // Closed - Flowable.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get("https://example.com"))) + Flowable + .fromPublisher(reactiveClient.execute(SimpleHttpRequests.get("https://www.example.com"))) .test().assertError(IllegalStateException.class); } diff --git a/src/test/java/com/saasquatch/client5reactive/examples/Example.java b/src/test/java/com/saasquatch/client5reactive/examples/Example.java new file mode 100644 index 0000000..32d157a --- /dev/null +++ b/src/test/java/com/saasquatch/client5reactive/examples/Example.java @@ -0,0 +1,41 @@ +package com.saasquatch.client5reactive.examples; + +import static java.nio.charset.StandardCharsets.UTF_8; +import java.nio.ByteBuffer; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.core5.http.Message; +import com.saasquatch.client5reactive.HttpReactiveClient; +import com.saasquatch.client5reactive.HttpReactiveClients; +import io.reactivex.rxjava3.core.Single; + +public class Example { + + private static final String EXAMPLE_URL = "https://www.example.com"; + + public static void main(String[] args) throws Exception { + try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.createDefault()) { + asyncClient.start(); + final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient); + Single.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get(EXAMPLE_URL))) + .doOnSuccess(response -> System.out.println(response.getCode())) + .blockingSubscribe(); + System.out.println("----------"); + Single.fromPublisher(reactiveClient.streamingExecute(SimpleHttpRequests.get(EXAMPLE_URL))) + .doOnSuccess(message -> System.out.println(message.getHead().getCode())) + .flatMapPublisher(Message::getBody) + .toList() + .map(byteBuffers -> { + final int totalLength = byteBuffers.stream().mapToInt(ByteBuffer::remaining).sum(); + final ByteBuffer combined = ByteBuffer.allocate(totalLength); + byteBuffers.forEach(combined::put); + combined.flip(); + return UTF_8.decode(combined).toString(); + }) + .doOnSuccess(System.out::println) + .blockingSubscribe(); + } + } + +} From e37627a479d8ce617219dbc4a5bef1e2373f9930 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 10:12:49 -0800 Subject: [PATCH 41/59] reference examples --- README.md | 2 +- .../com/saasquatch/client5reactive/examples/Example.java | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 4a9a09c..c25b0f0 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Thin wrapper around Apache HttpAsyncClient 5.x to expose Reactive Streams interf ## Sample usage -TODO +For examples, see package [`com.saasquatch.client5reactive.examples`](https://github.com/saasquatch/apache-client5-reactive/tree/master/src/test/java/com/saasquatch/client5reactive/examples). ### Add the repository diff --git a/src/test/java/com/saasquatch/client5reactive/examples/Example.java b/src/test/java/com/saasquatch/client5reactive/examples/Example.java index 32d157a..930df72 100644 --- a/src/test/java/com/saasquatch/client5reactive/examples/Example.java +++ b/src/test/java/com/saasquatch/client5reactive/examples/Example.java @@ -5,7 +5,6 @@ import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClients; -import org.apache.hc.core5.http.Message; import com.saasquatch.client5reactive.HttpReactiveClient; import com.saasquatch.client5reactive.HttpReactiveClients; import io.reactivex.rxjava3.core.Single; @@ -23,8 +22,10 @@ public static void main(String[] args) throws Exception { .blockingSubscribe(); System.out.println("----------"); Single.fromPublisher(reactiveClient.streamingExecute(SimpleHttpRequests.get(EXAMPLE_URL))) - .doOnSuccess(message -> System.out.println(message.getHead().getCode())) - .flatMapPublisher(Message::getBody) + .flatMapPublisher(message -> { + System.out.println(message.getHead().getCode()); + return message.getBody(); + }) .toList() .map(byteBuffers -> { final int totalLength = byteBuffers.stream().mapToInt(ByteBuffer::remaining).sum(); From 622d0a02e1a0c4d66742d5fd047b4690d4700a5e Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 11:02:01 -0800 Subject: [PATCH 42/59] Update Example.java --- .../com/saasquatch/client5reactive/examples/Example.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/saasquatch/client5reactive/examples/Example.java b/src/test/java/com/saasquatch/client5reactive/examples/Example.java index 930df72..10dae99 100644 --- a/src/test/java/com/saasquatch/client5reactive/examples/Example.java +++ b/src/test/java/com/saasquatch/client5reactive/examples/Example.java @@ -18,7 +18,10 @@ public static void main(String[] args) throws Exception { asyncClient.start(); final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient); Single.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get(EXAMPLE_URL))) - .doOnSuccess(response -> System.out.println(response.getCode())) + .doOnSuccess(response -> { + System.out.println(response.getCode()); + System.out.println(response.getBodyText()); + }) .blockingSubscribe(); System.out.println("----------"); Single.fromPublisher(reactiveClient.streamingExecute(SimpleHttpRequests.get(EXAMPLE_URL))) From 04cf9dced2f046905f6d9230961631bc85972063 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 12:04:43 -0800 Subject: [PATCH 43/59] cleanup --- .../client5reactive/FutureCallbacks.java | 22 ------------------ .../HttpReactiveClientImpl.java | 10 +++++--- .../client5reactive/FutureCallbacksTests.java | 23 +------------------ 3 files changed, 8 insertions(+), 47 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java index 7fef767..c41b6d5 100644 --- a/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java +++ b/src/main/java/com/saasquatch/client5reactive/FutureCallbacks.java @@ -3,7 +3,6 @@ import java.util.concurrent.CancellationException; import org.apache.hc.core5.concurrent.FutureCallback; import io.reactivex.rxjava3.core.MaybeEmitter; -import io.reactivex.rxjava3.core.SingleEmitter; /** * Utilities for {@link FutureCallback}s. Not public. @@ -14,27 +13,6 @@ final class FutureCallbacks { private FutureCallbacks() {} - public static FutureCallback singleEmitter(SingleEmitter emitter) { - return new FutureCallback() { - - @Override - public void completed(T result) { - emitter.onSuccess(result); - } - - @Override - public void failed(Exception ex) { - emitter.onError(ex); - } - - @Override - public void cancelled() { - emitter.onError(cancelledException()); - } - - }; - } - public static FutureCallback maybeEmitter(MaybeEmitter emitter) { return new FutureCallback() { diff --git a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java index b68a7ef..4d26a46 100644 --- a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java +++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java @@ -13,7 +13,6 @@ import org.apache.hc.core5.reactive.ReactiveResponseConsumer; import org.reactivestreams.Publisher; import io.reactivex.rxjava3.core.Maybe; -import io.reactivex.rxjava3.core.Single; /** * Concrete implementation of {@link HttpReactiveClient}. @@ -45,9 +44,14 @@ public Publisher>> streamingExecute( AsyncRequestProducer requestProducer, HandlerFactory pushHandlerFactory, HttpContext context) { Objects.requireNonNull(requestProducer); - return Single.>>create(emitter -> { + /* + * Semantically this should be a Single instead of a Maybe, but using Single here requires an + * additional implementation of FutureCallback, and since we are returning a Publisher, it + * doesn't really make a difference. + */ + return Maybe.>>create(emitter -> { final ReactiveResponseConsumer responseConsumer = - new ReactiveResponseConsumer(FutureCallbacks.singleEmitter(emitter)); + new ReactiveResponseConsumer(FutureCallbacks.maybeEmitter(emitter)); httpAsyncClient.execute(requestProducer, responseConsumer, pushHandlerFactory, context, null); }).toFlowable(); } diff --git a/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java b/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java index d7e2d83..f442249 100644 --- a/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java +++ b/src/test/java/com/saasquatch/client5reactive/FutureCallbacksTests.java @@ -5,32 +5,11 @@ import org.apache.hc.core5.concurrent.BasicFuture; import org.junit.jupiter.api.Test; import io.reactivex.rxjava3.core.Maybe; -import io.reactivex.rxjava3.core.Single; public class FutureCallbacksTests { @Test - public void testSingle() { - Single.create(emitter -> { - final BasicFuture future = new BasicFuture<>(FutureCallbacks.singleEmitter(emitter)); - future.completed(1); - }).test().assertResult(1); - Single.create(emitter -> { - final BasicFuture future = new BasicFuture<>(FutureCallbacks.singleEmitter(emitter)); - future.completed(null); - }).test().assertError(NullPointerException.class); - Single.create(emitter -> { - final BasicFuture future = new BasicFuture<>(FutureCallbacks.singleEmitter(emitter)); - future.failed(new IOException()); - }).test().assertError(IOException.class); - Single.create(emitter -> { - final BasicFuture future = new BasicFuture<>(FutureCallbacks.singleEmitter(emitter)); - future.cancel(); - }).test().assertError(CancellationException.class); - } - - @Test - public void testMaybe() { + public void testMaybeEmitter() { Maybe.create(emitter -> { final BasicFuture future = new BasicFuture<>(FutureCallbacks.maybeEmitter(emitter)); future.completed(1); From d5a9eed63d3d80b55f23ffd7e6d375b359aa8cba Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 12:15:56 -0800 Subject: [PATCH 44/59] javadoc update --- .../client5reactive/HttpReactiveClient.java | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java index 99dc83f..30196f5 100644 --- a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java +++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java @@ -1,6 +1,7 @@ package com.saasquatch.client5reactive; import java.nio.ByteBuffer; +import java.util.concurrent.Future; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.hc.client5.http.async.HttpAsyncClient; @@ -31,8 +32,11 @@ public interface HttpReactiveClient { /** - * @see HttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, - * HttpContext, FutureCallback) + * Execute the given request. This method is equivalent to + * {@link HttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, HttpContext , FutureCallback)}. + * If the {@link Future} produced by the equivalent {@link HttpAsyncClient} method completes with + * {@code null}, then the returning {@link Publisher} of this method will complete with no + * element. */ Publisher execute(@Nonnull AsyncRequestProducer requestProducer, @Nonnull AsyncResponseConsumer responseConsumer, @@ -40,8 +44,10 @@ Publisher execute(@Nonnull AsyncRequestProducer requestProducer, @Nullable HttpContext context); /** - * @see CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HttpContext, - * FutureCallback) + * Convenience method for + * {@link #execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, HttpContext)}, + * equivalent to + * {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HttpContext, FutureCallback)} */ default Publisher execute(@Nonnull AsyncRequestProducer requestProducer, @Nonnull AsyncResponseConsumer responseConsumer, @Nullable HttpContext context) { @@ -49,8 +55,10 @@ default Publisher execute(@Nonnull AsyncRequestProducer requestProducer, } /** - * @see CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, - * FutureCallback) + * Convenience method for + * {@link #execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, HttpContext)}, + * equivalent to + * {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, FutureCallback)}. */ default Publisher execute(@Nonnull AsyncRequestProducer requestProducer, @Nonnull AsyncResponseConsumer responseConsumer) { @@ -58,7 +66,9 @@ default Publisher execute(@Nonnull AsyncRequestProducer requestProducer, } /** - * @see CloseableHttpAsyncClient#execute(SimpleHttpRequest, HttpContext, FutureCallback) + * Execute the given request. This method is equivalent to + * {@link CloseableHttpAsyncClient#execute(SimpleHttpRequest, HttpContext, FutureCallback)}. The + * returning {@link Publisher} completes with exactly 1 element. */ default Publisher execute(@Nonnull SimpleHttpRequest request, @Nullable HttpContext context) { @@ -66,14 +76,16 @@ default Publisher execute(@Nonnull SimpleHttpRequest request } /** - * @see CloseableHttpAsyncClient#execute(SimpleHttpRequest, FutureCallback) + * Convenience method for {@link #execute(SimpleHttpRequest, HttpContext)}, equivalent to + * {@link CloseableHttpAsyncClient#execute(SimpleHttpRequest, FutureCallback)}. */ default Publisher execute(@Nonnull SimpleHttpRequest request) { return execute(request, null); } /** - * Execute the given request and get a streaming response. + * Execute the given request and get a streaming response body as a {@link Publisher} of + * {@link ByteBuffer}s. The returning {@link Publisher} completes with exactly 1 element. */ Publisher>> streamingExecute( @Nonnull AsyncRequestProducer requestProducer, From fb70ad28e6b0c4b6efae5fd49f8a73eedea826d4 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 12:18:26 -0800 Subject: [PATCH 45/59] test cleanup --- .../saasquatch/client5reactive/LifecycleTests.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java b/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java index 6cd9471..5911df3 100644 --- a/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java +++ b/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java @@ -8,14 +8,15 @@ public class LifecycleTests { + private static final String EXAMPLE_URL = "https://www.example.com"; + @Test public void testNonStarted() throws Exception { try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.createDefault()) { // Not started final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient); - Flowable - .fromPublisher(reactiveClient.execute(SimpleHttpRequests.get("https://www.example.com"))) - .test().assertError(IllegalStateException.class); + Flowable.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get(EXAMPLE_URL))).test() + .assertError(IllegalStateException.class); } } @@ -27,9 +28,8 @@ public void testClosed() throws Exception { reactiveClient = HttpReactiveClients.create(asyncClient); } // Closed - Flowable - .fromPublisher(reactiveClient.execute(SimpleHttpRequests.get("https://www.example.com"))) - .test().assertError(IllegalStateException.class); + Flowable.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get(EXAMPLE_URL))).test() + .assertError(IllegalStateException.class); } } From 2cddf3ed5b3b56220740c38211f6357528042e02 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 12:49:51 -0800 Subject: [PATCH 46/59] update inception year --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c25b0f0..ae46212 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ Unless explicitly stated otherwise all files in this repository are licensed und License boilerplate: ``` -Copyright 2019 ReferralSaaSquatch.com Inc. +Copyright 2020 ReferralSaaSquatch.com Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From 0f74ebb0da978bebaa1cb05dc30ca71d7836aa3e Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 13:17:15 -0800 Subject: [PATCH 47/59] unnecessary param --- .../java/com/saasquatch/client5reactive/StreamingTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java index 7b2fd63..081c957 100644 --- a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java +++ b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java @@ -53,7 +53,7 @@ public void testVanillaExecuteWorks() { public void testVanillaStreamingWorks() { final byte[] bodyBytes = Flowable .fromPublisher(reactiveClient.streamingExecute( - SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL)), null)) + SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL)))) .concatMap(Message::getBody).to(this::toByteArray); assertArrayEquals(flowableSourceBytes, bodyBytes); } From c48acb6a9603a22eea501f50aa304946f06d56a7 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 13:17:48 -0800 Subject: [PATCH 48/59] test fix --- .../java/com/saasquatch/client5reactive/StreamingTests.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java index 081c957..70a7fa3 100644 --- a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java +++ b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java @@ -61,8 +61,7 @@ public void testVanillaStreamingWorks() { @Test public void testBasicStreamingWorks() { final byte[] bodyBytes = Flowable - .fromPublisher(reactiveClient.streamingExecute( - SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL)))) + .fromPublisher(reactiveClient.streamingExecute(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL))) .concatMap(Message::getBody).to(this::toByteArray); assertArrayEquals(flowableSourceBytes, bodyBytes); } From 1276a08f413d3e42b2dbc1cc547dd4800c242112 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 13:36:04 -0800 Subject: [PATCH 49/59] version update --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index ae46212..60dd8b2 100644 --- a/README.md +++ b/README.md @@ -41,14 +41,14 @@ Maven com.github.saasquatch apache-client5-reactive - REPLACEME + 0.0.1 ``` Gradle ```gradle -compile 'com.github.saasquatch:apache-client5-reactive:REPLACEME' +compile 'com.github.saasquatch:apache-client5-reactive:0.0.1' ``` ## License From 9e84acc54e2328887b3c2929289021e9814b976a Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 13:53:19 -0800 Subject: [PATCH 50/59] javadoc update --- .../com/saasquatch/client5reactive/HttpReactiveClient.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java index 30196f5..4ac3a37 100644 --- a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java +++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java @@ -66,7 +66,8 @@ default Publisher execute(@Nonnull AsyncRequestProducer requestProducer, } /** - * Execute the given request. This method is equivalent to + * Execute a simple in-memory request and get a simple in-memory response. This method is + * equivalent to * {@link CloseableHttpAsyncClient#execute(SimpleHttpRequest, HttpContext, FutureCallback)}. The * returning {@link Publisher} completes with exactly 1 element. */ @@ -111,7 +112,7 @@ default Publisher>> streamingExecute } /** - * Convenience method for + * Execute a simple in-memory request and get a streaming response. Convenience method for * {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)} */ default Publisher>> streamingExecute( From d6c7a2e1b680cc092be80821cbc92c8675b5a396 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 14:01:41 -0800 Subject: [PATCH 51/59] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 60dd8b2..55c0f11 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ [![Build Status](https://travis-ci.org/saasquatch/apache-client5-reactive.svg?branch=master)](https://travis-ci.org/saasquatch/apache-client5-reactive) [![](https://jitpack.io/v/saasquatch/apache-client5-reactive.svg)](https://jitpack.io/#saasquatch/apache-client5-reactive) -Thin wrapper around Apache HttpAsyncClient 5.x to expose Reactive Streams interfaces. +Thin wrapper around Apache HttpComponents HttpAsyncClient 5.x to expose Reactive Streams interfaces. ## Adding it to your project From e06af0d7c1b33e74bc646f8763489d97f4786a90 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 15:05:49 -0800 Subject: [PATCH 52/59] NPE fix due to Apache HC internal bug --- .../HttpReactiveClientImpl.java | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java index 4d26a46..2fcdaa5 100644 --- a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java +++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java @@ -2,7 +2,11 @@ import java.nio.ByteBuffer; import java.util.Objects; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.apache.hc.client5.http.async.HttpAsyncClient; +import org.apache.hc.client5.http.impl.async.MinimalHttpAsyncClient; +import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.Message; import org.apache.hc.core5.http.nio.AsyncPushConsumer; @@ -34,8 +38,8 @@ public Publisher execute(AsyncRequestProducer requestProducer, Objects.requireNonNull(requestProducer); Objects.requireNonNull(responseConsumer); return Maybe.create(emitter -> { - httpAsyncClient.execute(requestProducer, responseConsumer, pushHandlerFactory, context, - FutureCallbacks.maybeEmitter(emitter)); + httpAsyncClient.execute(requestProducer, responseConsumer, pushHandlerFactory, + defaultHttpContext(context), FutureCallbacks.maybeEmitter(emitter)); }).toFlowable(); } @@ -52,8 +56,22 @@ public Publisher>> streamingExecute( return Maybe.>>create(emitter -> { final ReactiveResponseConsumer responseConsumer = new ReactiveResponseConsumer(FutureCallbacks.maybeEmitter(emitter)); - httpAsyncClient.execute(requestProducer, responseConsumer, pushHandlerFactory, context, null); + httpAsyncClient.execute(requestProducer, responseConsumer, pushHandlerFactory, + defaultHttpContext(context), null); }).toFlowable(); } + /** + * Create a default {@link HttpContext} if the given one is null. In theory this method should not + * be needed, since according to the Javadoc in {@link HttpAsyncClient}, {@link HttpContext} can + * be null. However, in Apache HttpClient version 5.0 there's a bug where + * {@link MinimalHttpAsyncClient} does not accept null {@link HttpContext}. An issue has already + * been filed here. Once + * that's fixed, this method can be removed. + */ + @Nonnull + private static HttpContext defaultHttpContext(@Nullable HttpContext context) { + return context == null ? HttpClientContext.create() : context; + } + } From 66b2769d2633e01eae1d3f11e83be8c7110abffb Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 15:08:18 -0800 Subject: [PATCH 53/59] test to cover minimal client --- .../client5reactive/StreamingTests.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java index 70a7fa3..59ac154 100644 --- a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java +++ b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java @@ -12,7 +12,9 @@ import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.client5.http.impl.async.MinimalHttpAsyncClient; import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.protocol.BasicHttpContext; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -66,6 +68,28 @@ public void testBasicStreamingWorks() { assertArrayEquals(flowableSourceBytes, bodyBytes); } + @Test + public void testWithMinimalClient() throws Exception { + try (MinimalHttpAsyncClient asyncClient = HttpAsyncClients.createMinimal()) { + asyncClient.start(); + final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient); + { + final byte[] bodyBytes = Flowable + .fromPublisher( + reactiveClient.streamingExecute(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL), null)) + .concatMap(Message::getBody).to(this::toByteArray); + assertArrayEquals(flowableSourceBytes, bodyBytes); + } + { + final byte[] bodyBytes = Flowable + .fromPublisher(reactiveClient.streamingExecute( + SimpleHttpRequests.get(FLOWABLE_SOURCE_URL), new BasicHttpContext())) + .concatMap(Message::getBody).to(this::toByteArray); + assertArrayEquals(flowableSourceBytes, bodyBytes); + } + } + } + private byte[] toByteArray(Publisher pub) { try (ByteArrayOutputStream out = new ByteArrayOutputStream(); WritableByteChannel channel = Channels.newChannel(out);) { From 435aae9f02a242efdeb9b3b37845adbd37d3f3c4 Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 15:16:07 -0800 Subject: [PATCH 54/59] typo --- .../com/saasquatch/client5reactive/HttpReactiveClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java index 2fcdaa5..1bfb796 100644 --- a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java +++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java @@ -63,7 +63,7 @@ public Publisher>> streamingExecute( /** * Create a default {@link HttpContext} if the given one is null. In theory this method should not - * be needed, since according to the Javadoc in {@link HttpAsyncClient}, {@link HttpContext} can + * be needed, since according to the JavaDoc in {@link HttpAsyncClient}, {@link HttpContext} can * be null. However, in Apache HttpClient version 5.0 there's a bug where * {@link MinimalHttpAsyncClient} does not accept null {@link HttpContext}. An issue has already * been filed here. Once From 1098495085ce8fb2c7c5db656d20e47b19dd48cb Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 15:22:05 -0800 Subject: [PATCH 55/59] Update HttpReactiveClientImpl.java --- .../client5reactive/HttpReactiveClientImpl.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java index 1bfb796..8fa650b 100644 --- a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java +++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java @@ -64,10 +64,11 @@ public Publisher>> streamingExecute( /** * Create a default {@link HttpContext} if the given one is null. In theory this method should not * be needed, since according to the JavaDoc in {@link HttpAsyncClient}, {@link HttpContext} can - * be null. However, in Apache HttpClient version 5.0 there's a bug where - * {@link MinimalHttpAsyncClient} does not accept null {@link HttpContext}. An issue has already - * been filed here. Once - * that's fixed, this method can be removed. + * be null. However, in Apache HttpClient version 5.0 there's a bug where some implementations of + * {@link HttpAsyncClient} like {@link MinimalHttpAsyncClient} do not accept null + * {@link HttpContext}. An issue has already been filed + * here. Once that's fixed, + * this method can be removed. */ @Nonnull private static HttpContext defaultHttpContext(@Nullable HttpContext context) { From 324d6b332b206ae66dcedbd19179562862c0109f Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 15:45:19 -0800 Subject: [PATCH 56/59] Update HttpReactiveClientImpl.java --- .../saasquatch/client5reactive/HttpReactiveClientImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java index 8fa650b..b824b2c 100644 --- a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java +++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java @@ -65,8 +65,8 @@ public Publisher>> streamingExecute( * Create a default {@link HttpContext} if the given one is null. In theory this method should not * be needed, since according to the JavaDoc in {@link HttpAsyncClient}, {@link HttpContext} can * be null. However, in Apache HttpClient version 5.0 there's a bug where some implementations of - * {@link HttpAsyncClient} like {@link MinimalHttpAsyncClient} do not accept null - * {@link HttpContext}. An issue has already been filed + * {@link HttpAsyncClient} like {@link MinimalHttpAsyncClient} reject null {@link HttpContext}. An + * issue has already been filed * here. Once that's fixed, * this method can be removed. */ From 05d504f19a91d027de015782194500040214990f Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 15:53:29 -0800 Subject: [PATCH 57/59] Update .travis.yml --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 5ac4518..6b3e9ba 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,7 @@ language: java jdk: - openjdk8 + - openjdk11 os: linux cache: directories: From 541c82231adf641483c687a6f7c0da004d8e5fef Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Thu, 5 Mar 2020 16:01:27 -0800 Subject: [PATCH 58/59] Update HttpReactiveClient.java --- .../com/saasquatch/client5reactive/HttpReactiveClient.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java index 4ac3a37..7806360 100644 --- a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java +++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java @@ -86,7 +86,8 @@ default Publisher execute(@Nonnull SimpleHttpRequest request /** * Execute the given request and get a streaming response body as a {@link Publisher} of - * {@link ByteBuffer}s. The returning {@link Publisher} completes with exactly 1 element. + * {@link ByteBuffer}s. The returning {@link Publisher} completes with exactly 1 element. The + * {@link Publisher} within the returning {@link Publisher} may contain 0 to n elements. */ Publisher>> streamingExecute( @Nonnull AsyncRequestProducer requestProducer, From 18e5a6e9df32ac7885f7c47827eaed107f6fe79c Mon Sep 17 00:00:00 2001 From: slisaasquatch Date: Fri, 6 Mar 2020 10:06:59 -0800 Subject: [PATCH 59/59] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 55c0f11..2eb0dfb 100644 --- a/README.md +++ b/README.md @@ -6,12 +6,12 @@ Thin wrapper around Apache HttpComponents HttpAsyncClient 5.x to expose Reactive Streams interfaces. -## Adding it to your project - ## Sample usage For examples, see package [`com.saasquatch.client5reactive.examples`](https://github.com/saasquatch/apache-client5-reactive/tree/master/src/test/java/com/saasquatch/client5reactive/examples). +## Adding it to your project + ### Add the repository Maven