From 660a0a7d6aada9f4d973cc46d84569eb8ebccebe Mon Sep 17 00:00:00 2001 From: Fran Garcia Date: Tue, 20 Apr 2021 13:50:27 +0100 Subject: [PATCH] Support reading from replicas on pipeline executions --- rediscluster/client.py | 90 ++++++++++++++++++++-------------------- rediscluster/pipeline.py | 4 +- 2 files changed, 47 insertions(+), 47 deletions(-) diff --git a/rediscluster/client.py b/rediscluster/client.py index 6f9dd986..58e6d1a6 100644 --- a/rediscluster/client.py +++ b/rediscluster/client.py @@ -53,6 +53,50 @@ TimeoutError, ) +# Not complete, but covers the major ones +# https://redis.io/commands +READ_COMMANDS = frozenset([ + "BITCOUNT", + "BITPOS", + "EXISTS", + "GEODIST", + "GEOHASH", + "GEOPOS", + "GEORADIUS", + "GEORADIUSBYMEMBER", + "GET", + "GETBIT", + "GETRANGE", + "HEXISTS", + "HGET", + "HGETALL", + "HKEYS", + "HLEN", + "HMGET", + "HSTRLEN", + "HVALS", + "KEYS", + "LINDEX", + "LLEN", + "LRANGE", + "MGET", + "PTTL", + "RANDOMKEY", + "SCARD", + "SDIFF", + "SINTER", + "SISMEMBER", + "SMEMBERS", + "SRANDMEMBER", + "STRLEN", + "SUNION", + "TTL", + "ZCARD", + "ZCOUNT", + "ZRANGE", + "ZSCORE", +]) + log = logging.getLogger(__name__) @@ -168,50 +212,6 @@ class RedisCluster(Redis): ], 'slot-id'), ) - # Not complete, but covers the major ones - # https://redis.io/commands - READ_COMMANDS = [ - "BITCOUNT", - "BITPOS", - "EXISTS", - "GEODIST", - "GEOHASH", - "GEOPOS", - "GEORADIUS", - "GEORADIUSBYMEMBER", - "GET", - "GETBIT", - "GETRANGE", - "HEXISTS", - "HGET", - "HGETALL", - "HKEYS", - "HLEN", - "HMGET", - "HSTRLEN", - "HVALS", - "KEYS", - "LINDEX", - "LLEN", - "LRANGE", - "MGET", - "PTTL", - "RANDOMKEY", - "SCARD", - "SDIFF", - "SINTER", - "SISMEMBER", - "SMEMBERS", - "SRANDMEMBER", - "STRLEN", - "SUNION", - "TTL", - "ZCARD", - "ZCOUNT", - "ZRANGE", - "ZSCORE", - ] - RESULT_CALLBACKS = dict_merge( string_keys_to_dict([ "BGREWRITEAOF", @@ -608,7 +608,7 @@ def _execute_command(self, *args, **kwargs): else: node = self.connection_pool.get_node_by_slot( slot, - self.read_from_replicas and (command in self.READ_COMMANDS) + self.read_from_replicas and (command in READ_COMMANDS) ) is_read_replica = node['server_type'] == 'slave' diff --git a/rediscluster/pipeline.py b/rediscluster/pipeline.py index 3671737a..3eae7a39 100644 --- a/rediscluster/pipeline.py +++ b/rediscluster/pipeline.py @@ -4,7 +4,7 @@ import sys # rediscluster imports -from .client import RedisCluster +from .client import RedisCluster, READ_COMMANDS from .exceptions import ( RedisClusterException, AskError, MovedError, TryAgainError, ClusterDownError, ) @@ -194,7 +194,7 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections= # refer to our internal node -> slot table that tells us where a given # command should route to. slot = self._determine_slot(*c.args) - node = self.connection_pool.get_node_by_slot(slot) + node = self.connection_pool.get_node_by_slot(slot, self.read_from_replicas and c.args[0] in READ_COMMANDS) # little hack to make sure the node name is populated. probably could clean this up. self.connection_pool.nodes.set_node_name(node)