Skip to content

Commit 9d93769

Browse files
committed
[GROW-2938] add retry to ClusterPipeline
1 parent 61f5390 commit 9d93769

File tree

1 file changed

+13
-6
lines changed

1 file changed

+13
-6
lines changed

redis/cluster.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,8 @@ def __init__(
591591
self.retry = retry
592592
kwargs.update({"retry": self.retry})
593593
else:
594-
kwargs.update({"retry": Retry(default_backoff(), 0)})
594+
self.retry = Retry(default_backoff(), 0)
595+
kwargs["retry"] = self.retry
595596

596597
self.encoder = Encoder(
597598
kwargs.get("encoding", "utf-8"),
@@ -775,6 +776,7 @@ def pipeline(self, transaction=None, shard_hint=None):
775776
read_from_replicas=self.read_from_replicas,
776777
reinitialize_steps=self.reinitialize_steps,
777778
lock=self._lock,
779+
retry=self.retry,
778780
)
779781

780782
def lock(
@@ -1796,6 +1798,7 @@ def __init__(
17961798
cluster_error_retry_attempts: int = 3,
17971799
reinitialize_steps: int = 5,
17981800
lock=None,
1801+
retry: Optional["Retry"] = None,
17991802
**kwargs,
18001803
):
18011804
""" """
@@ -1821,6 +1824,7 @@ def __init__(
18211824
if lock is None:
18221825
lock = threading.Lock()
18231826
self._lock = lock
1827+
self.retry = retry
18241828

18251829
def __repr__(self):
18261830
""" """
@@ -1953,8 +1957,9 @@ def send_cluster_commands(
19531957
stack,
19541958
raise_on_error=raise_on_error,
19551959
allow_redirections=allow_redirections,
1960+
attempts_count=self.cluster_error_retry_attempts - retry_attempts,
19561961
)
1957-
except (ClusterDownError, ConnectionError) as e:
1962+
except (ClusterDownError, ConnectionError, TimeoutError) as e:
19581963
if retry_attempts > 0:
19591964
# Try again with the new cluster setup. All other errors
19601965
# should be raised.
@@ -1964,7 +1969,7 @@ def send_cluster_commands(
19641969
raise e
19651970

19661971
def _send_cluster_commands(
1967-
self, stack, raise_on_error=True, allow_redirections=True
1972+
self, stack, raise_on_error=True, allow_redirections=True, attempts_count=0
19681973
):
19691974
"""
19701975
Send a bunch of cluster commands to the redis cluster.
@@ -2019,9 +2024,11 @@ def _send_cluster_commands(
20192024
redis_node = self.get_redis_connection(node)
20202025
try:
20212026
connection = get_connection(redis_node, c.args)
2022-
except ConnectionError:
2023-
# Connection retries are being handled in the node's
2024-
# Retry object. Reinitialize the node -> slot table.
2027+
except (ConnectionError, TimeoutError) as e:
2028+
if self.retry and isinstance(e, self.retry._supported_errors):
2029+
backoff = self.retry._backoff.compute(attempts_count)
2030+
if backoff > 0:
2031+
time.sleep(backoff)
20252032
self.nodes_manager.initialize()
20262033
if is_default_node:
20272034
self.replace_default_node()

0 commit comments

Comments
 (0)