
AWS Serverless issue - psubscribe error - how can I disable in Spring #3297

Open
dreamstar-enterprises opened this issue Dec 16, 2024 · 0 comments
Labels
status: waiting-for-triage An issue we've not yet triaged type: bug A general bug


dreamstar-enterprises commented Dec 16, 2024

Apparently the Spring Data Redis team recognises this as an issue, but says it is something the Spring Session team should look into.

See the linked issue for a full explanation, including their responses:

spring-projects/spring-data-redis#3075

Can a solution be found?

Original issue below:

Hi,

I'm using AWS ElastiCache Serverless, and I keep seeing the errors below in the logs.

I thought I had turned off pub/sub events in my app, but I keep getting two errors and cannot see where I have gone wrong.

Can someone help?

Here are the two errors, followed by my Spring connection factory code:

Caused by: io.lettuce.core.RedisCommandExecutionException: ERR unknown command 'psubscribe', with args beginning with: spring:session:event:0:created:*

Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@692dc72c was either previously returned or does not belong to this connection provider
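The first error names the pattern Spring Session tries to subscribe to. As I understand Spring Session's indexed session repository (`RedisIndexedSessionRepository` and its reactive counterpart), it publishes each "session created" event on its own channel, the prefix `spring:session:event:0:created:` followed by the session ID, and receives all of them through a single pattern subscription. That pattern subscription is the `psubscribe` the server rejects. It also listens on the Redis keyevent channels for deletions and expirations. The Kotlin sketch below rebuilds those channel names under that assumption (default `spring:session` namespace, database 0); every function and class name in it is hypothetical.

```kotlin
// Sketch only: names are hypothetical; the channel layout assumes Spring Session's
// indexed repository with its default "spring:session" namespace.
data class SessionEventTopics(
    val createdPattern: String, // pattern topic -> needs PSUBSCRIBE
    val deletedChannel: String, // plain channel -> SUBSCRIBE is enough
    val expiredChannel: String, // plain channel -> SUBSCRIBE is enough
)

fun sessionEventTopics(namespace: String = "spring:session", database: Int = 0): SessionEventTopics =
    SessionEventTopics(
        createdPattern = "$namespace:event:$database:created:*",
        deletedChannel = "__keyevent@${database}__:del",
        expiredChannel = "__keyevent@${database}__:expired",
    )

// Each "created" event goes to its own channel (prefix + session ID),
// so one listener can only receive all of them through a pattern subscription.
fun createdChannelFor(sessionId: String, namespace: String = "spring:session", database: Int = 0): String =
    "$namespace:event:$database:created:$sessionId"

// Minimal matcher for patterns ending in a single '*', enough for this example.
fun matchesTrailingStar(pattern: String, channel: String): Boolean =
    pattern.endsWith("*") && channel.startsWith(pattern.dropLast(1))

fun main() {
    val topics = sessionEventTopics()
    println(topics.createdPattern)
    println(matchesTrailingStar(topics.createdPattern, createdChannelFor("3f2a9c")))
}
```

Because `SUBSCRIBE` treats `*` literally, these per-session channels cannot be received without pattern matching, and since they are published by the application rather than generated by keyspace notifications, disabling keyspace notifications alone would not remove this subscription.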

ERROR 1

14 December 2024 at 18:43 (UTC) · ECS task [928293abs2393322183306ss1axY2](https://eu-west-2.console.aws.amazon.com/ecs/v2/clusters/MyAppCluster/services/MyAppService/tasks/928293abs2393322183306ss1axY2?region=eu-west-2) · bff

```
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR unknown command 'psubscribe', with args beginning with: spring:session:event:0:created:*
	at io.lettuce.core.internal.ExceptionFactory.createExecutionException(ExceptionFactory.java:147) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	at io.lettuce.core.internal.ExceptionFactory.createExecutionException(ExceptionFactory.java:116) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	at io.lettuce.core.RedisPublisher$SubscriptionCommand.doOnComplete(RedisPublisher.java:761) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:65) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:63) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	at io.lettuce.core.cluster.ClusterCommand.complete(ClusterCommand.java:65) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:745) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	at io.lettuce.core.pubsub.PubSubCommandHandler.complete(PubSubCommandHandler.java:167) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:680) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	at io.lettuce.core.pubsub.PubSubCommandHandler.decode(PubSubCommandHandler.java:112) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:597) ~[lettuce-core-6.3.2.RELEASE.jar!/:6.3.2.RELEASE/8941aea]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1473) ~[netty-handler-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1336) ~[netty-handler-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1385) ~[netty-handler-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530) ~[netty-codec-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469) ~[netty-codec-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918) ~[netty-transport-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) ~[netty-transport-classes-epoll-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501) ~[netty-transport-classes-epoll-4.1.112.Final.jar!/:4.1.112.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399) ~[netty-transport-classes-epoll-4.1.112.Final.jar!/:4.1.112.Final]
	... 4 common frames omitted
```

ERROR 2

```
2024-12-14T18:49:56.253Z ERROR 1 --- [BFFApplication] [ionShutdownHook] reactor.core.publisher.Operators : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@692dc72c was either previously returned or does not belong to this connection provider
Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@692dc72c was either previously returned or does not belong to this connection provider
	at org.springframework.data.redis.connection.lettuce.LettucePoolingConnectionProvider.releaseAsync(LettucePoolingConnectionProvider.java:192) ~[spring-data-redis-3.3.3.jar!/:3.3.3]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.releaseAsync(LettuceConnectionFactory.java:1834) ~[spring-data-redis-3.3.3.jar!/:3.3.3]
	at org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection$AsyncConnect.lambda$close$3(LettuceReactiveRedisConnection.java:373) ~[spring-data-redis-3.3.3.jar!/:3.3.3]
	....
	at org.springframework.boot.SpringApplicationShutdownHook.run(SpringApplicationShutdownHook.java:114) ~[spring-boot-3.3.3.jar!/:3.3.3]
	at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
```
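The second error is raised while the application shuts down (note the shutdown-hook thread name and the `SpringApplicationShutdownHook` frame). As far as I can tell, `LettucePoolingConnectionProvider` remembers which pool each borrowed connection came from and throws this `PoolException` when asked to release a connection it is no longer tracking, for example one that has already been released. The Kotlin sketch below is a simplified, hypothetical model of that bookkeeping, not Spring Data Redis's actual code; all class names are made up for illustration.

```kotlin
// Hypothetical, simplified model of a connection provider that tracks
// which connections it has handed out and rejects unknown releases.
class PoolException(message: String) : RuntimeException(message)

// No equals/hashCode override, so set membership is by object identity.
class Connection(val id: Int) {
    override fun toString() = "Connection@$id"
}

class TrackingConnectionProvider {
    private val idle = ArrayDeque<Connection>()
    private val borrowed = HashSet<Connection>()
    private var nextId = 1

    fun getConnection(): Connection {
        // Reuse an idle connection if there is one, otherwise create a new one.
        val connection = idle.removeFirstOrNull() ?: Connection(nextId++)
        borrowed += connection
        return connection
    }

    fun release(connection: Connection) {
        // Removing the tracking entry is what detects a double release
        // or a connection that never came from this provider.
        if (!borrowed.remove(connection)) {
            throw PoolException(
                "Returned connection $connection was either previously returned " +
                    "or does not belong to this connection provider"
            )
        }
        idle.addLast(connection)
    }
}

fun main() {
    val provider = TrackingConnectionProvider()
    val connection = provider.getConnection()
    provider.release(connection)
    try {
        provider.release(connection) // second release of the same connection
    } catch (e: PoolException) {
        println(e.message)
    }
}
```

In this model the exception says nothing about the Redis server; it only reflects the provider's own bookkeeping, which fits the error surfacing during shutdown rather than during normal traffic.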

CONNECTION FACTORY

/**
 * Establishes a Redis Connection Factory with comprehensive configuration options.
 *
 * Provides configuration for:
 * - Connection pooling and lifecycle management
 * - Cluster and standalone deployment modes
 * - SSL/TLS security for production environments
 * - DNS resolution and caching
 * - Performance tuning (thread pools, buffers, queues)
 * - High availability features (topology refresh, failover)
 *
 * The factory supports different deployment profiles:
 * - Production: Clustered Redis with SSL
 * - Development: Standalone Redis without SSL
 *
 * @property profileProperties Configuration properties for active deployment profile
 * @since 1.0.0
 */
@Configuration
@EnableRedisRepositories(
    enableKeyspaceEvents = RedisKeyValueAdapter.EnableKeyspaceEvents.OFF,
    keyspaceNotificationsConfigParameter = ""
)
internal class RedisConnectionFactoryConfig(
    private val profileProperties: ProfileProperties
) {

    /**
     * Client resources for Redis connections.
     *
     * Manages shared resources including:
     * - Thread pools for I/O and computation
     * - DNS resolution and caching
     * - Command latency metrics
     * - Connection lifecycle
     *
     * This is initialized in [reactiveRedisConnectionFactory] and cleaned up in [cleanup].
     * Using lateinit as the resources are created after Spring context initialization.
     */
    private lateinit var clientResources: ClientResources

    companion object {

        private val logger = LoggerFactory.getLogger(RedisConnectionFactoryConfig::class.java)

        /**
         * Timeout configurations for Redis operations.
         * - Command timeout: Maximum time for command execution
         * - Connect timeout: Maximum time for connection establishment
         * - Topology refresh: Interval for cluster topology updates
         * - Adaptive refresh: Time window for topology change detection
         */
        private const val DEFAULT_COMMAND_TIMEOUT_SECONDS = 10L
        private const val DEFAULT_CONNECT_TIMEOUT_SECONDS = 10L
        private const val TOPOLOGY_REFRESH_PERIOD_SECONDS = 20L
        private const val ADAPTIVE_REFRESH_TIMEOUT_SECONDS = 5L
        private const val SHUTDOWN_TIMEOUT_SECONDS = 2L
        private const val SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS = 1L

        /**
         * Connection pool settings to optimize resource usage.
         * - Max total: Maximum number of connections in the pool
         * - Max/Min idle: Upper/Lower bounds for idle connections
         * - Max wait: Maximum time to wait for connection
         * - Eviction: Maintenance settings for idle connections
         */
        private const val MAX_TOTAL_CONNECTIONS = 100
        private const val MAX_IDLE_CONNECTIONS = 60
        private const val MIN_IDLE_CONNECTIONS = 20
        private const val MAX_WAIT_SECONDS = 120L
        private const val EVICTION_RUN_PERIOD_SECONDS = 120L
        private const val MIN_EVICTABLE_IDLE_MINUTES = 5L

        /**
         * Performance optimization parameters.
         * - Decode buffer ratio: Memory allocation for response buffers
         * - Request queue size: Maximum pending requests
         * - Latency publish interval: Metrics publication frequency
         * - Thread pool multiplier: Scaling factor for I/O threads
         */
        private const val DECODE_BUFFER_POLICY_RATIO = 0.3F
        private const val REQUEST_QUEUE_SIZE = 2500
        private const val COMMAND_LATENCY_PUBLISH_MINUTES = 1L
        private const val IO_THREAD_POOL_MULTIPLIER = 2
    }

    /**
     * Configures Redis keyspace notifications behavior.
     *
     * Returns NO_OP action to disable automatic configuration of keyspace
     * notifications, preventing potential connection issues during shutdown
     * related to Pub/Sub connections in Spring Session.
     *
     * @return ConfigureRedisAction.NO_OP to disable automatic Redis configuration
     */
    @Bean
    fun configureRedisAction(): ConfigureRedisAction {
        return ConfigureRedisAction.NO_OP
    }

    /* LETTUCE - reactive RedisConnectionFactory */
    /**
     * Creates the primary reactive Redis connection factory.
     *
     * Configures a Lettuce-based connection factory with:
     * - Profile-specific Redis deployment mode (clustered/standalone)
     * - Connection pooling
     * - Client resources (thread pools, DNS resolution)
     * - SSL for production environments
     *
     * @param clusterProperties Cluster node configuration
     * @param springDataRedisProperties Redis connection properties
     * @return Configured [ReactiveRedisConnectionFactory]
     */
    @Bean
    @Primary
    internal fun reactiveRedisConnectionFactory(
        clusterProperties: ClusterConfigurationProperties,
        springDataRedisProperties: SpringDataRedisProperties
    ): ReactiveRedisConnectionFactory {
        val config = createRedisConfiguration(springDataRedisProperties, clusterProperties)
        clientResources = createClientResources(springDataRedisProperties.host)
        val clientConfig = createLettuceClientConfig(
            clientResources,
            profileProperties.active ?: ProfileTypes.DEVELOPMENT.type
        )

        return LettuceConnectionFactory(config, clientConfig).apply {
            afterPropertiesSet()
            validateConnection = false
            setShareNativeConnection(true)

            // Pre-warm the connection pool
            repeat(MIN_IDLE_CONNECTIONS) {
                try {
                    this.connection.close()
                } catch (e: Exception) {
                    // Ignore pre-warm exceptions
                }
            }
        }
    }

    /**
     * Performs graceful cleanup of Redis client resources on shutdown.
     *
     * Features:
     * - Allows in-flight operations to complete
     * - Controlled shutdown with timeouts
     * - Resource cleanup verification
     * - Error handling
     *
     * Timeouts:
     * - Quiet period: 1 second with no new tasks before shutdown proceeds
     * - Timeout: shutdown is forced after at most 2 seconds in total
     */
    @PreDestroy
    fun cleanup() {
        try {

            logger.debug("Starting Redis client resources cleanup")

            // Brief pause before cleanup to allow in-flight operations to complete
            Thread.sleep(100)

            // Shutdown client resources with a grace period
            clientResources.shutdown(
                SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS,
                SHUTDOWN_TIMEOUT_SECONDS,
                TimeUnit.SECONDS
            )

            logger.debug("Redis client resources cleanup completed successfully")
        } catch (ex: Exception) {
            logger.debug("Failed to cleanup Redis client resources", ex)
        }
    }

    /**
     * Properties class for Redis cluster configuration.
     *
     * @property nodes List of Redis nodes in format `host:port`
     */
    @Component
    internal class ClusterConfigurationProperties(
        springDataRedisProperties: SpringDataRedisProperties
    ) {
        /**
         * Initial collection of known cluster nodes, in `host:port` format.
         */
        var nodes = listOf(
            "${springDataRedisProperties.host}:${springDataRedisProperties.port}",
        )
    }

    /**
     * Creates appropriate Redis configuration based on active profile.
     *
     * @param properties Redis connection properties
     * @param clusterProperties Cluster node configuration
     * @return [RedisConfiguration] for either clustered or standalone deployment
     */
    private fun createRedisConfiguration(
        properties: SpringDataRedisProperties,
        clusterProperties: ClusterConfigurationProperties
    ): RedisConfiguration = when {
        profileProperties.active == ProfileTypes.PRODUCTION.type &&
                properties.type == RedisConfigTypes.CLUSTERED.type -> {

            // Redis Cluster for production
            RedisClusterConfiguration(clusterProperties.nodes).apply {
                password = RedisPassword.of(properties.password)
            }
        }
        else -> {

            // Redis Standalone for non-production
            RedisStandaloneConfiguration().apply {
                hostName = properties.host
                port = properties.port
                password = RedisPassword.of(properties.password)
            }
        }
    }

    /**
     * Creates client resources with optimized thread pools and DNS resolution.
     *
     * @param host Redis host for DNS resolution
     * @return Configured [ClientResources]
     */
    private fun createClientResources(host: String) = DefaultClientResources.builder()
        .ioThreadPoolSize(Runtime.getRuntime().availableProcessors() * IO_THREAD_POOL_MULTIPLIER)
        .computationThreadPoolSize(Runtime.getRuntime().availableProcessors())
        .socketAddressResolver(createCachingDnsResolver(host))
        .commandLatencyRecorder(DefaultCommandLatencyCollector.disabled())
        .commandLatencyPublisherOptions { Duration.ofMinutes(COMMAND_LATENCY_PUBLISH_MINUTES) }
        .build()

    /**
     * Creates Lettuce client configuration with pooling and security settings.
     *
     * Configures:
     * - Read preferences (replica preferred)
     * - Command timeouts
     * - Connection pooling
     * - SSL (for production)
     *
     * @param clientResources Configured client resources for connection management
     * @param activeProfile Current deployment profile
     * @return Configured [LettucePoolingClientConfiguration]
     */
    private fun createLettuceClientConfig(
        clientResources: ClientResources,
        activeProfile: String
    ): LettucePoolingClientConfiguration {
        val clusterClientOptions = createClusterClientOptions(activeProfile)

        return LettucePoolingClientConfiguration.builder()
            .readFrom(REPLICA_PREFERRED)
            .commandTimeout(Duration.ofSeconds(DEFAULT_COMMAND_TIMEOUT_SECONDS))
            .clientResources(clientResources)
            .clientOptions(clusterClientOptions)
            .poolConfig(buildLettucePoolConfig())
            .shutdownTimeout(Duration.ofSeconds(SHUTDOWN_TIMEOUT_SECONDS))
            .shutdownQuietPeriod(Duration.ofSeconds(SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS))
            // enable SSL only when the active profile is production
            .apply {
                if (activeProfile == ProfileTypes.PRODUCTION.type) {
                    useSsl()
                }
            }
            .build()
    }

    /**
     * Creates cluster client options with comprehensive connection settings.
     *
     * Configures:
     * - Auto-reconnect behavior
     * - Connection validation
     * - Timeout settings
     * - Socket options
     * - Topology refresh
     * - Buffer and queue sizes
     * - SSL (for production)
     *
     * @param activeProfile Current deployment profile
     * @return Configured [ClusterClientOptions]
     */
    private fun createClusterClientOptions(activeProfile: String): ClusterClientOptions {
        val builder = ClusterClientOptions.builder()
            .autoReconnect(true)
            .pingBeforeActivateConnection(true)
            .timeoutOptions(createTimeoutOptions())
            .socketOptions(createSocketOptions())
            .topologyRefreshOptions(createTopologyRefreshOptions())
            .validateClusterNodeMembership(true)
            .suspendReconnectOnProtocolFailure(true)
            .disconnectedBehavior(DEFAULT_DISCONNECTED_BEHAVIOR)
            .decodeBufferPolicy(DecodeBufferPolicies.ratio(DECODE_BUFFER_POLICY_RATIO))
            .requestQueueSize(REQUEST_QUEUE_SIZE)
            .maxRedirects(DEFAULT_MAX_REDIRECTS)
            .suspendReconnectOnProtocolFailure(DEFAULT_SUSPEND_RECONNECT_PROTO_FAIL) // overrides the suspendReconnectOnProtocolFailure(true) call above; the last value set wins
            .publishOnScheduler(true)
            .protocolVersion(ProtocolVersion.RESP3)

        // conditionally use sslOptions if profileProperties.active is 'prod'
        if (activeProfile == ProfileTypes.PRODUCTION.type) {
            builder.sslOptions(createSslOptions())
        }

        return builder.build()
    }

    /**
     * Creates socket options for Redis connections.
     *
     * Configures:
     * - Keep-alive settings
     * - TCP no-delay
     * - Connection timeouts
     *
     * @return Configured [SocketOptions]
     */
    private fun createSocketOptions() = SocketOptions.builder()
        .keepAlive(SocketOptions.DEFAULT_SO_KEEPALIVE)
        .tcpNoDelay(SocketOptions.DEFAULT_SO_NO_DELAY)
        .connectTimeout(Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_SECONDS))
        .build()

    /**
     * Creates timeout options for Redis commands.
     *
     * Configures:
     * - Fixed timeout duration
     * - Command timeout behavior
     *
     * @return Configured [TimeoutOptions]
     */
    private fun createTimeoutOptions() = TimeoutOptions.builder()
        .fixedTimeout(Duration.ofSeconds(DEFAULT_COMMAND_TIMEOUT_SECONDS))
        .timeoutCommands(true)
        .build()

    /**
     * Creates topology refresh options for Redis cluster.
     *
     * Configures:
     * - Periodic refresh intervals
     * - Dynamic refresh sources
     * - Stale connection handling
     * - Adaptive refresh triggers
     *
     * @return Configured [ClusterTopologyRefreshOptions]
     */
    private fun createTopologyRefreshOptions() = ClusterTopologyRefreshOptions.builder()
        .enablePeriodicRefresh(Duration.ofSeconds(TOPOLOGY_REFRESH_PERIOD_SECONDS))
        .dynamicRefreshSources(true)
        .closeStaleConnections(true)
        .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(ADAPTIVE_REFRESH_TIMEOUT_SECONDS))
        .enableAllAdaptiveRefreshTriggers()
        .build()

    /**
     * Creates connection pool configuration.
     *
     * Configures:
     * - Maximum total/idle connections
     * - Connection wait times
     * - Eviction policies
     * - Connection testing
     * - Pool behavior (LIFO/FIFO)
     *
     * @return Configured [GenericObjectPoolConfig]
     */
    private fun buildLettucePoolConfig() = GenericObjectPoolConfig<Any>().apply {
        maxTotal = MAX_TOTAL_CONNECTIONS
        maxIdle = MAX_IDLE_CONNECTIONS
        minIdle = MIN_IDLE_CONNECTIONS
        setMaxWait(Duration.ofSeconds(MAX_WAIT_SECONDS))
        timeBetweenEvictionRuns = Duration.ofSeconds(EVICTION_RUN_PERIOD_SECONDS)
        minEvictableIdleTime = Duration.ofMinutes(MIN_EVICTABLE_IDLE_MINUTES)
        testOnBorrow = true
        testWhileIdle = true
        testOnReturn = true
        blockWhenExhausted = true
        lifo = true
        jmxEnabled = false
        fairness = true
        evictionPolicyClassName = "org.apache.commons.pool2.impl.DefaultEvictionPolicy"
        numTestsPerEvictionRun = 3
    }

    /**
     * Creates SSL options for secure Redis connections.
     *
     * Configures JDK-based SSL provider for Redis connections
     * in production environments.
     *
     * @return Configured [SslOptions]
     */
    private fun createSslOptions(): SslOptions = SslOptions.builder()
        .jdkSslProvider()
        .build()

    /**
     * Creates DNS resolver with caching capabilities.
     *
     * Implements:
     * - DNS resolution caching
     * - Hostname-to-IP mapping
     * - Fallback handling for resolution failures
     *
     * @param host Redis host to resolve
     * @return Configured [MappingSocketAddressResolver]
     */
    private fun createCachingDnsResolver(host: String): MappingSocketAddressResolver {
        val dnsCache = ConcurrentHashMap<String, Array<InetAddress>>()

        val mappingFunction: (HostAndPort) -> HostAndPort = { hostAndPort ->
            val addresses = dnsCache.computeIfAbsent(host) {
                try {
                    DnsResolvers.JVM_DEFAULT.resolve(host)
                } catch (e: UnknownHostException) {
                    emptyArray()
                }
            }

            val cacheIP = addresses.firstOrNull()?.hostAddress
            if (hostAndPort.hostText == cacheIP) {
                HostAndPort.of(host, hostAndPort.port)
            } else {
                hostAndPort
            }
        }

        return MappingSocketAddressResolver.create(DnsResolvers.JVM_DEFAULT, mappingFunction)
    }

}
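The `configureRedisAction()` bean above returns `ConfigureRedisAction.NO_OP`. My understanding from the Spring Session documentation is that the default action only makes sure the server's `notify-keyspace-events` setting contains the flags the indexed repository relies on (the docs cite `Egx`), issuing `CONFIG SET` if they are missing; `NO_OP` skips that one step. It does not stop the indexed session repository from subscribing to its event channels, which is consistent with the `psubscribe` error still appearing. Likewise, `enableKeyspaceEvents = OFF` on `@EnableRedisRepositories` applies to Spring Data Redis repositories, not to Spring Session. The Kotlin sketch below models only the flag-merging step under that assumption; the function names are hypothetical.

```kotlin
// Hypothetical sketch of the flag-merging step the default (non-NO_OP)
// action performs before running CONFIG SET notify-keyspace-events.
fun requiredNotifyKeyspaceEvents(current: String): String {
    var flags = current
    // 'E': publish keyevent notifications (__keyevent@<db>__:<event>)
    if ('E' !in flags) flags += "E"
    // 'A' is Redis's alias for most event classes, including 'g' and 'x'
    val all = 'A' in flags
    // 'g': generic commands such as DEL
    if (!all && 'g' !in flags) flags += "g"
    // 'x': expired events
    if (!all && 'x' !in flags) flags += "x"
    return flags
}

// CONFIG SET is only needed when the merged flags differ from the current value.
fun needsConfigSet(current: String): Boolean = requiredNotifyKeyspaceEvents(current) != current

fun main() {
    for (current in listOf("", "KA", "Egx")) {
        println("'$current' -> '${requiredNotifyKeyspaceEvents(current)}' (CONFIG SET needed: ${needsConfigSet(current)})")
    }
}
```

Under this reading, `NO_OP` avoids a `CONFIG` command the managed service may not allow, but the channel subscriptions are a separate concern.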
@dreamstar-enterprises dreamstar-enterprises added status: waiting-for-triage An issue we've not yet triaged type: bug A general bug labels Dec 16, 2024