Skip to content

Commit 68ef50a

Browse files
committed
[GROW-3361] always release, instead of disconnect, when error occurs during get_connection (#11)
* add is_supported_error() to retry * release, instead of disconnect on any error, when fetching connections in cluster pipeline * add a default backoff after cluster pipeline disconnects its connections (cherry picked from commit 179891d)
1 parent 1f75b91 commit 68ef50a

File tree

4 files changed

+69
-12
lines changed

4 files changed

+69
-12
lines changed

redis/cluster.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,7 +1091,7 @@ def execute_command(self, *args, **kwargs):
10911091
# The nodes and slots cache were reinitialized.
10921092
# Try again with the new cluster setup.
10931093
retry_attempts -= 1
1094-
if self.retry and isinstance(e, self.retry._supported_errors):
1094+
if self.retry and self.retry.is_supported_error(e):
10951095
backoff = self.retry._backoff.compute(
10961096
self.cluster_error_retry_attempts - retry_attempts
10971097
)
@@ -2100,20 +2100,19 @@ def _send_cluster_commands(
21002100
redis_node = self.get_redis_connection(node)
21012101
try:
21022102
connection = get_connection(redis_node, c.args)
2103-
except (ConnectionError, TimeoutError) as e:
2103+
except BaseException as e:
21042104
for n in nodes.values():
21052105
n.connection_pool.release(n.connection)
21062106
n.connection = None
21072107
nodes = {}
2108-
if self.retry and isinstance(
2109-
e, self.retry._supported_errors
2110-
):
2108+
if self.retry and self.retry.is_supported_error(e):
21112109
backoff = self.retry._backoff.compute(attempts_count)
21122110
if backoff > 0:
21132111
time.sleep(backoff)
2114-
self.nodes_manager.initialize()
2115-
if is_default_node:
2116-
self.replace_default_node()
2112+
if isinstance(e, (ConnectionError, TimeoutError)):
2113+
self.nodes_manager.initialize()
2114+
if is_default_node:
2115+
self.replace_default_node()
21172116
raise
21182117
nodes[node_name] = NodeCommands(
21192118
redis_node.parse_response,
@@ -2229,6 +2228,8 @@ def _send_cluster_commands(
22292228
if n.connection:
22302229
n.connection.disconnect()
22312230
n.connection_pool.release(n.connection)
2231+
if len(nodes) > 0:
2232+
time.sleep(0.25)
22322233
raise
22332234

22342235
def _fail_on_redirect(self, allow_redirections):

redis/retry.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ def update_supported_errors(self, specified_errors: list):
3232
set(self._supported_errors + tuple(specified_errors))
3333
)
3434

35+
def is_supported_error(self, error):
36+
return isinstance(error, self._supported_errors)
37+
3538
def call_with_retry(self, do, fail):
3639
"""
3740
Execute an operation that might fail and returns its result, or

tests/test_cluster.py

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2681,8 +2681,10 @@ def raise_error():
26812681

26822682
m.side_effect = raise_error
26832683

2684-
with pytest.raises(Exception, match="unexpected error"):
2685-
r.pipeline().get("a").execute()
2684+
with patch.object(Connection, "disconnect") as d:
2685+
with pytest.raises(Exception, match="unexpected error"):
2686+
r.pipeline().get("a").execute()
2687+
assert d.call_count == 1
26862688

26872689
for cluster_node in r.nodes_manager.nodes_cache.values():
26882690
connection_pool = cluster_node.redis_connection.connection_pool
@@ -2984,7 +2986,7 @@ def raise_ask_error():
29842986
assert res == ["MOCK_OK"]
29852987

29862988
@pytest.mark.parametrize("error", [ConnectionError, TimeoutError])
2987-
def test_return_previous_acquired_connections(self, r, error):
2989+
def test_return_previous_acquired_connections_with_retry(self, r, error):
29882990
# in order to ensure that a pipeline will make use of connections
29892991
# from different nodes
29902992
assert r.keyslot("a") != r.keyslot("b")
@@ -3000,7 +3002,13 @@ def raise_error(target_node, *args, **kwargs):
30003002

30013003
get_connection.side_effect = raise_error
30023004

3003-
r.pipeline().get("a").get("b").execute()
3005+
with patch.object(NodesManager, "initialize") as i:
3006+
# in order to remove disconnect caused by initialize
3007+
i.side_effect = lambda: None
3008+
3009+
with patch.object(Connection, "disconnect") as d:
3010+
r.pipeline().get("a").get("b").execute()
3011+
assert d.call_count == 0
30043012

30053013
# there should have been two get_connections per execution and
30063014
# two executions due to exception raised in the first execution
@@ -3010,6 +3018,39 @@ def raise_error(target_node, *args, **kwargs):
30103018
num_of_conns = len(connection_pool._available_connections)
30113019
assert num_of_conns == connection_pool._created_connections
30123020

3021+
@pytest.mark.parametrize("error", [RedisClusterException, BaseException])
3022+
def test_return_previous_acquired_connections_without_retry(self, r, error):
3023+
# in order to ensure that a pipeline will make use of connections
3024+
# from different nodes
3025+
assert r.keyslot("a") != r.keyslot("b")
3026+
3027+
orig_func = redis.cluster.get_connection
3028+
with patch("redis.cluster.get_connection") as get_connection:
3029+
3030+
def raise_error(target_node, *args, **kwargs):
3031+
if get_connection.call_count == 2:
3032+
raise error("mocked error")
3033+
else:
3034+
return orig_func(target_node, *args, **kwargs)
3035+
3036+
get_connection.side_effect = raise_error
3037+
3038+
with patch.object(Connection, "disconnect") as d:
3039+
with pytest.raises(error):
3040+
r.pipeline().get("a").get("b").execute()
3041+
assert d.call_count == 0
3042+
3043+
# there should have been two get_connections per execution and
3044+
# two executions due to exception raised in the first execution
3045+
assert get_connection.call_count == 2
3046+
for cluster_node in r.nodes_manager.nodes_cache.values():
3047+
connection_pool = cluster_node.redis_connection.connection_pool
3048+
num_of_conns = len(connection_pool._available_connections)
3049+
assert num_of_conns == connection_pool._created_connections
3050+
# connection must remain connected
3051+
for conn in connection_pool._available_connections:
3052+
assert conn._sock is not None
3053+
30133054
def test_empty_stack(self, r):
30143055
"""
30153056
If pipeline is executed with no commands it should

tests/test_retry.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
BusyLoadingError,
1010
ConnectionError,
1111
ReadOnlyError,
12+
RedisClusterException,
13+
RedisError,
1214
TimeoutError,
1315
)
1416
from redis.retry import Retry
@@ -122,6 +124,16 @@ def test_infinite_retry(self):
122124
assert self.actual_attempts == 5
123125
assert self.actual_failures == 5
124126

127+
@pytest.mark.parametrize("exception_class", [ConnectionError, TimeoutError])
128+
def test_is_supported_error_true(self, exception_class):
129+
retry = Retry(BackoffMock(), -1)
130+
assert retry.is_supported_error(exception_class())
131+
132+
@pytest.mark.parametrize("exception_class", [RedisClusterException, RedisError])
133+
def test_is_supported_error_false(self, exception_class):
134+
retry = Retry(BackoffMock(), -1)
135+
assert not retry.is_supported_error(exception_class())
136+
125137

126138
@pytest.mark.onlynoncluster
127139
class TestRedisClientRetry:

0 commit comments

Comments
 (0)