diff --git a/.gitignore b/.gitignore index 2c77a63..b61fdae 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,6 @@ hs_err_pid* .settings /target/ bin + +.idea +*.iml diff --git a/README.md b/README.md index 03a6336..d73f6bd 100644 --- a/README.md +++ b/README.md @@ -10,13 +10,14 @@ Thin wrapper around [Apache HttpComponents](https://hc.apache.org/) HttpAsyncCli ```java import static java.nio.charset.StandardCharsets.UTF_8; -import java.nio.ByteBuffer; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; -import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; -import org.apache.hc.client5.http.impl.async.HttpAsyncClients; + import com.saasquatch.client5reactive.HttpReactiveClient; import com.saasquatch.client5reactive.HttpReactiveClients; import io.reactivex.rxjava3.core.Single; +import java.nio.ByteBuffer; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; public class Example { @@ -31,8 +32,8 @@ public class Example { // HttpReactiveClient is just a thin wrapper final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient); // Execute a simple in-memory request - Single - .fromPublisher(reactiveClient.execute(SimpleHttpRequests.get("https://www.example.com"))) + Single.fromPublisher( + reactiveClient.execute(SimpleRequestBuilder.get("https://www.example.com").build())) .doOnSuccess(response -> { // Get the response status and body in memory System.out.println(response.getCode()); @@ -42,9 +43,8 @@ public class Example { System.out.println("----------"); // Execute a streaming request // In this case, the request is a simple in-memory request without a request body - Single - .fromPublisher( - reactiveClient.streamingExecute(SimpleHttpRequests.get("https://www.example.com"))) + Single.fromPublisher(reactiveClient.streamingExecute( + SimpleRequestBuilder.get("https://www.example.com").build())) .flatMapPublisher(message -> { // Get the status before subscribing to the streaming body System.out.println(message.getHead().getCode()); @@ -101,14 +101,14 @@ Maven com.github.saasquatch apache-client5-reactive - 0.0.3 + 0.0.4 ``` Gradle ```gradle -compile 'com.github.saasquatch:apache-client5-reactive:0.0.3' +implementation 'com.github.saasquatch:apache-client5-reactive:0.0.4' ``` ## License diff --git a/pom.xml b/pom.xml index ca9e383..b66cb7f 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.saasquatch apache-client5-reactive - 0.0.3-SNAPSHOT + 0.0.4-SNAPSHOT jar apache-client5-reactive @@ -19,7 +19,7 @@ UTF-8 - 5.7.0 + 5.7.2 @@ -38,17 +38,17 @@ org.apache.httpcomponents.client5 httpclient5 - 5.0.3 + 5.1 org.apache.httpcomponents.core5 httpcore5-reactive - 5.0.2 + 5.1.1 io.reactivex.rxjava3 rxjava - 3.0.7 + 3.0.13 com.google.code.findbugs @@ -77,7 +77,7 @@ org.apache.maven.plugins maven-source-plugin - 3.2.0 + 3.2.1 attach-sources @@ -90,9 +90,10 @@ org.apache.maven.plugins maven-javadoc-plugin - 3.1.1 + 3.3.0 none + 8 diff --git a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java index 7806360..287cf6f 100644 --- a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java +++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClient.java @@ -23,8 +23,8 @@ /** * Thin wrapper around Apache {@link HttpAsyncClient} to expose * Reactive Streams interfaces.
- * The methods in this interface aim to mirror the ones in {@link HttpAsyncClient} and - * {@link CloseableHttpAsyncClient}. + * The methods in this interface aim to mirror the ones in {@link HttpAsyncClient} and {@link + * CloseableHttpAsyncClient}. * * @author sli * @see HttpReactiveClients @@ -32,11 +32,10 @@ public interface HttpReactiveClient { /** - * Execute the given request. This method is equivalent to - * {@link HttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, HttpContext , FutureCallback)}. - * If the {@link Future} produced by the equivalent {@link HttpAsyncClient} method completes with - * {@code null}, then the returning {@link Publisher} of this method will complete with no - * element. + * Execute the given request. This method is equivalent to {@link HttpAsyncClient#execute(AsyncRequestProducer, + * AsyncResponseConsumer, HandlerFactory, HttpContext, FutureCallback)}. If the {@link Future} + * produced by the equivalent {@link HttpAsyncClient} method completes with {@code null}, then the + * returning {@link Publisher} of this method will complete with no element. */ Publisher execute(@Nonnull AsyncRequestProducer requestProducer, @Nonnull AsyncResponseConsumer responseConsumer, @@ -44,10 +43,9 @@ Publisher execute(@Nonnull AsyncRequestProducer requestProducer, @Nullable HttpContext context); /** - * Convenience method for - * {@link #execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, HttpContext)}, - * equivalent to - * {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HttpContext, FutureCallback)} + * Convenience method for {@link #execute(AsyncRequestProducer, AsyncResponseConsumer, + * HandlerFactory, HttpContext)}, equivalent to {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer, + * AsyncResponseConsumer, HttpContext, FutureCallback)} */ default Publisher execute(@Nonnull AsyncRequestProducer requestProducer, @Nonnull AsyncResponseConsumer responseConsumer, @Nullable HttpContext context) { @@ -55,10 +53,9 @@ default Publisher execute(@Nonnull AsyncRequestProducer requestProducer, } /** - * Convenience method for - * {@link #execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, HttpContext)}, - * equivalent to - * {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, FutureCallback)}. + * Convenience method for {@link #execute(AsyncRequestProducer, AsyncResponseConsumer, + * HandlerFactory, HttpContext)}, equivalent to {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer, + * AsyncResponseConsumer, FutureCallback)}. */ default Publisher execute(@Nonnull AsyncRequestProducer requestProducer, @Nonnull AsyncResponseConsumer responseConsumer) { @@ -67,9 +64,8 @@ default Publisher execute(@Nonnull AsyncRequestProducer requestProducer, /** * Execute a simple in-memory request and get a simple in-memory response. This method is - * equivalent to - * {@link CloseableHttpAsyncClient#execute(SimpleHttpRequest, HttpContext, FutureCallback)}. The - * returning {@link Publisher} completes with exactly 1 element. + * equivalent to {@link CloseableHttpAsyncClient#execute(SimpleHttpRequest, HttpContext, + * FutureCallback)}. The returning {@link Publisher} completes with exactly 1 element. */ default Publisher execute(@Nonnull SimpleHttpRequest request, @Nullable HttpContext context) { @@ -77,17 +73,17 @@ default Publisher execute(@Nonnull SimpleHttpRequest request } /** - * Convenience method for {@link #execute(SimpleHttpRequest, HttpContext)}, equivalent to - * {@link CloseableHttpAsyncClient#execute(SimpleHttpRequest, FutureCallback)}. + * Convenience method for {@link #execute(SimpleHttpRequest, HttpContext)}, equivalent to {@link + * CloseableHttpAsyncClient#execute(SimpleHttpRequest, FutureCallback)}. */ default Publisher execute(@Nonnull SimpleHttpRequest request) { return execute(request, null); } /** - * Execute the given request and get a streaming response body as a {@link Publisher} of - * {@link ByteBuffer}s. The returning {@link Publisher} completes with exactly 1 element. The - * {@link Publisher} within the returning {@link Publisher} may contain 0 to n elements. + * Execute the given request and get a streaming response body as a {@link Publisher} of {@link + * ByteBuffer}s. The returning {@link Publisher} completes with exactly 1 element. The {@link + * Publisher} within the returning {@link Publisher} may contain 0 to n elements. */ Publisher>> streamingExecute( @Nonnull AsyncRequestProducer requestProducer, @@ -95,8 +91,8 @@ Publisher>> streamingExecute( @Nullable HttpContext context); /** - * Convenience method for - * {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)} + * Convenience method for {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, + * HttpContext)} */ default Publisher>> streamingExecute( @Nonnull AsyncRequestProducer requestProducer, @Nullable HttpContext context) { @@ -104,8 +100,8 @@ default Publisher>> streamingExecute } /** - * Convenience method for - * {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)} + * Convenience method for {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, + * HttpContext)} */ default Publisher>> streamingExecute( @Nonnull AsyncRequestProducer requestProducer) { @@ -113,8 +109,8 @@ default Publisher>> streamingExecute } /** - * Execute a simple in-memory request and get a streaming response. Convenience method for - * {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)} + * Execute a simple in-memory request and get a streaming response. Convenience method for {@link + * #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)} */ default Publisher>> streamingExecute( @Nonnull SimpleHttpRequest request, @Nullable HttpContext context) { @@ -122,8 +118,8 @@ default Publisher>> streamingExecute } /** - * Convenience method for - * {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)} + * Convenience method for {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, + * HttpContext)} */ default Publisher>> streamingExecute( @Nonnull SimpleHttpRequest request) { diff --git a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java index 4d26a46..3fc4e5d 100644 --- a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java +++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClientImpl.java @@ -2,6 +2,7 @@ import java.nio.ByteBuffer; import java.util.Objects; +import javax.annotation.Nullable; import org.apache.hc.client5.http.async.HttpAsyncClient; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.Message; @@ -14,6 +15,8 @@ import org.reactivestreams.Publisher; import io.reactivex.rxjava3.core.Maybe; +import javax.annotation.Nonnull; + /** * Concrete implementation of {@link HttpReactiveClient}. * @@ -28,11 +31,13 @@ final class HttpReactiveClientImpl implements HttpReactiveClient { } @Override - public Publisher execute(AsyncRequestProducer requestProducer, - AsyncResponseConsumer responseConsumer, - HandlerFactory pushHandlerFactory, HttpContext context) { + public Publisher execute(@Nonnull AsyncRequestProducer requestProducer, + @Nonnull AsyncResponseConsumer responseConsumer, + @Nullable HandlerFactory pushHandlerFactory, + @Nullable HttpContext context) { Objects.requireNonNull(requestProducer); Objects.requireNonNull(responseConsumer); + //noinspection CodeBlock2Expr return Maybe.create(emitter -> { httpAsyncClient.execute(requestProducer, responseConsumer, pushHandlerFactory, context, FutureCallbacks.maybeEmitter(emitter)); @@ -41,8 +46,9 @@ public Publisher execute(AsyncRequestProducer requestProducer, @Override public Publisher>> streamingExecute( - AsyncRequestProducer requestProducer, HandlerFactory pushHandlerFactory, - HttpContext context) { + @Nonnull AsyncRequestProducer requestProducer, + @Nullable HandlerFactory pushHandlerFactory, + @Nullable HttpContext context) { Objects.requireNonNull(requestProducer); /* * Semantically this should be a Single instead of a Maybe, but using Single here requires an diff --git a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClients.java b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClients.java index 7581db9..15a6d87 100644 --- a/src/main/java/com/saasquatch/client5reactive/HttpReactiveClients.java +++ b/src/main/java/com/saasquatch/client5reactive/HttpReactiveClients.java @@ -19,9 +19,9 @@ private HttpReactiveClients() {} /** * Create a {@link HttpReactiveClient} from a given {@link HttpAsyncClient}. Note that the created * {@link HttpReactiveClient} is simply a wrapper of the {@link HttpAsyncClient} and does not - * support state management, so you'll need to manage the state of the given - * {@link HttpAsyncClient} yourself by calling {@link CloseableHttpAsyncClient#start()}, - * {@link CloseableHttpAsyncClient#close()}, etc. + * support state management, so you'll need to manage the state of the given {@link + * HttpAsyncClient} yourself by calling {@link CloseableHttpAsyncClient#start()}, {@link + * CloseableHttpAsyncClient#close()}, etc. */ @Nonnull public static HttpReactiveClient create(@Nonnull HttpAsyncClient httpAsyncClient) { diff --git a/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java b/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java index 5911df3..83672ed 100644 --- a/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java +++ b/src/test/java/com/saasquatch/client5reactive/LifecycleTests.java @@ -1,10 +1,10 @@ package com.saasquatch.client5reactive; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; +import io.reactivex.rxjava3.core.Flowable; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClients; import org.junit.jupiter.api.Test; -import io.reactivex.rxjava3.core.Flowable; public class LifecycleTests { @@ -15,8 +15,8 @@ public void testNonStarted() throws Exception { try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.createDefault()) { // Not started final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient); - Flowable.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get(EXAMPLE_URL))).test() - .assertError(IllegalStateException.class); + Flowable.fromPublisher(reactiveClient.execute(SimpleRequestBuilder.get(EXAMPLE_URL).build())) + .test().assertError(IllegalStateException.class); } } @@ -28,8 +28,8 @@ public void testClosed() throws Exception { reactiveClient = HttpReactiveClients.create(asyncClient); } // Closed - Flowable.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get(EXAMPLE_URL))).test() - .assertError(IllegalStateException.class); + Flowable.fromPublisher(reactiveClient.execute(SimpleRequestBuilder.get(EXAMPLE_URL).build())) + .test().assertError(IllegalStateException.class); } } diff --git a/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java index f61a40d..927cb9d 100644 --- a/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java +++ b/src/test/java/com/saasquatch/client5reactive/NullabilityTests.java @@ -2,8 +2,10 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertThrows; + +import io.reactivex.rxjava3.core.Flowable; import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClients; @@ -13,7 +15,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.reactivestreams.Publisher; -import io.reactivex.rxjava3.core.Flowable; public class NullabilityTests { @@ -32,6 +33,7 @@ public static void afterAll() throws Exception { asyncClient.close(); } + @SuppressWarnings({"ConstantConditions", "RedundantCast"}) @Test public void testNullability() { assertThrows(NullPointerException.class, @@ -59,7 +61,7 @@ public void testNullability() { @Test public void testVoidPublisher() { final Publisher voidResultPublisher = reactiveClient.execute( - SimpleRequestProducer.create(SimpleHttpRequests.get("https://example.com")), + SimpleRequestProducer.create(SimpleRequestBuilder.get("https://example.com").build()), new ReactiveResponseConsumer()); assertDoesNotThrow(() -> Flowable.fromPublisher(voidResultPublisher).blockingSubscribe()); } diff --git a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java index 59ac154..6353dee 100644 --- a/src/test/java/com/saasquatch/client5reactive/StreamingTests.java +++ b/src/test/java/com/saasquatch/client5reactive/StreamingTests.java @@ -1,13 +1,15 @@ package com.saasquatch.client5reactive; import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +import io.reactivex.rxjava3.core.Flowable; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; @@ -19,7 +21,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.reactivestreams.Publisher; -import io.reactivex.rxjava3.core.Flowable; public class StreamingTests { @@ -34,8 +35,8 @@ public static void beforeAll() throws Exception { asyncClient = HttpAsyncClients.createDefault(); asyncClient.start(); reactiveClient = HttpReactiveClients.create(asyncClient); - flowableSourceBytes = - asyncClient.execute(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL), null).get().getBodyBytes(); + flowableSourceBytes = asyncClient.execute( + SimpleRequestBuilder.get(FLOWABLE_SOURCE_URL).build(), null).get().getBodyBytes(); } @AfterAll @@ -46,7 +47,7 @@ public static void afterAll() throws Exception { @Test public void testVanillaExecuteWorks() { final byte[] bodyBytes = Flowable.fromPublisher(reactiveClient.execute( - SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL)), + SimpleRequestProducer.create(SimpleRequestBuilder.get(FLOWABLE_SOURCE_URL).build()), SimpleResponseConsumer.create())).blockingSingle().getBodyBytes(); assertArrayEquals(flowableSourceBytes, bodyBytes); } @@ -55,7 +56,7 @@ public void testVanillaExecuteWorks() { public void testVanillaStreamingWorks() { final byte[] bodyBytes = Flowable .fromPublisher(reactiveClient.streamingExecute( - SimpleRequestProducer.create(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL)))) + SimpleRequestProducer.create(SimpleRequestBuilder.get(FLOWABLE_SOURCE_URL).build()))) .concatMap(Message::getBody).to(this::toByteArray); assertArrayEquals(flowableSourceBytes, bodyBytes); } @@ -63,27 +64,29 @@ public void testVanillaStreamingWorks() { @Test public void testBasicStreamingWorks() { final byte[] bodyBytes = Flowable - .fromPublisher(reactiveClient.streamingExecute(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL))) + .fromPublisher( + reactiveClient.streamingExecute(SimpleRequestBuilder.get(FLOWABLE_SOURCE_URL).build())) .concatMap(Message::getBody).to(this::toByteArray); assertArrayEquals(flowableSourceBytes, bodyBytes); } @Test - public void testWithMinimalClient() throws Exception { + public void testWithMinimalClient() { try (MinimalHttpAsyncClient asyncClient = HttpAsyncClients.createMinimal()) { asyncClient.start(); final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient); { final byte[] bodyBytes = Flowable .fromPublisher( - reactiveClient.streamingExecute(SimpleHttpRequests.get(FLOWABLE_SOURCE_URL), null)) + reactiveClient + .streamingExecute(SimpleRequestBuilder.get(FLOWABLE_SOURCE_URL).build(), null)) .concatMap(Message::getBody).to(this::toByteArray); assertArrayEquals(flowableSourceBytes, bodyBytes); } { final byte[] bodyBytes = Flowable .fromPublisher(reactiveClient.streamingExecute( - SimpleHttpRequests.get(FLOWABLE_SOURCE_URL), new BasicHttpContext())) + SimpleRequestBuilder.get(FLOWABLE_SOURCE_URL).build(), new BasicHttpContext())) .concatMap(Message::getBody).to(this::toByteArray); assertArrayEquals(flowableSourceBytes, bodyBytes); } @@ -92,7 +95,7 @@ public void testWithMinimalClient() throws Exception { private byte[] toByteArray(Publisher pub) { try (ByteArrayOutputStream out = new ByteArrayOutputStream(); - WritableByteChannel channel = Channels.newChannel(out);) { + WritableByteChannel channel = Channels.newChannel(out)) { Flowable.fromPublisher(pub).blockingForEach(channel::write); return out.toByteArray(); } catch (IOException e) { diff --git a/src/test/java/com/saasquatch/client5reactive/examples/Example.java b/src/test/java/com/saasquatch/client5reactive/examples/Example.java index 92e5964..7e2937c 100644 --- a/src/test/java/com/saasquatch/client5reactive/examples/Example.java +++ b/src/test/java/com/saasquatch/client5reactive/examples/Example.java @@ -1,13 +1,14 @@ package com.saasquatch.client5reactive.examples; import static java.nio.charset.StandardCharsets.UTF_8; -import java.nio.ByteBuffer; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; -import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; -import org.apache.hc.client5.http.impl.async.HttpAsyncClients; + import com.saasquatch.client5reactive.HttpReactiveClient; import com.saasquatch.client5reactive.HttpReactiveClients; import io.reactivex.rxjava3.core.Single; +import java.nio.ByteBuffer; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; public class Example { @@ -22,8 +23,8 @@ public static void main(String[] args) throws Exception { // HttpReactiveClient is just a thin wrapper final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient); // Execute a simple in-memory request - Single - .fromPublisher(reactiveClient.execute(SimpleHttpRequests.get("https://www.example.com"))) + Single.fromPublisher( + reactiveClient.execute(SimpleRequestBuilder.get("https://www.example.com").build())) .doOnSuccess(response -> { // Get the response status and body in memory System.out.println(response.getCode()); @@ -33,9 +34,8 @@ public static void main(String[] args) throws Exception { System.out.println("----------"); // Execute a streaming request // In this case, the request is a simple in-memory request without a request body - Single - .fromPublisher( - reactiveClient.streamingExecute(SimpleHttpRequests.get("https://www.example.com"))) + Single.fromPublisher(reactiveClient.streamingExecute( + SimpleRequestBuilder.get("https://www.example.com").build())) .flatMapPublisher(message -> { // Get the status before subscribing to the streaming body System.out.println(message.getHead().getCode());