
Add rediscluster support #573

Merged · 28 commits · May 19, 2021
7492d69
Add redisc cluster client support
Mar 8, 2021
e9b646a
Add integration tests for ClusterRedisClient
Mar 8, 2021
6050bc3
remove unused import
Mar 8, 2021
8d5d40b
remove unnneeded comment
Mar 8, 2021
cb95feb
remove unused import
Mar 8, 2021
9655d17
disable pylint warning
Mar 8, 2021
6214d10
Update baseplate/clients/redis.py
FranGM Mar 8, 2021
78dc569
Update baseplate/clients/redis.py
FranGM Mar 8, 2021
1765def
address comments
Mar 10, 2021
ffdecf9
set version
Mar 10, 2021
9570abf
Update baseplate/clients/redis_cluster.py
FranGM Mar 10, 2021
df35f03
address comments (take 2)
Mar 11, 2021
94914ae
When read_from_replicas is enabled also read from master
Mar 11, 2021
191ed5d
Return only one node on get_node_by_slot
FranGM Mar 30, 2021
c78dcf2
fix documentation lint issues
FranGM Mar 30, 2021
8db95aa
change some client defaults
FranGM Mar 30, 2021
024adff
add more tests
FranGM Mar 30, 2021
b904a82
Remove unused metric
Apr 6, 2021
aeb416d
pass read_from_replicas arg when creating pipeline
FranGM Apr 20, 2021
2c099df
bump redis-py-cluster version
FranGM Apr 20, 2021
1ee4bdc
Add hot key tracker to the cluster reddis client
Apr 29, 2021
35839ad
Merge branch 'rediscluster-support' of github.com:FranGM/baseplate.py…
Apr 29, 2021
4255cf2
Allow to configure max_connections_per_node
FranGM May 2, 2021
972f82d
Update to current redis-py-cluster version
FranGM May 18, 2021
96054f4
Update requirements-transitive.txt
FranGM May 18, 2021
c35f976
Add hot key tracking to docs
May 19, 2021
44ecda5
Update setup.py
FranGM May 19, 2021
99b4ffd
Update docs/api/baseplate/clients/redis_cluster.rst
spladug May 19, 2021
450 changes: 450 additions & 0 deletions baseplate/clients/redis_cluster.py

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion docker-compose.yml
@@ -13,7 +13,7 @@ services:
- "memcached"
- "redis"
- "zookeeper"

- "redis-cluster-node"
cassandra:
image: "cassandra:3.11"
environment:
@@ -25,3 +25,5 @@ services:
image: "redis:4.0.9"
zookeeper:
image: "zookeeper:3.4.10"
redis-cluster-node:
image: docker.io/grokzen/redis-cluster:6.2.0
1 change: 1 addition & 0 deletions docs/api/baseplate/clients/index.rst
@@ -13,6 +13,7 @@ Instrumented Client Libraries
baseplate.clients.kombu: Client for publishing to queues <kombu>
baseplate.clients.memcache: Memcached Client <memcache>
baseplate.clients.redis: Redis Client <redis>
baseplate.clients.redis_cluster: Redis Cluster Client <redis_cluster>
baseplate.clients.requests: Requests (HTTP) Client <requests>
baseplate.clients.sqlalchemy: SQL Client for relational databases (e.g. PostgreSQL) <sqlalchemy>
baseplate.clients.thrift: Thrift client for RPC to other backend services <thrift>
145 changes: 145 additions & 0 deletions docs/api/baseplate/clients/redis_cluster.rst
@@ -0,0 +1,145 @@
``baseplate.clients.redis_cluster``
===================================

.. Contributor review comment on this hunk: should the hot key tracking
   stuff be mentioned somewhere in this page?

`Redis`_ is an in-memory data structure store used where speed is necessary but
complexity is beyond simple key-value operations. (If you're just doing
caching, prefer :doc:`memcached <memcache>`). `Redis-py-cluster`_ is a Python
client library that supports interacting with Redis when operating in cluster mode.

.. _`Redis`: https://redis.io/
.. _`redis-py-cluster`: https://github.com/Grokzen/redis-py-cluster

.. automodule:: baseplate.clients.redis_cluster

.. versionadded:: 2.1

Example
-------

To integrate redis-py-cluster with your application, add the appropriate client
declaration to your context configuration::

baseplate.configure_context(
app_config,
{
...
"foo": ClusterRedisClient(),
...
}
)

and configure it in your application's configuration file:

.. code-block:: ini

[app:main]

...


# required: what redis instance to connect to
foo.url = redis://localhost:6379/0

# optional: the maximum size of the connection pool
foo.max_connections = 99

# optional: how long to wait for a connection to establish
foo.timeout = 3 seconds

# optional: whether read requests should be directed to replicas
# as well as the primary
foo.read_from_replicas = true
...


and then use the attached :py:class:`~redis.Redis`-like object in
request::

def my_method(request):
request.foo.ping()
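Pipelines work the same way through the attached object; here is a short sketch based on the pipeline usage in this PR's integration tests (the method name, pipeline name, and keys are illustrative):

```python
def my_batch_method(request):
    # Batch several commands and execute them in one round trip; the string
    # argument ("bar" here) names the pipeline span for tracing.
    with request.foo.pipeline("bar") as pipeline:
        pipeline.set("key", "value")
        pipeline.get("key")
        pipeline.delete("key")
        return pipeline.execute()
```

Each queued command is traced as part of the pipeline span, as shown by the `rediscluster.pipeline_foo` span name asserted in the tests below.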

Configuration
-------------

.. autoclass:: ClusterRedisClient

.. autofunction:: cluster_pool_from_config

Classes
-------

.. autoclass:: ClusterRedisContextFactory
:members:

.. autoclass:: MonitoredClusterRedisConnection
:members:

Runtime Metrics
---------------

In addition to request-level metrics reported through spans, this wrapper
reports connection pool statistics periodically via the :ref:`runtime-metrics`
system. All metrics are tagged with ``client``, the name given to
:py:meth:`~baseplate.Baseplate.configure_context` when registering this context
factory.

The following metrics are reported:

``runtime.pool.size``
The size limit for the connection pool.
``runtime.pool.in_use``
How many connections have been established and are currently checked out and
being used.


Hot Key Tracking
----------------

Optionally, the client can help track key usage across the Redis cluster to
help you identify if you have "hot" keys (keys that are read from or
written to much more frequently than other keys). This is particularly useful
in clusters with the `noeviction` eviction policy, since Redis
lacks a built-in mechanism to help you track hot keys in this case.

Since tracking every single key used is expensive, the tracker works by
sampling a small percentage of reads and/or writes, which can be configured
on your client:

.. code-block:: ini

[app:main]

...
# Note that by default the sample rate will be zero for both reads and writes

# optional: Sample keys for 1% of read operations
foo.track_key_reads_sample_rate = 0.01

# optional: Sample keys for 10% of write operations
foo.track_key_writes_sample_rate = 0.1

...
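Conceptually, the sampling is a per-command coin flip. A minimal sketch of the idea (an illustration only, not baseplate's actual implementation; `should_track` is a hypothetical helper name):

```python
import random

def should_track(sample_rate: float) -> bool:
    # Track roughly `sample_rate` fraction of commands.
    return random.random() < sample_rate

# With a 1% read sample rate, about 1 in 100 reads would get its key
# recorded in the tracker set.
random.seed(0)
sampled = sum(should_track(0.01) for _ in range(10_000))
```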

The keys tracked will be written to a sorted set in the Redis cluster itself,
which we can query at any time to see what keys are read from or written to
more often than others. Keys used for writes will be stored in
`baseplate-hot-key-tracker-writes` and keys used for reads will be stored in
`baseplate-hot-key-tracker-reads`. Here's an example of how you can query the
top 10 keys in each category with their associated scores:

.. code-block:: console

> ZREVRANGEBYSCORE baseplate-hot-key-tracker-reads +inf 0 WITHSCORES LIMIT 0 10

> ZREVRANGEBYSCORE baseplate-hot-key-tracker-writes +inf 0 WITHSCORES LIMIT 0 10


Note that, due to how the sampling works, the scores are only meaningful in a
relative sense (comparing one key's access frequency to others in the list);
they can't be used to infer absolute key access rates.

Both tracker sets have a default TTL of 24 hours, so once they stop being
written to (for instance, if key tracking is disabled) they will clean up
after themselves in 24 hours, allowing us to start fresh the next time we
want to enable key tracking.
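The same queries can be issued from Python. A sketch, assuming `client` is any redis-py-compatible client connected to the cluster (the helper name is illustrative; the tracker set names come from the docs above):

```python
def top_hot_keys(client, tracker_set="baseplate-hot-key-tracker-reads", n=10):
    # Highest-scored (most frequently sampled) keys first, with scores.
    return client.zrevrangebyscore(
        tracker_set, "+inf", 0, start=0, num=n, withscores=True
    )
```

Pass `tracker_set="baseplate-hot-key-tracker-writes"` to inspect write keys instead.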
1 change: 1 addition & 0 deletions requirements-transitive.txt
@@ -56,6 +56,7 @@ pyramid==1.10.5
python-json-logger==2.0.1
reddit-cqlmapper==0.3.0
redis==3.5.3
redis-py-cluster==2.1.2
regex==2020.11.13
requests==2.25.1
sentry-sdk==0.20.1
1 change: 1 addition & 0 deletions requirements.txt
@@ -15,3 +15,4 @@ sphinx-autodoc-typehints==1.11.1
sphinxcontrib-spelling==7.1.0
webtest==2.0.35
wheel==0.36.2
fakeredis==1.5.0
3 changes: 3 additions & 0 deletions setup.cfg
@@ -102,6 +102,9 @@ ignore_missing_imports = True
[mypy-pythonjsonlogger.*]
ignore_missing_imports = True

[mypy-rediscluster.*]
ignore_missing_imports = True

[mypy-sqlalchemy.*]
ignore_missing_imports = True

1 change: 1 addition & 0 deletions setup.py
@@ -12,6 +12,7 @@
"memcache": ["pymemcache>=1.3.0,<1.4.4"],
"pyramid": ["pyramid>=1.9.0,<2.0"],
"redis": ["redis>=2.10.0,<4.0.0"],
"redis-py-cluster": ["redis-py-cluster>=2.0.0,<3.0.0"],
"refcycle": ["objgraph>=3.0,<4.0"],
"requests": ["advocate>=1.0.0,<2.0"],
"sentry": ["sentry-sdk>=0.19,<1.0"],
158 changes: 158 additions & 0 deletions tests/integration/redis_cluster_tests.py
@@ -0,0 +1,158 @@
import unittest

try:
import rediscluster
except ImportError:
raise unittest.SkipTest("redis-py-cluster is not installed")

from baseplate.lib.config import ConfigurationError
from baseplate.clients.redis_cluster import cluster_pool_from_config

from baseplate.clients.redis_cluster import ClusterRedisClient
from baseplate import Baseplate
from . import TestBaseplateObserver, get_endpoint_or_skip_container

redis_endpoint = get_endpoint_or_skip_container("redis-cluster-node", 7000)


# This belongs in the unit tests section, but the client class attempts to
# initialise the list of nodes when instantiated, so it's simpler to test here
# with a redis cluster available.
class ClusterPoolFromConfigTests(unittest.TestCase):
def test_empty_config(self):
with self.assertRaises(ConfigurationError):
cluster_pool_from_config({})

def test_basic_url(self):
pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})

self.assertEqual(pool.nodes.startup_nodes[0]["host"], "redis-cluster-node")
self.assertEqual(pool.nodes.startup_nodes[0]["port"], "7000")

def test_timeouts(self):
pool = cluster_pool_from_config(
{
"rediscluster.url": f"redis://{redis_endpoint}/0",
"rediscluster.timeout": "30 seconds",
}
)

self.assertEqual(pool.timeout, 30)

def test_max_connections(self):
pool = cluster_pool_from_config(
{
"rediscluster.url": f"redis://{redis_endpoint}/0",
"rediscluster.max_connections": "300",
}
)

self.assertEqual(pool.max_connections, 300)

def test_max_connections_default(self):
# https://github.com/Grokzen/redis-py-cluster/issues/435
pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})

self.assertEqual(pool.max_connections, 50)

def test_kwargs_passthrough(self):
pool = cluster_pool_from_config(
{"rediscluster.url": f"redis://{redis_endpoint}/0"}, example="present"
)

self.assertEqual(pool.connection_kwargs["example"], "present")

def test_alternate_prefix(self):
pool = cluster_pool_from_config(
{"noodle.url": f"redis://{redis_endpoint}/0"}, prefix="noodle."
)
self.assertEqual(pool.nodes.startup_nodes[0]["host"], "redis-cluster-node")
self.assertEqual(pool.nodes.startup_nodes[0]["port"], "7000")

def test_only_primary_available(self):
pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})
node_list = [pool.get_node_by_slot(slot=1, read_command=False) for _ in range(0, 100)]

# The primary is on port 7000 so that's the only port we expect to see
self.assertTrue(all(node["port"] == 7000 for node in node_list))

def test_read_from_replicas(self):
pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})

node_list = [pool.get_node_by_slot(slot=1, read_command=True) for _ in range(0, 100)]

# Both replicas and primary are available, so we expect to see some non-primaries here
self.assertTrue(any(node["port"] != 7000 for node in node_list))


class RedisClusterIntegrationTests(unittest.TestCase):
def setUp(self):
self.baseplate_observer = TestBaseplateObserver()

baseplate = Baseplate(
{
"rediscluster.url": f"redis://{redis_endpoint}/0",
"rediscluster.timeout": "1 second",
"rediscluster.max_connections": "4",
}
)
baseplate.register(self.baseplate_observer)
baseplate.configure_context({"rediscluster": ClusterRedisClient()})

self.context = baseplate.make_context_object()
self.server_span = baseplate.make_server_span(self.context, "test")

def test_simple_command(self):
with self.server_span:
result = self.context.rediscluster.ping()

self.assertTrue(result)

server_span_observer = self.baseplate_observer.get_only_child()
span_observer = server_span_observer.get_only_child()
self.assertEqual(span_observer.span.name, "rediscluster.PING")
self.assertTrue(span_observer.on_start_called)
self.assertTrue(span_observer.on_finish_called)
self.assertIsNone(span_observer.on_finish_exc_info)

def test_error(self):
with self.server_span:
with self.assertRaises(rediscluster.RedisClusterException):
self.context.rediscluster.execute_command("crazycommand")

server_span_observer = self.baseplate_observer.get_only_child()
span_observer = server_span_observer.get_only_child()
self.assertTrue(span_observer.on_start_called)
self.assertTrue(span_observer.on_finish_called)
self.assertIsNotNone(span_observer.on_finish_exc_info)

def test_lock(self):
with self.server_span:
with self.context.rediscluster.lock("foo-lock"):
pass

server_span_observer = self.baseplate_observer.get_only_child()

self.assertGreater(len(server_span_observer.children), 0)
for span_observer in server_span_observer.children:
self.assertTrue(span_observer.on_start_called)
self.assertTrue(span_observer.on_finish_called)

def test_pipeline(self):
with self.server_span:
with self.context.rediscluster.pipeline("foo") as pipeline:
pipeline.set("foo", "bar")
pipeline.get("foo")
pipeline.get("foo")
pipeline.get("foo")
pipeline.get("foo")
pipeline.get("foo")
pipeline.delete("foo")
pipeline.execute()

server_span_observer = self.baseplate_observer.get_only_child()
span_observer = server_span_observer.get_only_child()
self.assertEqual(span_observer.span.name, "rediscluster.pipeline_foo")
self.assertTrue(span_observer.on_start_called)
self.assertTrue(span_observer.on_finish_called)
self.assertIsNone(span_observer.on_finish_exc_info)