Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rediscluster support #573

Merged
merged 28 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
7492d69
Add redisc cluster client support
Mar 8, 2021
e9b646a
Add integration tests for ClusterRedisClient
Mar 8, 2021
6050bc3
remove unused import
Mar 8, 2021
8d5d40b
remove unneeded comment
Mar 8, 2021
cb95feb
remove unused import
Mar 8, 2021
9655d17
disable pylint warning
Mar 8, 2021
6214d10
Update baseplate/clients/redis.py
FranGM Mar 8, 2021
78dc569
Update baseplate/clients/redis.py
FranGM Mar 8, 2021
1765def
address comments
Mar 10, 2021
ffdecf9
set version
Mar 10, 2021
9570abf
Update baseplate/clients/redis_cluster.py
FranGM Mar 10, 2021
df35f03
address comments (take 2)
Mar 11, 2021
94914ae
When read_from_replicas is enabled also read from master
Mar 11, 2021
191ed5d
Return only one node on get_node_by_slot
FranGM Mar 30, 2021
c78dcf2
fix documentation lint issues
FranGM Mar 30, 2021
8db95aa
change some client defaults
FranGM Mar 30, 2021
024adff
add more tests
FranGM Mar 30, 2021
b904a82
Remove unused metric
Apr 6, 2021
aeb416d
pass read_from_replicas arg when creating pipeline
FranGM Apr 20, 2021
2c099df
bump redis-py-cluster version
FranGM Apr 20, 2021
1ee4bdc
Add hot key tracker to the cluster redis client
Apr 29, 2021
35839ad
Merge branch 'rediscluster-support' of github.com:FranGM/baseplate.py…
Apr 29, 2021
4255cf2
Allow to configure max_connections_per_node
FranGM May 2, 2021
972f82d
Update to current redis-py-cluster version
FranGM May 18, 2021
96054f4
Update requirements-transitive.txt
FranGM May 18, 2021
c35f976
Add hot key tracking to docs
May 19, 2021
44ecda5
Update setup.py
FranGM May 19, 2021
99b4ffd
Update docs/api/baseplate/clients/redis_cluster.rst
spladug May 19, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions baseplate/clients/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Optional

import redis
import rediscluster
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would make anyone using the current redis client suddenly have to install redis-py-cluster as well just to continue using the other client. Can you move the new cluster stuff into its own module in the clients folder? Like baseplate/clients/redis_cluster.py perhaps?


