From c97ec4cefa7af648d141264ca07b168c252ae5b5 Mon Sep 17 00:00:00 2001 From: bbaldino Date: Tue, 2 May 2023 19:34:19 -0700 Subject: [PATCH 1/2] add pool for pubsub connections --- redis/src/lib.rs | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/redis/src/lib.rs b/redis/src/lib.rs index b4510d2..a067910 100644 --- a/redis/src/lib.rs +++ b/redis/src/lib.rs @@ -39,9 +39,50 @@ pub use bb8; pub use redis; use async_trait::async_trait; +use redis::aio::PubSub; use redis::{aio::Connection, ErrorKind}; use redis::{Client, IntoConnectionInfo, RedisError}; +/// A `bb8::ManageConnection` for `redis::aio::PubSub` +#[derive(Clone, Debug)] +pub struct RedisPubSubConnectionManager { + client: Client, +} + +impl RedisPubSubConnectionManager { + /// Create a new `RedisConnectionPubSubManager`. + /// See `redis::Client::open` for a description of the parameter types. + pub fn new(info: T) -> Result { + Ok(Self { + client: Client::open(info.into_connection_info()?)?, + }) + } +} + +#[async_trait] +impl bb8::ManageConnection for RedisPubSubConnectionManager { + type Connection = PubSub; + type Error = RedisError; + + async fn connect(&self) -> Result { + Ok(self.client.get_async_connection().await?.into_pubsub()) + } + + async fn is_valid(&self, _conn: &mut Self::Connection) -> Result<(), Self::Error> { + // TODO: + Ok(()) + // let pong: String = redis::cmd("PING").query_async(conn).await?; + // match pong.as_str() { + // "PONG" => Ok(()), + // _ => Err((ErrorKind::ResponseError, "ping request").into()), + // } + } + + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} + /// A `bb8::ManageConnection` for `redis::Client::get_async_connection`. #[derive(Clone, Debug)] pub struct RedisConnectionManager { From 1846e2d6136e99cdcb82893c7c245e1cc303afe1 Mon Sep 17 00:00:00 2001 From: bbaldino Date: Wed, 17 May 2023 09:29:23 -0700 Subject: [PATCH 2/2] use unsubscribe as a way to check pubsub pool health --- redis/src/lib.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/redis/src/lib.rs b/redis/src/lib.rs index a067910..7872bb7 100644 --- a/redis/src/lib.rs +++ b/redis/src/lib.rs @@ -68,14 +68,8 @@ impl bb8::ManageConnection for RedisPubSubConnectionManager { Ok(self.client.get_async_connection().await?.into_pubsub()) } - async fn is_valid(&self, _conn: &mut Self::Connection) -> Result<(), Self::Error> { - // TODO: - Ok(()) - // let pong: String = redis::cmd("PING").query_async(conn).await?; - // match pong.as_str() { - // "PONG" => Ok(()), - // _ => Err((ErrorKind::ResponseError, "ping request").into()), - // } + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + conn.punsubscribe("").await } fn has_broken(&self, _: &mut Self::Connection) -> bool {