Skip to content

Commit

Permalink
Merge pull request #5 from saasquatch/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
slisaasquatch authored Jun 9, 2021
2 parents d841c2b + 3e59781 commit b79dfc8
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 86 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ hs_err_pid*
.settings
/target/
bin

.idea
*.iml
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ Thin wrapper around [Apache HttpComponents](https://hc.apache.org/) HttpAsyncCli

```java
import static java.nio.charset.StandardCharsets.UTF_8;
import java.nio.ByteBuffer;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;

import com.saasquatch.client5reactive.HttpReactiveClient;
import com.saasquatch.client5reactive.HttpReactiveClients;
import io.reactivex.rxjava3.core.Single;
import java.nio.ByteBuffer;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;

public class Example {

Expand All @@ -31,8 +32,8 @@ public class Example {
// HttpReactiveClient is just a thin wrapper
final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient);
// Execute a simple in-memory request
Single
.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get("https://www.example.com")))
Single.fromPublisher(
reactiveClient.execute(SimpleRequestBuilder.get("https://www.example.com").build()))
.doOnSuccess(response -> {
// Get the response status and body in memory
System.out.println(response.getCode());
Expand All @@ -42,9 +43,8 @@ public class Example {
System.out.println("----------");
// Execute a streaming request
// In this case, the request is a simple in-memory request without a request body
Single
.fromPublisher(
reactiveClient.streamingExecute(SimpleHttpRequests.get("https://www.example.com")))
Single.fromPublisher(reactiveClient.streamingExecute(
SimpleRequestBuilder.get("https://www.example.com").build()))
.flatMapPublisher(message -> {
// Get the status before subscribing to the streaming body
System.out.println(message.getHead().getCode());
Expand Down Expand Up @@ -101,14 +101,14 @@ Maven
<dependency>
<groupId>com.github.saasquatch</groupId>
<artifactId>apache-client5-reactive</artifactId>
<version>0.0.3</version>
<version>0.0.4</version>
</dependency>
```

Gradle

```gradle
compile 'com.github.saasquatch:apache-client5-reactive:0.0.3'
implementation 'com.github.saasquatch:apache-client5-reactive:0.0.4'
```

## License
Expand Down
15 changes: 8 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.saasquatch</groupId>
<artifactId>apache-client5-reactive</artifactId>
<version>0.0.3-SNAPSHOT</version>
<version>0.0.4-SNAPSHOT</version>
<packaging>jar</packaging>

<name>apache-client5-reactive</name>
Expand All @@ -19,7 +19,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<junit.version>5.7.0</junit.version>
<junit.version>5.7.2</junit.version>
</properties>

<dependencies>
Expand All @@ -38,17 +38,17 @@
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.0.3</version>
<version>5.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5-reactive</artifactId>
<version>5.0.2</version>
<version>5.1.1</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.0.7</version>
<version>3.0.13</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
Expand Down Expand Up @@ -77,7 +77,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.0</version>
<version>3.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
Expand All @@ -90,9 +90,10 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.1.1</version>
<version>3.3.0</version>
<configuration>
<doclint>none</doclint>
<source>8</source>
</configuration>
<executions>
<execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,42 +23,39 @@
/**
* Thin wrapper around Apache {@link HttpAsyncClient} to expose
* <a href="https://www.reactive-streams.org/">Reactive Streams</a> interfaces.<br>
* The methods in this interface aim to mirror the ones in {@link HttpAsyncClient} and
* {@link CloseableHttpAsyncClient}.
* The methods in this interface aim to mirror the ones in {@link HttpAsyncClient} and {@link
* CloseableHttpAsyncClient}.
*
* @author sli
* @see HttpReactiveClients
*/
public interface HttpReactiveClient {

/**
* Execute the given request. This method is equivalent to
* {@link HttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, HttpContext , FutureCallback)}.
* If the {@link Future} produced by the equivalent {@link HttpAsyncClient} method completes with
* {@code null}, then the returning {@link Publisher} of this method will complete with no
* element.
* Execute the given request. This method is equivalent to {@link HttpAsyncClient#execute(AsyncRequestProducer,
* AsyncResponseConsumer, HandlerFactory, HttpContext, FutureCallback)}. If the {@link Future}
* produced by the equivalent {@link HttpAsyncClient} method completes with {@code null}, then the
* returning {@link Publisher} of this method will complete with no element.
*/
<T> Publisher<T> execute(@Nonnull AsyncRequestProducer requestProducer,
@Nonnull AsyncResponseConsumer<T> responseConsumer,
@Nullable HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
@Nullable HttpContext context);

/**
* Convenience method for
* {@link #execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, HttpContext)},
* equivalent to
* {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HttpContext, FutureCallback)}
* Convenience method for {@link #execute(AsyncRequestProducer, AsyncResponseConsumer,
* HandlerFactory, HttpContext)}, equivalent to {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer,
* AsyncResponseConsumer, HttpContext, FutureCallback)}
*/
default <T> Publisher<T> execute(@Nonnull AsyncRequestProducer requestProducer,
@Nonnull AsyncResponseConsumer<T> responseConsumer, @Nullable HttpContext context) {
return execute(requestProducer, responseConsumer, null, context);
}

/**
* Convenience method for
* {@link #execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, HttpContext)},
* equivalent to
* {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, FutureCallback)}.
* Convenience method for {@link #execute(AsyncRequestProducer, AsyncResponseConsumer,
* HandlerFactory, HttpContext)}, equivalent to {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer,
* AsyncResponseConsumer, FutureCallback)}.
*/
default <T> Publisher<T> execute(@Nonnull AsyncRequestProducer requestProducer,
@Nonnull AsyncResponseConsumer<T> responseConsumer) {
Expand All @@ -67,63 +64,62 @@ default <T> Publisher<T> execute(@Nonnull AsyncRequestProducer requestProducer,

/**
* Execute a simple in-memory request and get a simple in-memory response. This method is
* equivalent to
* {@link CloseableHttpAsyncClient#execute(SimpleHttpRequest, HttpContext, FutureCallback)}. The
* returning {@link Publisher} completes with exactly 1 element.
* equivalent to {@link CloseableHttpAsyncClient#execute(SimpleHttpRequest, HttpContext,
* FutureCallback)}. The returning {@link Publisher} completes with exactly 1 element.
*/
default Publisher<SimpleHttpResponse> execute(@Nonnull SimpleHttpRequest request,
@Nullable HttpContext context) {
return execute(SimpleRequestProducer.create(request), SimpleResponseConsumer.create(), context);
}

/**
* Convenience method for {@link #execute(SimpleHttpRequest, HttpContext)}, equivalent to
* {@link CloseableHttpAsyncClient#execute(SimpleHttpRequest, FutureCallback)}.
* Convenience method for {@link #execute(SimpleHttpRequest, HttpContext)}, equivalent to {@link
* CloseableHttpAsyncClient#execute(SimpleHttpRequest, FutureCallback)}.
*/
default Publisher<SimpleHttpResponse> execute(@Nonnull SimpleHttpRequest request) {
return execute(request, null);
}

/**
* Execute the given request and get a streaming response body as a {@link Publisher} of
* {@link ByteBuffer}s. The returning {@link Publisher} completes with exactly 1 element. The
* {@link Publisher} within the returning {@link Publisher} may contain 0 to n elements.
* Execute the given request and get a streaming response body as a {@link Publisher} of {@link
* ByteBuffer}s. The returning {@link Publisher} completes with exactly 1 element. The {@link
* Publisher} within the returning {@link Publisher} may contain 0 to n elements.
*/
Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamingExecute(
@Nonnull AsyncRequestProducer requestProducer,
@Nullable HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
@Nullable HttpContext context);

/**
* Convenience method for
* {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)}
* Convenience method for {@link #streamingExecute(AsyncRequestProducer, HandlerFactory,
* HttpContext)}
*/
default Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamingExecute(
@Nonnull AsyncRequestProducer requestProducer, @Nullable HttpContext context) {
return streamingExecute(requestProducer, null, context);
}

/**
* Convenience method for
* {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)}
* Convenience method for {@link #streamingExecute(AsyncRequestProducer, HandlerFactory,
* HttpContext)}
*/
default Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamingExecute(
@Nonnull AsyncRequestProducer requestProducer) {
return streamingExecute(requestProducer, null);
}

/**
* Execute a simple in-memory request and get a streaming response. Convenience method for
* {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)}
* Execute a simple in-memory request and get a streaming response. Convenience method for {@link
* #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)}
*/
default Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamingExecute(
@Nonnull SimpleHttpRequest request, @Nullable HttpContext context) {
return streamingExecute(SimpleRequestProducer.create(request), context);
}

/**
* Convenience method for
* {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)}
* Convenience method for {@link #streamingExecute(AsyncRequestProducer, HandlerFactory,
* HttpContext)}
*/
default Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamingExecute(
@Nonnull SimpleHttpRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.nio.ByteBuffer;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.hc.client5.http.async.HttpAsyncClient;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.Message;
Expand All @@ -14,6 +15,8 @@
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.core.Maybe;

import javax.annotation.Nonnull;

/**
* Concrete implementation of {@link HttpReactiveClient}.
*
Expand All @@ -28,11 +31,13 @@ final class HttpReactiveClientImpl implements HttpReactiveClient {
}

@Override
public <T> Publisher<T> execute(AsyncRequestProducer requestProducer,
AsyncResponseConsumer<T> responseConsumer,
HandlerFactory<AsyncPushConsumer> pushHandlerFactory, HttpContext context) {
public <T> Publisher<T> execute(@Nonnull AsyncRequestProducer requestProducer,
@Nonnull AsyncResponseConsumer<T> responseConsumer,
@Nullable HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
@Nullable HttpContext context) {
Objects.requireNonNull(requestProducer);
Objects.requireNonNull(responseConsumer);
//noinspection CodeBlock2Expr
return Maybe.<T>create(emitter -> {
httpAsyncClient.execute(requestProducer, responseConsumer, pushHandlerFactory, context,
FutureCallbacks.maybeEmitter(emitter));
Expand All @@ -41,8 +46,9 @@ public <T> Publisher<T> execute(AsyncRequestProducer requestProducer,

@Override
public Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamingExecute(
AsyncRequestProducer requestProducer, HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
HttpContext context) {
@Nonnull AsyncRequestProducer requestProducer,
@Nullable HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
@Nullable HttpContext context) {
Objects.requireNonNull(requestProducer);
/*
* Semantically this should be a Single instead of a Maybe, but using Single here requires an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ private HttpReactiveClients() {}
/**
* Create a {@link HttpReactiveClient} from a given {@link HttpAsyncClient}. Note that the created
* {@link HttpReactiveClient} is simply a wrapper of the {@link HttpAsyncClient} and does not
* support state management, so you'll need to manage the state of the given
* {@link HttpAsyncClient} yourself by calling {@link CloseableHttpAsyncClient#start()},
* {@link CloseableHttpAsyncClient#close()}, etc.
* support state management, so you'll need to manage the state of the given {@link
* HttpAsyncClient} yourself by calling {@link CloseableHttpAsyncClient#start()}, {@link
* CloseableHttpAsyncClient#close()}, etc.
*/
@Nonnull
public static HttpReactiveClient create(@Nonnull HttpAsyncClient httpAsyncClient) {
Expand Down
12 changes: 6 additions & 6 deletions src/test/java/com/saasquatch/client5reactive/LifecycleTests.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.saasquatch.client5reactive;

import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
import io.reactivex.rxjava3.core.Flowable;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.junit.jupiter.api.Test;
import io.reactivex.rxjava3.core.Flowable;

public class LifecycleTests {

Expand All @@ -15,8 +15,8 @@ public void testNonStarted() throws Exception {
try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.createDefault()) {
// Not started
final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient);
Flowable.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get(EXAMPLE_URL))).test()
.assertError(IllegalStateException.class);
Flowable.fromPublisher(reactiveClient.execute(SimpleRequestBuilder.get(EXAMPLE_URL).build()))
.test().assertError(IllegalStateException.class);
}
}

Expand All @@ -28,8 +28,8 @@ public void testClosed() throws Exception {
reactiveClient = HttpReactiveClients.create(asyncClient);
}
// Closed
Flowable.fromPublisher(reactiveClient.execute(SimpleHttpRequests.get(EXAMPLE_URL))).test()
.assertError(IllegalStateException.class);
Flowable.fromPublisher(reactiveClient.execute(SimpleRequestBuilder.get(EXAMPLE_URL).build()))
.test().assertError(IllegalStateException.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;

import io.reactivex.rxjava3.core.Flowable;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
Expand All @@ -13,7 +15,6 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.core.Flowable;

public class NullabilityTests {

Expand All @@ -32,6 +33,7 @@ public static void afterAll() throws Exception {
asyncClient.close();
}

@SuppressWarnings({"ConstantConditions", "RedundantCast"})
@Test
public void testNullability() {
assertThrows(NullPointerException.class,
Expand Down Expand Up @@ -59,7 +61,7 @@ public void testNullability() {
@Test
public void testVoidPublisher() {
final Publisher<Void> voidResultPublisher = reactiveClient.execute(
SimpleRequestProducer.create(SimpleHttpRequests.get("https://example.com")),
SimpleRequestProducer.create(SimpleRequestBuilder.get("https://example.com").build()),
new ReactiveResponseConsumer());
assertDoesNotThrow(() -> Flowable.fromPublisher(voidResultPublisher).blockingSubscribe());
}
Expand Down
Loading

0 comments on commit b79dfc8

Please sign in to comment.