# redis.client.StrictPipeline was renamed to redis.client.Pipeline in version 3.0
try:
Expand Down Expand Up @@ -253,3 +254,170 @@ def close(self) -> None:
and dequeue as the actions will recreate the queue)
"""
self.client.delete(self.queue)


def cluster_pool_from_config(
    app_config: config.RawConfig, prefix: str = "rediscluster.", **kwargs: Any
) -> rediscluster.ClusterConnectionPool:
    """Make a ClusterConnectionPool from a configuration dictionary.

    The keys useful to :py:func:`cluster_pool_from_config` should be prefixed,
    e.g. ``rediscluster.url``, ``rediscluster.max_connections``, etc. The
    ``prefix`` argument specifies the prefix used to filter keys. Each key is
    mapped to a corresponding keyword argument on the
    :py:class:`rediscluster.ClusterConnectionPool` constructor.

    Supported keys:

    * ``url`` (required): a URL like ``redis://localhost/0``.
    * ``startup_nodes`` (required): comma-separated ``host:port`` endpoints
      used to discover the cluster topology.
    * ``max_connections``: an integer maximum number of connections in the pool
    * ``timeout``: how long to wait for sockets to connect. e.g.
      ``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`)

    :param app_config: the raw application configuration dictionary.
    :param prefix: dotted prefix selecting this client's keys; must end in ".".
    :param kwargs: extra keyword arguments forwarded to the pool constructor;
        explicit kwargs take precedence over values from ``app_config``.
    """
    assert prefix.endswith(".")

    parser = config.SpecParser(
        {
            # BUG FIX: ``url`` is consumed below via ``options.url`` but was
            # never declared in the spec, so it could never be parsed and the
            # final ``from_url`` call would fail with an AttributeError.
            "url": config.String,
            "startup_nodes": config.TupleOf(config.Endpoint),
            "max_connections": config.Optional(config.Integer, default=None),
            "timeout": config.Optional(config.Timespan, default=None),
        }
    )

    options = parser.parse(prefix[:-1], app_config)

    # rediscluster expects startup nodes as a list of {"host", "port"} dicts.
    nodes = [
        {"host": endpoint.address.host, "port": endpoint.address.port}
        for endpoint in options.startup_nodes
    ]
    kwargs["startup_nodes"] = nodes

    # setdefault so that kwargs passed by the caller win over config values.
    if options.max_connections is not None:
        kwargs.setdefault("max_connections", options.max_connections)
    if options.timeout is not None:
        kwargs.setdefault("timeout", options.timeout.total_seconds())

    return rediscluster.ClusterBlockingConnectionPool.from_url(options.url, **kwargs)


class ClusterRedisClient(config.Parser):
    """Configure a clustered Redis client.

    This is meant to be used with
    :py:meth:`baseplate.Baseplate.configure_context`.

    See :py:func:`cluster_pool_from_config` for available configuration settings.

    """

    def __init__(self, **kwargs: Any):
        # Extra keyword arguments are forwarded verbatim to
        # cluster_pool_from_config when the configuration is parsed.
        self.kwargs = kwargs

    def parse(self, key_path: str, raw_config: config.RawConfig) -> "ClusterRedisContextFactory":
        pool = cluster_pool_from_config(raw_config, prefix=f"{key_path}.", **self.kwargs)
        return ClusterRedisContextFactory(pool)


class ClusterRedisContextFactory(ContextFactory):
    """Cluster Redis client context factory.

    This factory will attach a
    :py:class:`~baseplate.clients.redis.MonitoredClusterRedisConnection` to an
    attribute on the :py:class:`~baseplate.RequestContext`. When Redis commands
    are executed via this connection object, they will use connections from the
    provided :py:class:`rediscluster.ClusterConnectionPool` and automatically
    record diagnostic information.

    :param connection_pool: A connection pool.
    """

    def __init__(self, connection_pool: rediscluster.ClusterConnectionPool):
        self.connection_pool = connection_pool

    def report_runtime_metrics(self, batch: metrics.Client) -> None:
        pool = self.connection_pool
        # Only blocking pools expose the sizing information reported below.
        if not isinstance(pool, rediscluster.ClusterBlockingConnectionPool):
            return

        size = pool.max_connections
        open_connections = len(pool._connections)  # type: ignore
        in_use = size - pool.pool.qsize()

        batch.gauge("pool.size").replace(size)
        batch.gauge("pool.in_use").replace(in_use)
        batch.gauge("pool.open_and_available").replace(open_connections - in_use)

    def make_object_for_context(self, name: str, span: Span) -> "MonitoredClusterRedisConnection":
        return MonitoredClusterRedisConnection(name, span, self.connection_pool)


class MonitoredClusterRedisConnection(rediscluster.RedisCluster):
    """Cluster Redis connection that collects diagnostic information.

    This connection acts like :py:class:`rediscluster.Redis` except that all
    operations are automatically wrapped with diagnostic collection.

    The interface is the same as that class except for the
    :py:meth:`~baseplate.clients.redis.MonitoredClusterRedisConnection.pipeline`
    method.

    """

    def __init__(
        self,
        context_name: str,
        server_span: Span,
        connection_pool: rediscluster.ClusterConnectionPool,
    ):
        self.context_name = context_name
        self.server_span = server_span
        super().__init__(connection_pool=connection_pool)

    def execute_command(self, *args: Any, **kwargs: Any) -> Any:
        # The first positional argument is always the Redis command name;
        # it becomes part of the span name for per-command diagnostics.
        command_name = args[0]
        with self.server_span.make_child(f"{self.context_name}.{command_name}"):
            return super().execute_command(command_name, *args[1:], **kwargs)

    # pylint: disable=arguments-differ
    def pipeline(self, name: str) -> "MonitoredClusterRedisPipeline":
        """Create a pipeline.

        This returns an object on which you can call the standard Redis
        commands. Execution will be deferred until ``execute`` is called. This
        is useful for saving round trips even in a clustered environment.

        :param name: The name to attach to diagnostics for this pipeline.

        """
        trace_name = f"{self.context_name}.pipeline_{name}"
        return MonitoredClusterRedisPipeline(
            trace_name,
            self.server_span,
            self.connection_pool,
            self.response_callbacks,
        )

    # No transaction support in redis-py-cluster
    def transaction(self, *args: Any, **kwargs: Any) -> Any:
        """Not currently implemented."""
        raise NotImplementedError


class MonitoredClusterRedisPipeline(Pipeline):
    """Cluster Redis pipeline that wraps execution in a diagnostic span."""

    def __init__(
        self,
        trace_name: str,
        server_span: Span,
        connection_pool: rediscluster.ClusterConnectionPool,
        response_callbacks: Dict,
        **kwargs: Any,
    ):
        self.trace_name = trace_name
        self.server_span = server_span
        super().__init__(connection_pool, response_callbacks, **kwargs)

    # pylint: disable=arguments-differ
    def execute(self, **kwargs: Any) -> Any:  # type: ignore
        """Execute the buffered commands inside a child diagnostic span."""
        child_span = self.server_span.make_child(self.trace_name)
        with child_span:
            return super().execute(**kwargs)
29 changes: 29 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ services:
- "memcached"
- "redis"
- "zookeeper"
- "redis-cluster-node-0"
- "redis-cluster-node-1"
- "redis-cluster-node-2"

cassandra:
image: "cassandra:3.11"
Expand All @@ -25,3 +28,29 @@ services:
image: "redis:4.0.9"
zookeeper:
image: "zookeeper:3.4.10"
redis-cluster-node-0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow, this is a lot of stuff. do we actually need a three node cluster to test this out? or can we mock it more simply?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One idea would be to use

grokzen/redis-cluster

which gives us a Redis cluster with a single Docker dependency. It is not recommended for production use but perfect for testing environments. I will say, Redis cluster has idiosyncrasies that are worth having tests run against a real instance but that depends entirely on the tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're not actually implementing a full redis cluster client here, thankfully, but just wrapping an existing one. do you think we need to have behavioral tests here in baseplate beyond ensuring that we generate spans?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There were a few cases where I was thinking more testing would be useful but ultimately any test I could think of would just end up testing the underlying redis-py-cluster library, so didn't add anything beyond what the redis client already has (I did end up using the redis-cluster container for the integration tests though, it was also pretty handy for manual testing)

image: docker.io/bitnami/redis-cluster:6.2-debian-10
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=redis-cluster-node-0 redis-cluster-node-1 redis-cluster-node-2'
redis-cluster-node-1:
image: docker.io/bitnami/redis-cluster:6.2-debian-10
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=redis-cluster-node-0 redis-cluster-node-1 redis-cluster-node-2'
redis-cluster-node-2:
image: docker.io/bitnami/redis-cluster:6.2-debian-10
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=redis-cluster-node-0 redis-cluster-node-1 redis-cluster-node-2'
redis-cluster-init:
image: docker.io/bitnami/redis-cluster:6.2-debian-10
depends_on:
- redis-cluster-node-0
- redis-cluster-node-1
- redis-cluster-node-2
environment:
- 'REDISCLI_AUTH='
- 'REDIS_CLUSTER_REPLICAS=1'
- 'REDIS_NODES=redis-cluster-node-0 redis-cluster-node-1 redis-cluster-node-2'
- 'REDIS_CLUSTER_CREATOR=yes'
1 change: 1 addition & 0 deletions requirements-transitive.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pyramid==1.10.5
python-json-logger==2.0.1
reddit-cqlmapper==0.3.0
redis==3.5.3
redis-py-cluster==2.1.0
regex==2020.11.13
requests==2.25.1
sentry-sdk==0.20.1
Expand Down
62 changes: 61 additions & 1 deletion tests/integration/redis_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from baseplate.clients.redis import RedisClient
from baseplate import Baseplate

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gasp!

from . import TestBaseplateObserver, get_endpoint_or_skip_container

from baseplate.clients.redis import MessageQueue
Expand Down Expand Up @@ -79,6 +78,67 @@ def test_pipeline(self):
self.assertIsNone(span_observer.on_finish_exc_info)


class RedisClusterIntegrationTests(unittest.TestCase):
    """Integration tests for the clustered Redis client's span diagnostics."""

    def setUp(self):
        # Local import keeps this edit self-contained; move to the module's
        # import block alongside RedisClient if preferred.
        from baseplate.clients.redis import ClusterRedisClient

        self.baseplate_observer = TestBaseplateObserver()

        baseplate = Baseplate({"rediscluster.url": f"redis://{redis_endpoint}/0"})
        baseplate.register(self.baseplate_observer)
        # BUG FIX: this suite exercises the cluster client, but previously
        # wired up the plain RedisClient, so the cluster code path (and its
        # pipeline/span behavior) was never actually under test.
        baseplate.configure_context({"rediscluster": ClusterRedisClient()})

        self.context = baseplate.make_context_object()
        self.server_span = baseplate.make_server_span(self.context, "test")

    def test_simple_command(self):
        with self.server_span:
            result = self.context.rediscluster.ping()

        self.assertTrue(result)

        server_span_observer = self.baseplate_observer.get_only_child()
        span_observer = server_span_observer.get_only_child()
        self.assertEqual(span_observer.span.name, "rediscluster.PING")
        self.assertTrue(span_observer.on_start_called)
        self.assertTrue(span_observer.on_finish_called)
        self.assertIsNone(span_observer.on_finish_exc_info)

    def test_error(self):
        with self.server_span:
            with self.assertRaises(redis.ResponseError):
                self.context.rediscluster.execute_command("crazycommand")

        server_span_observer = self.baseplate_observer.get_only_child()
        span_observer = server_span_observer.get_only_child()
        self.assertTrue(span_observer.on_start_called)
        self.assertTrue(span_observer.on_finish_called)
        self.assertIsNotNone(span_observer.on_finish_exc_info)

    def test_lock(self):
        with self.server_span:
            with self.context.rediscluster.lock("foo"):
                pass

        server_span_observer = self.baseplate_observer.get_only_child()

        # Locking issues several commands; assert each produced a full span.
        self.assertGreater(len(server_span_observer.children), 0)
        for span_observer in server_span_observer.children:
            self.assertTrue(span_observer.on_start_called)
            self.assertTrue(span_observer.on_finish_called)

    def test_pipeline(self):
        with self.server_span:
            with self.context.rediscluster.pipeline("foo") as pipeline:
                pipeline.ping()
                pipeline.execute()

        server_span_observer = self.baseplate_observer.get_only_child()
        span_observer = server_span_observer.get_only_child()
        self.assertEqual(span_observer.span.name, "rediscluster.pipeline_foo")
        self.assertTrue(span_observer.on_start_called)
        self.assertTrue(span_observer.on_finish_called)
        self.assertIsNone(span_observer.on_finish_exc_info)


class RedisMessageQueueTests(unittest.TestCase):
qname = "redisTestQueue"

Expand Down