Skip to content

Commit 3c0b1ec

Browse files
committed
Added getter and setter for the client's retry object and added more tests
1 parent 8c2f15c commit 3c0b1ec

File tree

8 files changed

+169
-26
lines changed

8 files changed

+169
-26
lines changed

redis/asyncio/client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,12 @@ def get_connection_kwargs(self):
273273
"""Get the connection's key-word arguments"""
274274
return self.connection_pool.connection_kwargs
275275

276+
def get_retry(self) -> Optional["Retry"]:
277+
return self.get_connection_kwargs().get("retry")
278+
279+
def set_retry(self, retry: "Retry") -> None:
280+
self.get_connection_kwargs().update({"retry": retry})
281+
276282
def load_external_module(self, funcname, func):
277283
"""
278284
This function can be used to add externally defined redis modules,

redis/asyncio/cluster.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,14 @@ def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:
495495
"""Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`."""
496496
return self.connection_kwargs
497497

498+
def get_retry(self) -> Optional["Retry"]:
499+
return self.retry
500+
501+
def set_retry(self, retry: "Retry") -> None:
502+
self.retry = retry
503+
for node in self.get_nodes():
504+
node.connection_kwargs.update({"retry": retry})
505+
498506
def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
499507
"""Set a custom response callback."""
500508
self.response_callbacks[command] = callback

redis/client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import time
66
import warnings
77
from itertools import chain
8+
from typing import Optional
89

910
from redis.commands import (
1011
CoreCommands,
@@ -1043,6 +1044,12 @@ def get_connection_kwargs(self):
10431044
"""Get the connection's key-word arguments"""
10441045
return self.connection_pool.connection_kwargs
10451046

1047+
def get_retry(self) -> Optional["Retry"]:
1048+
return self.get_connection_kwargs().get("retry")
1049+
1050+
def set_retry(self, retry: "Retry") -> None:
1051+
self.get_connection_kwargs().update({"retry": retry})
1052+
10461053
def set_response_callback(self, command, callback):
10471054
"""Set a custom Response Callback"""
10481055
self.response_callbacks[command] = callback

redis/cluster.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,14 @@ def set_default_node(self, node):
687687
self.nodes_manager.default_node = node
688688
return True
689689

690+
def get_retry(self) -> Optional["Retry"]:
691+
return self.retry
692+
693+
def set_retry(self, retry: "Retry") -> None:
694+
self.retry = retry
695+
for node in self.get_nodes():
696+
node.redis_connection.set_retry(retry)
697+
690698
def monitor(self, target_node=None):
691699
"""
692700
Returns a Monitor object for the specified target node.

tests/test_asyncio/test_cluster.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -249,9 +249,32 @@ async def test_startup_nodes(self) -> None:
249249
]
250250
)
251251

252-
async def test_cluster_retry_object(self) -> None:
252+
async def test_cluster_get_set_retry_object(self, request):
253+
retry = Retry(NoBackoff(), 2)
254+
url = request.config.getoption("--redis-url")
255+
r = await RedisCluster.from_url(url, retry=retry)
256+
assert r.get_retry()._retries == retry._retries
257+
assert isinstance(r.get_retry()._backoff, NoBackoff)
258+
for node in r.get_nodes():
259+
n_retry = node.connection_kwargs.get("retry")
260+
assert n_retry is not None
261+
assert n_retry._retries == retry._retries
262+
assert isinstance(n_retry._backoff, NoBackoff)
263+
# Change retry policy
264+
new_retry = Retry(ExponentialBackoff(), 3)
265+
r.set_retry(new_retry)
266+
assert r.get_retry()._retries == new_retry._retries
267+
assert isinstance(r.get_retry()._backoff, ExponentialBackoff)
268+
for node in r.get_nodes():
269+
n_retry = node.connection_kwargs.get("retry")
270+
assert n_retry is not None
271+
assert n_retry._retries == new_retry._retries
272+
assert isinstance(n_retry._backoff, ExponentialBackoff)
273+
274+
async def test_cluster_retry_object(self, request) -> None:
275+
url = request.config.getoption("--redis-url")
276+
rc_default = await RedisCluster.from_url(url)
253277
# Test default retry
254-
rc_default = await RedisCluster("127.0.0.1", 16379)
255278
retry = rc_default.connection_kwargs.get("retry")
256279
assert isinstance(retry, Retry)
257280
assert retry._retries == 3
@@ -262,23 +285,21 @@ async def test_cluster_retry_object(self) -> None:
262285

263286
# Test custom retry
264287
retry = Retry(ExponentialBackoff(10, 5), 5)
265-
rc_custom_retry = await RedisCluster("127.0.0.1", 16379, retry=retry)
288+
rc_custom_retry = await RedisCluster.from_url(url, retry=retry)
266289
assert (
267290
rc_custom_retry.get_node("127.0.0.1", 16379).connection_kwargs.get("retry")
268291
== retry
269292
)
270293

271294
# Test no connection retries
272-
rc_no_retries = await RedisCluster(
273-
"127.0.0.1", 16379, connection_error_retry_attempts=0
295+
rc_no_retries = await RedisCluster.from_url(
296+
url, connection_error_retry_attempts=0
274297
)
275298
assert (
276299
rc_no_retries.get_node("127.0.0.1", 16379).connection_kwargs.get("retry")
277300
is None
278301
)
279-
rc_no_retries = await RedisCluster(
280-
"127.0.0.1", 16379, retry=Retry(NoBackoff(), 0)
281-
)
302+
rc_no_retries = await RedisCluster.from_url(url, retry=Retry(NoBackoff(), 0))
282303
assert (
283304
rc_no_retries.get_node("127.0.0.1", 16379)
284305
.connection_kwargs.get("retry")

tests/test_asyncio/test_retry.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import pytest
22

3+
from redis.asyncio import Redis
34
from redis.asyncio.connection import Connection, UnixDomainSocketConnection
45
from redis.asyncio.retry import Retry
5-
from redis.backoff import AbstractBackoff, NoBackoff
6+
from redis.backoff import AbstractBackoff, ExponentialBackoff, NoBackoff
67
from redis.exceptions import ConnectionError, TimeoutError
78

89

@@ -114,3 +115,18 @@ async def test_infinite_retry(self):
114115

115116
assert self.actual_attempts == 5
116117
assert self.actual_failures == 5
118+
119+
120+
class TestRedisClientRetry:
121+
"Test the Redis client behavior with retries"
122+
123+
async def test_get_set_retry_object(self, request):
124+
retry = Retry(NoBackoff(), 2)
125+
url = request.config.getoption("--redis-url")
126+
r = await Redis.from_url(url, retry_on_timeout=True, retry=retry)
127+
assert r.get_retry()._retries == retry._retries
128+
assert isinstance(r.get_retry()._backoff, NoBackoff)
129+
new_retry_policy = Retry(ExponentialBackoff(), 3)
130+
r.set_retry(new_retry_policy)
131+
assert r.get_retry()._retries == new_retry_policy._retries
132+
assert isinstance(r.get_retry()._backoff, ExponentialBackoff)

tests/test_cluster.py

Lines changed: 83 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,60 @@ def ok_response(connection, *args, **options):
360360

361361
assert r.execute_command("SET", "foo", "bar") == "MOCK_OK"
362362

363+
def test_handling_cluster_failover_to_a_replica(self, r):
364+
# Set the key we'll test for
365+
key = "key"
366+
r.set("key", "value")
367+
primary = r.get_node_from_key(key, replica=False)
368+
assert r.get("key") == "value"
369+
# Get the current output of cluster slots
370+
cluster_slots = primary.redis_connection.execute_command("CLUSTER SLOTS")
371+
replica_host = ""
372+
replica_port = 0
373+
# Replace one of the replicas to be the new primary based on the
374+
# cluster slots output
375+
for slot_range in cluster_slots:
376+
primary_port = slot_range[2][1]
377+
if primary_port == primary.port:
378+
if len(slot_range) <= 3:
379+
# cluster doesn't have a replica, return
380+
return
381+
replica_host = str_if_bytes(slot_range[3][0])
382+
replica_port = slot_range[3][1]
383+
# replace replica and primary in the cluster slots output
384+
tmp_node = slot_range[2]
385+
slot_range[2] = slot_range[3]
386+
slot_range[3] = tmp_node
387+
break
388+
389+
def raise_connection_error():
390+
raise ConnectionError("error")
391+
392+
def mock_execute_command(*_args, **_kwargs):
393+
if _args[0] == "CLUSTER SLOTS":
394+
return cluster_slots
395+
else:
396+
raise Exception("Failed to mock cluster slots")
397+
398+
# Mock connection error for the current primary
399+
mock_node_resp_func(primary, raise_connection_error)
400+
primary.redis_connection.set_retry(Retry(NoBackoff(), 1))
401+
402+
# Mock the cluster slots response for all other nodes
403+
redis_mock_node = Mock()
404+
redis_mock_node.execute_command.side_effect = mock_execute_command
405+
# Mock response value for all other commands
406+
redis_mock_node.parse_response.return_value = "MOCK_OK"
407+
for node in r.get_nodes():
408+
if node.port != primary.port:
409+
node.redis_connection = redis_mock_node
410+
411+
assert r.get(key) == "MOCK_OK"
412+
new_primary = r.get_node_from_key(key, replica=False)
413+
assert new_primary.host == replica_host
414+
assert new_primary.port == replica_port
415+
assert r.get_node(primary.host, primary.port).server_type == REPLICA
416+
363417
def test_moved_redirection(self, request):
364418
"""
365419
Test that the client handles MOVED response.
@@ -693,44 +747,57 @@ def moved_redirect_effect(connection, *args, **options):
693747
cur_node = r.get_node(node_name=node_name)
694748
assert conn == r.get_redis_connection(cur_node)
695749

696-
def test_cluster_retry_object(self) -> None:
750+
def test_cluster_get_set_retry_object(self, request):
751+
retry = Retry(NoBackoff(), 2)
752+
r = _get_client(Redis, request, retry=retry)
753+
assert r.get_retry()._retries == retry._retries
754+
assert isinstance(r.get_retry()._backoff, NoBackoff)
755+
for node in r.get_nodes():
756+
assert node.redis_connection.get_retry()._retries == retry._retries
757+
assert isinstance(node.redis_connection.get_retry()._backoff, NoBackoff)
758+
# Change retry policy
759+
new_retry = Retry(ExponentialBackoff(), 3)
760+
r.set_retry(new_retry)
761+
assert r.get_retry()._retries == new_retry._retries
762+
assert isinstance(r.get_retry()._backoff, ExponentialBackoff)
763+
for node in r.get_nodes():
764+
assert node.redis_connection.get_retry()._retries == new_retry._retries
765+
assert isinstance(
766+
node.redis_connection.get_retry()._backoff, ExponentialBackoff
767+
)
768+
769+
def test_cluster_retry_object(self, r) -> None:
697770
# Test default retry
698-
rc_default = RedisCluster("127.0.0.1", 16379)
699-
retry = rc_default.get_connection_kwargs().get("retry")
771+
retry = r.get_connection_kwargs().get("retry")
700772
assert isinstance(retry, Retry)
701773
assert retry._retries == 3
702774
assert isinstance(retry._backoff, type(get_default_backoff()))
703-
assert rc_default.get_node("127.0.0.1", 16379).get_connection_kwargs().get(
704-
"retry"
705-
) == rc_default.get_node("127.0.0.1", 16380).get_connection_kwargs().get(
706-
"retry"
707-
)
775+
node1 = r.get_node("127.0.0.1", 16379).redis_connection
776+
node2 = r.get_node("127.0.0.1", 16380).redis_connection
777+
assert node1.get_retry()._retries == node2.get_retry()._retries
708778

709779
# Test custom retry
710780
retry = Retry(ExponentialBackoff(10, 5), 5)
711781
rc_custom_retry = RedisCluster("127.0.0.1", 16379, retry=retry)
712782
assert (
713783
rc_custom_retry.get_node("127.0.0.1", 16379)
714-
.get_connection_kwargs()
715-
.get("retry")
716-
== retry
784+
.redis_connection.get_retry()
785+
._retries
786+
== retry._retries
717787
)
718788

719789
# Test no connection retries
720790
rc_no_retries = RedisCluster(
721791
"127.0.0.1", 16379, connection_error_retry_attempts=0
722792
)
723793
assert (
724-
rc_no_retries.get_node("127.0.0.1", 16379)
725-
.get_connection_kwargs()
726-
.get("retry")
794+
rc_no_retries.get_node("127.0.0.1", 16379).redis_connection.get_retry()
727795
is None
728796
)
729797
rc_no_retries = RedisCluster("127.0.0.1", 16379, retry=Retry(NoBackoff(), 0))
730798
assert (
731799
rc_no_retries.get_node("127.0.0.1", 16379)
732-
.get_connection_kwargs()
733-
.get("retry")
800+
.redis_connection.get_retry()
734801
._retries
735802
== 0
736803
)

tests/test_retry.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import pytest
44

5-
from redis.backoff import NoBackoff
5+
from redis.backoff import ExponentialBackoff, NoBackoff
66
from redis.client import Redis
77
from redis.connection import Connection, UnixDomainSocketConnection
88
from redis.exceptions import (
@@ -203,3 +203,13 @@ def test_client_retry_on_timeout(self, request):
203203
r.get("foo")
204204
finally:
205205
assert parse_response.call_count == retries + 1
206+
207+
def test_get_set_retry_object(self, request):
208+
retry = Retry(NoBackoff(), 2)
209+
r = _get_client(Redis, request, retry_on_timeout=True, retry=retry)
210+
assert r.get_retry()._retries == retry._retries
211+
assert isinstance(r.get_retry()._backoff, NoBackoff)
212+
new_retry_policy = Retry(ExponentialBackoff(), 3)
213+
r.set_retry(new_retry_policy)
214+
assert r.get_retry()._retries == new_retry_policy._retries
215+
assert isinstance(r.get_retry()._backoff, ExponentialBackoff)

0 commit comments

Comments
 (0)