From f9d160a6f7e79955b66ca4ecbd92e07e5f6daf52 Mon Sep 17 00:00:00 2001 From: Abhinav Pandey Date: Wed, 27 Nov 2024 23:30:08 +0530 Subject: [PATCH 1/4] Redis PubSub --- kotlin-libraries-data/pom.xml | 13 ++++++ .../com/baeldung/redispubsub/Message.kt | 3 ++ .../baeldung/redispubsub/MessageListener.kt | 11 +++++ .../redispubsub/RedisPubSubApplication.kt | 27 +++++++++++ .../baeldung/redispubsub/RedisPubSubClient.kt | 46 +++++++++++++++++++ 5 files changed, 100 insertions(+) create mode 100644 kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/Message.kt create mode 100644 kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/MessageListener.kt create mode 100644 kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubApplication.kt create mode 100644 kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubClient.kt diff --git a/kotlin-libraries-data/pom.xml b/kotlin-libraries-data/pom.xml index fe41a6e6c..7d4d031e0 100644 --- a/kotlin-libraries-data/pom.xml +++ b/kotlin-libraries-data/pom.xml @@ -95,6 +95,16 @@ ${moshi.version} test + + com.github.codemonstur + embedded-redis + ${embedded-redis.version} + + + io.lettuce + lettuce-core + ${lettuce-core.version} + @@ -161,6 +171,9 @@ 2.9.0 2.35.1 1.15.1 + 1.7.1 + 1.4.3 + 6.5.0.RELEASE \ No newline at end of file diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/Message.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/Message.kt new file mode 100644 index 000000000..94d8857c8 --- /dev/null +++ b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/Message.kt @@ -0,0 +1,3 @@ +package com.baeldung.redispubsub + +data class Message(val content: String) diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/MessageListener.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/MessageListener.kt new file mode 100644 index 000000000..71871fea4 --- /dev/null +++ b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/MessageListener.kt @@ -0,0 +1,11 @@ +package com.baeldung.redispubsub + +import io.lettuce.core.pubsub.RedisPubSubAdapter + +class MessageListener : RedisPubSubAdapter() { + + override fun message(channel: String?, message: String?) { + println("Received message: $message from channel: $channel") + } + +} \ No newline at end of file diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubApplication.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubApplication.kt new file mode 100644 index 000000000..1d4a87dd4 --- /dev/null +++ b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubApplication.kt @@ -0,0 +1,27 @@ +package com.baeldung.redispubsub + +import redis.embedded.RedisServer + +private val redisServer: RedisServer = RedisServer(6379) + +fun main() { + redisServer.start() + + val pubSubClient = RedisPubSubClient() + val channel = "test-channel" + val message = Message("Hello, Redis!") + + pubSubClient.createConnection() + + // Subscribe to the channel + pubSubClient.subscribeToChannel(channel) + + // Publish a message + pubSubClient.publishMessage(channel, message) + + pubSubClient.closeConnection() + + // Stop the embedded Redis server + redisServer.stop() +} + diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubClient.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubClient.kt new file mode 100644 index 000000000..228ed72b5 --- /dev/null +++ b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubClient.kt @@ -0,0 +1,46 @@ +package com.baeldung.redispubsub + +import com.google.gson.Gson +import io.lettuce.core.RedisClient +import io.lettuce.core.RedisURI +import io.lettuce.core.api.StatefulRedisConnection +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands + +class RedisPubSubClient { + + + private val client: RedisClient = createRedisClient() + private lateinit var connection: StatefulRedisConnection + + private fun createRedisClient(): RedisClient { + val redisUri = RedisURI.Builder.redis("localhost", 6379) + .build() + return RedisClient.create(redisUri) + } + + fun createConnection() { + connection = client.connect() + } + fun closeConnection() { + connection.close() + client.shutdown() + } + + fun subscribeToChannel(channel: String) { + val pubSubConnection: StatefulRedisPubSubConnection = client.connectPubSub() + pubSubConnection.addListener(MessageListener()) + + val asyncCommands: RedisPubSubAsyncCommands = pubSubConnection.async() + asyncCommands.subscribe(channel) + } + + fun publishMessage(channel: String, message: Message) { + val syncCommands = connection.sync() + val messageJson = Gson().toJson(message) + syncCommands.publish(channel, messageJson) + println("Message published: $messageJson") + } + + +} \ No newline at end of file From 51c0233f34246f6f8da3e52bc5c3e4a117f93d8a Mon Sep 17 00:00:00 2001 From: Abhinav Pandey Date: Wed, 27 Nov 2024 23:39:28 +0530 Subject: [PATCH 2/4] Redis PubSub --- .../kotlin/com/baeldung/redispubsub/RedisPubSubApplication.kt | 3 --- 1 file changed, 3 deletions(-) diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubApplication.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubApplication.kt index 1d4a87dd4..6ccbb576c 100644 --- a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubApplication.kt +++ b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubApplication.kt @@ -13,15 +13,12 @@ fun main() { pubSubClient.createConnection() - // Subscribe to the channel pubSubClient.subscribeToChannel(channel) - // Publish a message pubSubClient.publishMessage(channel, message) pubSubClient.closeConnection() - // Stop the embedded Redis server redisServer.stop() } From e2432bb7463a73ca8ceefe5ab30871aa445836f8 Mon Sep 17 00:00:00 2001 From: Abhinav Pandey Date: Fri, 6 Dec 2024 18:39:29 +0530 Subject: [PATCH 3/4] Redis PubSub - review comments incorporated --- .../baeldung/redispubsub/MessageListener.kt | 6 +++ .../redispubsub/RedisConnectionManager.kt | 14 ++++++ .../redispubsub/RedisPubSubApplication.kt | 24 ---------- .../baeldung/redispubsub/RedisPubSubClient.kt | 46 ------------------- .../baeldung/redispubsub/RedisPublisher.kt | 11 +++++ .../baeldung/redispubsub/RedisSubscriber.kt | 15 ++++++ .../redispubsub/RedisSubscriberUnitTest.kt | 40 ++++++++++++++++ 7 files changed, 86 insertions(+), 70 deletions(-) create mode 100644 kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisConnectionManager.kt delete mode 100644 kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubApplication.kt delete mode 100644 kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubClient.kt create mode 100644 kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPublisher.kt create mode 100644 kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisSubscriber.kt create mode 100644 kotlin-libraries-data/src/test/kotlin/com/baeldung/redispubsub/RedisSubscriberUnitTest.kt diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/MessageListener.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/MessageListener.kt index 71871fea4..ecd730d18 100644 --- a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/MessageListener.kt +++ b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/MessageListener.kt @@ -1,11 +1,17 @@ package com.baeldung.redispubsub import io.lettuce.core.pubsub.RedisPubSubAdapter +import java.util.concurrent.CountDownLatch class MessageListener : RedisPubSubAdapter() { + var latch: CountDownLatch = CountDownLatch(1) + + var messagesReceived: List = emptyList() override fun message(channel: String?, message: String?) { println("Received message: $message from channel: $channel") + messagesReceived = messagesReceived.plus(message!!) + latch.countDown() } } \ No newline at end of file diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisConnectionManager.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisConnectionManager.kt new file mode 100644 index 000000000..5cd8940d2 --- /dev/null +++ b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisConnectionManager.kt @@ -0,0 +1,14 @@ +package com.baeldung.redispubsub + +import io.lettuce.core.RedisClient +import io.lettuce.core.api.StatefulRedisConnection + +object RedisConnectionManager { + val redisClient: RedisClient = RedisClient.create("redis://localhost:6379") + val connection: StatefulRedisConnection = redisClient.connect() + + fun close() { + connection.close() + redisClient.shutdown() + } +} diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubApplication.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubApplication.kt deleted file mode 100644 index 6ccbb576c..000000000 --- a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubApplication.kt +++ /dev/null @@ -1,24 +0,0 @@ -package com.baeldung.redispubsub - -import redis.embedded.RedisServer - -private val redisServer: RedisServer = RedisServer(6379) - -fun main() { - redisServer.start() - - val pubSubClient = RedisPubSubClient() - val channel = "test-channel" - val message = Message("Hello, Redis!") - - pubSubClient.createConnection() - - pubSubClient.subscribeToChannel(channel) - - pubSubClient.publishMessage(channel, message) - - pubSubClient.closeConnection() - - redisServer.stop() -} - diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubClient.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubClient.kt deleted file mode 100644 index 228ed72b5..000000000 --- a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPubSubClient.kt +++ /dev/null @@ -1,46 +0,0 @@ -package com.baeldung.redispubsub - -import com.google.gson.Gson -import io.lettuce.core.RedisClient -import io.lettuce.core.RedisURI -import io.lettuce.core.api.StatefulRedisConnection -import io.lettuce.core.pubsub.StatefulRedisPubSubConnection -import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands - -class RedisPubSubClient { - - - private val client: RedisClient = createRedisClient() - private lateinit var connection: StatefulRedisConnection - - private fun createRedisClient(): RedisClient { - val redisUri = RedisURI.Builder.redis("localhost", 6379) - .build() - return RedisClient.create(redisUri) - } - - fun createConnection() { - connection = client.connect() - } - fun closeConnection() { - connection.close() - client.shutdown() - } - - fun subscribeToChannel(channel: String) { - val pubSubConnection: StatefulRedisPubSubConnection = client.connectPubSub() - pubSubConnection.addListener(MessageListener()) - - val asyncCommands: RedisPubSubAsyncCommands = pubSubConnection.async() - asyncCommands.subscribe(channel) - } - - fun publishMessage(channel: String, message: Message) { - val syncCommands = connection.sync() - val messageJson = Gson().toJson(message) - syncCommands.publish(channel, messageJson) - println("Message published: $messageJson") - } - - -} \ No newline at end of file diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPublisher.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPublisher.kt new file mode 100644 index 000000000..a12109a14 --- /dev/null +++ b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPublisher.kt @@ -0,0 +1,11 @@ +package com.baeldung.redispubsub + +class RedisPublisher { + + fun publishMessage(channel: String, message: Message) { + val connection = RedisConnectionManager.connection + val syncCommands = connection.sync() + syncCommands.publish(channel, message.content) + println("Message published: $message") + } +} \ No newline at end of file diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisSubscriber.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisSubscriber.kt new file mode 100644 index 000000000..045a876f6 --- /dev/null +++ b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisSubscriber.kt @@ -0,0 +1,15 @@ +package com.baeldung.redispubsub + +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands + +class RedisSubscriber(private val messageListener: MessageListener) { + + fun subscribeToChannel(channel: String) { + val pubSubConnection: StatefulRedisPubSubConnection = RedisConnectionManager.redisClient.connectPubSub() + pubSubConnection.addListener(messageListener) + val asyncCommands: RedisPubSubAsyncCommands = pubSubConnection.async() + + asyncCommands.subscribe(channel) + } +} diff --git a/kotlin-libraries-data/src/test/kotlin/com/baeldung/redispubsub/RedisSubscriberUnitTest.kt b/kotlin-libraries-data/src/test/kotlin/com/baeldung/redispubsub/RedisSubscriberUnitTest.kt new file mode 100644 index 000000000..80563b26f --- /dev/null +++ b/kotlin-libraries-data/src/test/kotlin/com/baeldung/redispubsub/RedisSubscriberUnitTest.kt @@ -0,0 +1,40 @@ +package com.baeldung.redispubsub + +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import redis.embedded.RedisServer +import java.util.concurrent.TimeUnit + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class RedisSubscriberUnitTest { + + val messageListener = MessageListener() + val redisSubscriber = RedisSubscriber(messageListener) + val redisPublisher = RedisPublisher() + val channel = "channel" + val message = Message("Hello, Redis!") + + val redisServer = RedisServer(6379) + + @BeforeAll + fun setUp() { + redisServer.start() + } + + @AfterAll + fun tearDown() { + redisServer.stop() + } + + @Test + fun givenMessageListener_whenMessagePublished_thenMessageReceived() { + redisSubscriber.subscribeToChannel(channel) + redisPublisher.publishMessage(channel, message) + messageListener.latch.await(500, TimeUnit.MILLISECONDS) + assertEquals(message.content, messageListener.messagesReceived.get(0)) + } + +} \ No newline at end of file From 3df5d64c660fe5b69401761cf3530426d524b70a Mon Sep 17 00:00:00 2001 From: Abhinav Pandey Date: Fri, 6 Dec 2024 19:40:23 +0530 Subject: [PATCH 4/4] Redis PubSub - review comments incorporated --- .../kotlin/com/baeldung/redispubsub/RedisConnectionManager.kt | 4 ++-- .../com/baeldung/redispubsub/RedisSubscriberUnitTest.kt | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisConnectionManager.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisConnectionManager.kt index 5cd8940d2..33cbe2d56 100644 --- a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisConnectionManager.kt +++ b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisConnectionManager.kt @@ -3,11 +3,11 @@ package com.baeldung.redispubsub import io.lettuce.core.RedisClient import io.lettuce.core.api.StatefulRedisConnection -object RedisConnectionManager { +object RedisConnectionManager: AutoCloseable { val redisClient: RedisClient = RedisClient.create("redis://localhost:6379") val connection: StatefulRedisConnection = redisClient.connect() - fun close() { + override fun close() { connection.close() redisClient.shutdown() } diff --git a/kotlin-libraries-data/src/test/kotlin/com/baeldung/redispubsub/RedisSubscriberUnitTest.kt b/kotlin-libraries-data/src/test/kotlin/com/baeldung/redispubsub/RedisSubscriberUnitTest.kt index 80563b26f..6d3715def 100644 --- a/kotlin-libraries-data/src/test/kotlin/com/baeldung/redispubsub/RedisSubscriberUnitTest.kt +++ b/kotlin-libraries-data/src/test/kotlin/com/baeldung/redispubsub/RedisSubscriberUnitTest.kt @@ -26,6 +26,7 @@ class RedisSubscriberUnitTest { @AfterAll fun tearDown() { + RedisConnectionManager.close() redisServer.stop() }