From bf04e39f69e4fe2af9adf4702ca8a9612222426d Mon Sep 17 00:00:00 2001
From: Ryland Degnan <ryland@netifi.com>
Date: Tue, 8 Jan 2019 07:25:45 -0800
Subject: [PATCH] Added support for listening to notifications

---
 .../postgresql/PostgresqlConnection.java      |  9 +++
 .../io/r2dbc/postgresql/client/Client.java    | 11 ++++
 .../postgresql/client/ReactorNettyClient.java | 14 +++++
 .../client/ReactorNettyClientTest.java        | 56 +++++++++++++++++++
 .../r2dbc/postgresql/client/TestClient.java   | 10 ++++
 5 files changed, 100 insertions(+)

diff --git a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java
index df5b0a2c..eccf8c2a 100644
--- a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java
+++ b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java
@@ -21,15 +21,18 @@
 import io.r2dbc.postgresql.client.SimpleQueryMessageFlow;
 import io.r2dbc.postgresql.client.TransactionStatus;
 import io.r2dbc.postgresql.codec.Codecs;
+import io.r2dbc.postgresql.message.backend.NotificationResponse;
 import io.r2dbc.postgresql.util.Assert;
 import io.r2dbc.spi.Connection;
 import io.r2dbc.spi.IsolationLevel;
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 import static io.r2dbc.postgresql.client.TransactionStatus.IDLE;
@@ -165,6 +168,12 @@ public Mono<Void> rollbackTransactionToSavepoint(String name) {
         });
     }
 
+    public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
+        Assert.requireNonNull(consumer, "consumer must not be null");
+
+        return this.client.addNotificationListener(consumer);
+    }
+
     @Override
     public Mono<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
         Assert.requireNonNull(isolationLevel, "isolationLevel must not be null");
diff --git a/src/main/java/io/r2dbc/postgresql/client/Client.java b/src/main/java/io/r2dbc/postgresql/client/Client.java
index 75e3fc1c..18c383f6 100644
--- a/src/main/java/io/r2dbc/postgresql/client/Client.java
+++ b/src/main/java/io/r2dbc/postgresql/client/Client.java
@@ -18,13 +18,16 @@
 
 import io.netty.buffer.ByteBufAllocator;
 import io.r2dbc.postgresql.message.backend.BackendMessage;
+import io.r2dbc.postgresql.message.backend.NotificationResponse;
 import io.r2dbc.postgresql.message.backend.ReadyForQuery;
 import io.r2dbc.postgresql.message.frontend.FrontendMessage;
 import org.reactivestreams.Publisher;
+import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.util.Optional;
+import java.util.function.Consumer;
 
 /**
  * An abstraction that wraps the networking part of exchanging methods.
@@ -75,4 +78,12 @@ public interface Client {
      */
     TransactionStatus getTransactionStatus();
 
+    /**
+     * Add a consumer of notification messages.
+     *
+     * @param consumer the consumer of notification messages
+     * @return a new {@link Disposable} that can be used to cancel the underlying subscription.
+     * @throws IllegalArgumentException if {@code consumer} is {@code null}
+     */
+    Disposable addNotificationListener(Consumer<NotificationResponse> consumer);
 }
diff --git a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java
index 6030c0bf..dfa9379e 100644
--- a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java
+++ b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java
@@ -25,6 +25,7 @@
 import io.r2dbc.postgresql.message.backend.ErrorResponse;
 import io.r2dbc.postgresql.message.backend.Field;
 import io.r2dbc.postgresql.message.backend.NoticeResponse;
+import io.r2dbc.postgresql.message.backend.NotificationResponse;
 import io.r2dbc.postgresql.message.backend.ReadyForQuery;
 import io.r2dbc.postgresql.message.frontend.FrontendMessage;
 import io.r2dbc.postgresql.message.frontend.Terminate;
@@ -32,6 +33,7 @@
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
 import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
@@ -50,6 +52,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -104,6 +107,11 @@ public final class ReactorNettyClient implements Client {
             sink.next(message);
         });
 
