Skip to content

Commit

Permalink
additional pubsub commands (#919)
Browse files Browse the repository at this point in the history
  • Loading branch information
yisraelU authored Nov 24, 2024
1 parent 87cd3cb commit 3a750ec
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ import dev.profunktor.redis4cats.data._
import dev.profunktor.redis4cats.pubsub.data.Subscription

trait PubSubStats[F[_], K] {
def pubSubChannels: F[List[K]]
def numPat: F[Long]
def numSub: F[List[Subscription[K]]]
def pubSubChannels: F[List[RedisChannel[K]]]
def pubSubShardChannels: F[List[RedisChannel[K]]]
def pubSubSubscriptions(channel: RedisChannel[K]): F[Subscription[K]]
def pubSubSubscriptions(channels: List[RedisChannel[K]]): F[List[Subscription[K]]]
def shardNumSub(channels: List[RedisChannel[K]]): F[List[Subscription[K]]]
}

trait PublishCommands[F[_], K, V] extends PubSubStats[F, K] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,25 @@ private[pubsub] class LivePubSubCommands[F[_]: Async: Log, K, V](
Stream.eval(FutureLift[F].lift(pubConnection.async().publish(channel.underlying, message)).void)
}

override def pubSubChannels: Stream[F, List[K]] =
override def numPat: Stream[F, Long] =
pubSubStats.numPat

override def numSub: Stream[F, List[Subscription[K]]] =
pubSubStats.numSub

override def pubSubChannels: Stream[F, List[RedisChannel[K]]] =
pubSubStats.pubSubChannels

override def pubSubShardChannels: Stream[F, List[RedisChannel[K]]] =
pubSubStats.pubSubShardChannels

override def pubSubSubscriptions(channel: RedisChannel[K]): Stream[F, Subscription[K]] =
pubSubStats.pubSubSubscriptions(channel)

override def pubSubSubscriptions(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
pubSubStats.pubSubSubscriptions(channels)

override def shardNumSub(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
pubSubStats.shardNumSub(channels)

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,42 @@ import dev.profunktor.redis4cats.effect.FutureLift
import dev.profunktor.redis4cats.pubsub.data.Subscription
import fs2.Stream
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection

import dev.profunktor.redis4cats.JavaConversions._
import dev.profunktor.redis4cats.pubsub.internals.LivePubSubStats.toSubscription

import java.{ util => ju }
import java.lang.{ Long => JLong }
private[pubsub] class LivePubSubStats[F[_]: FlatMap: FutureLift, K, V](
pubConnection: StatefulRedisPubSubConnection[K, V]
) extends PubSubStats[Stream[F, *], K] {

override def pubSubChannels: Stream[F, List[K]] =
override def numPat: Stream[F, Long] =
Stream
.eval {
FutureLift[F].lift(pubConnection.async().pubsubNumpat())
}
.map(Long.unbox)

override def numSub: Stream[F, List[Subscription[K]]] =
Stream
.eval {
FutureLift[F].lift(pubConnection.async().pubsubNumsub())
}
.map(toSubscription[K])

override def pubSubChannels: Stream[F, List[RedisChannel[K]]] =
Stream
.eval {
FutureLift[F].lift(pubConnection.async().pubsubChannels())
}
.map(_.asScala.toList)
.map(_.asScala.toList.map(RedisChannel[K]))

override def pubSubShardChannels: Stream[F, List[RedisChannel[K]]] =
Stream
.eval {
FutureLift[F].lift(pubConnection.async().pubsubShardChannels())
}
.map(_.asScala.toList.map(RedisChannel[K]))

override def pubSubSubscriptions(channel: RedisChannel[K]): Stream[F, Subscription[K]] =
pubSubSubscriptions(List(channel)).map(_.headOption).unNone
Expand All @@ -46,7 +69,18 @@ private[pubsub] class LivePubSubStats[F[_]: FlatMap: FutureLift, K, V](
Stream.eval {
FutureLift[F]
.lift(pubConnection.async().pubsubNumsub(channels.map(_.underlying): _*))
.map(_.asScala.toList.map { case (k, n) => Subscription(RedisChannel[K](k), n) })
.map(toSubscription[K])
}

override def shardNumSub(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
Stream
.eval {
FutureLift[F].lift(pubConnection.async().pubsubShardNumsub(channels.map(_.underlying): _*))
}
.map(toSubscription[K])

}
object LivePubSubStats {
private def toSubscription[K](map: ju.Map[K, JLong]): List[Subscription[K]] =
map.asScala.toList.map { case (k, n) => Subscription(RedisChannel[K](k), Long.unbox(n)) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private[pubsub] class Publisher[F[_]: FlatMap: FutureLift, K, V](
override def publish(channel: RedisChannel[K]): Stream[F, V] => Stream[F, Unit] =
_.evalMap(message => FutureLift[F].lift(pubConnection.async().publish(channel.underlying, message)).void)

override def pubSubChannels: Stream[F, List[K]] =
override def pubSubChannels: Stream[F, List[RedisChannel[K]]] =
pubSubStats.pubSubChannels

override def pubSubSubscriptions(channel: RedisChannel[K]): Stream[F, Subscription[K]] =
Expand All @@ -44,4 +44,15 @@ private[pubsub] class Publisher[F[_]: FlatMap: FutureLift, K, V](
override def pubSubSubscriptions(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
pubSubStats.pubSubSubscriptions(channels)

override def numPat: Stream[F, Long] =
pubSubStats.numPat

override def numSub: Stream[F, List[Subscription[K]]] =
pubSubStats.numSub

override def pubSubShardChannels: Stream[F, List[RedisChannel[K]]] =
pubSubStats.pubSubShardChannels

override def shardNumSub(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
pubSubStats.shardNumSub(channels)
}

0 comments on commit 3a750ec

Please sign in to comment.