+    private final EmitterProcessor<NotificationResponse> notificationProcessor = EmitterProcessor.create(false);
+
+    private final BiConsumer<BackendMessage, SynchronousSink<BackendMessage>> handleNotificationResponse = handleBackendMessage(NotificationResponse.class,
+        (message, sink) -> this.notificationProcessor.onNext(message));
+
     /**
      * Creates a new frame processor connected to a given TCP connection.
      *
@@ -124,6 +132,7 @@ private ReactorNettyClient(Connection connection) {
             .concatMap(decoder::decode)
             .doOnNext(message -> this.logger.debug("Response: {}", message))
             .handle(this.handleNoticeResponse)
+            .handle(this.handleNotificationResponse)
             .handle(this.handleErrorResponse)
             .handle(this.handleBackendKeyData)
             .handle(this.handleReadyForQuery)
@@ -258,6 +267,11 @@ public TransactionStatus getTransactionStatus() {
         return this.transactionStatus.get();
     }
 
+    @Override
+    public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
+        return this.notificationProcessor.subscribe(consumer);
+    }
+
     @SuppressWarnings("unchecked")
     private static <T extends BackendMessage> BiConsumer<BackendMessage, SynchronousSink<BackendMessage>> handleBackendMessage(Class<T> type, BiConsumer<T, SynchronousSink<BackendMessage>> consumer) {
         return (message, sink) -> {
diff --git a/src/test/java/io/r2dbc/postgresql/client/ReactorNettyClientTest.java b/src/test/java/io/r2dbc/postgresql/client/ReactorNettyClientTest.java
index 8699f1d6..a26f1f0c 100644
--- a/src/test/java/io/r2dbc/postgresql/client/ReactorNettyClientTest.java
+++ b/src/test/java/io/r2dbc/postgresql/client/ReactorNettyClientTest.java
@@ -19,6 +19,7 @@
 import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler;
 import io.r2dbc.postgresql.message.backend.CommandComplete;
 import io.r2dbc.postgresql.message.backend.DataRow;
+import io.r2dbc.postgresql.message.backend.NotificationResponse;
 import io.r2dbc.postgresql.message.backend.RowDescription;
 import io.r2dbc.postgresql.message.frontend.Query;
 import io.r2dbc.postgresql.util.PostgresqlServerExtension;
@@ -26,7 +27,11 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import reactor.core.Disposable;
+import reactor.core.publisher.FluxProcessor;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+import reactor.core.publisher.UnicastProcessor;
 import reactor.test.StepVerifier;
 
 import java.util.Arrays;
@@ -142,4 +147,55 @@ void parallelExchange() {
             .verifyComplete();
     }
 
+    @Test
+    void handleNotify() {
+        UnicastProcessor<NotificationResponse> response = UnicastProcessor.create();
+        this.client.addNotificationListener(response::onNext);
+
+        this.client
+            .exchange(Mono.just(new Query("LISTEN events")))
+            .blockLast();
+
+        SERVER.getJdbcOperations().execute("NOTIFY events, 'test'");
+
+        StepVerifier.create(response)
+            .assertNext(message -> assertThat(message.getPayload()).isEqualTo("test"))
+            .thenCancel()
+            .verify();
+    }
+
+    @Test
+    void handleTrigger() {
+        SERVER.getJdbcOperations().execute(
+            "CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$\n" +
+                "  DECLARE\n" +
+                "    payload JSON;\n" +
+                "  BEGIN\n" +
+                "    payload = row_to_json(NEW);\n" +
+                "    PERFORM pg_notify('events', payload::text);\n" +
+                "    RETURN NULL;\n" +
+                "  END;\n" +
+                "$$ LANGUAGE plpgsql;");
+
+        SERVER.getJdbcOperations().execute(
+            "CREATE TRIGGER notify_test_event\n" +
+                "AFTER INSERT OR UPDATE OR DELETE ON test\n" +
+                "  FOR EACH ROW EXECUTE PROCEDURE notify_event();");
+
+        UnicastProcessor<NotificationResponse> response = UnicastProcessor.create();
+        this.client.addNotificationListener(response::onNext);
+
+        this.client
+            .exchange(Mono.just(new Query("LISTEN events")))
+            .blockLast();
+
+        SERVER.getJdbcOperations().execute("INSERT INTO test VALUES (100)");
+        SERVER.getJdbcOperations().execute("INSERT INTO test VALUES (1000)");
+
+        StepVerifier.create(response)
+            .assertNext(message -> assertThat(message.getPayload()).isEqualTo("{\"value\":100}"))
+            .assertNext(message -> assertThat(message.getPayload()).isEqualTo("{\"value\":1000}"))
+            .thenCancel()
+            .verify();
+    }
 }
diff --git a/src/test/java/io/r2dbc/postgresql/client/TestClient.java b/src/test/java/io/r2dbc/postgresql/client/TestClient.java
index b78e0304..3188d4c3 100644
--- a/src/test/java/io/r2dbc/postgresql/client/TestClient.java
+++ b/src/test/java/io/r2dbc/postgresql/client/TestClient.java
@@ -18,10 +18,12 @@
 
 import io.netty.buffer.ByteBufAllocator;
 import io.r2dbc.postgresql.message.backend.BackendMessage;
+import io.r2dbc.postgresql.message.backend.NotificationResponse;
 import io.r2dbc.postgresql.message.frontend.FrontendMessage;
 import io.r2dbc.postgresql.util.Assert;
 import io.r2dbc.postgresql.util.TestByteBufAllocator;
 import org.reactivestreams.Publisher;
+import reactor.core.Disposable;
 import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
@@ -31,6 +33,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 import static io.r2dbc.postgresql.client.TransactionStatus.IDLE;
@@ -53,6 +56,8 @@ public final class TestClient implements Client {
 
     private final TransactionStatus transactionStatus;
 
+    private final EmitterProcessor<NotificationResponse> notificationProcessor = EmitterProcessor.create(false);
+
     private TestClient(boolean expectClose, @Nullable Integer processId, @Nullable Integer secretKey, Flux<Window> windows, TransactionStatus transactionStatus) {
         this.expectClose = expectClose;
         this.processId = processId;
@@ -120,6 +125,11 @@ public TransactionStatus getTransactionStatus() {
         return this.transactionStatus;
     }
 
+    @Override
+    public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
+        return this.notificationProcessor.subscribe(consumer);
+    }
+
     public static final class Builder {
 
         private final List<Window.Builder<?>> windows = new ArrayList<>();