Skip to content

Commit 34e7d89

Browse files
committed
[GROW-2938] add retry to ClusterPipeline
1 parent 852839f commit 34e7d89

File tree

1 file changed

+13
-6
lines changed

1 file changed

+13
-6
lines changed

redis/cluster.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,8 @@ def __init__(
576576
self.retry = retry
577577
kwargs.update({"retry": self.retry})
578578
else:
579-
kwargs.update({"retry": Retry(default_backoff(), 0)})
579+
self.retry = Retry(default_backoff(), 0)
580+
kwargs['retry'] = self.retry
580581

581582
self.encoder = Encoder(
582583
kwargs.get("encoding", "utf-8"),
@@ -759,6 +760,7 @@ def pipeline(self, transaction=None, shard_hint=None):
759760
read_from_replicas=self.read_from_replicas,
760761
reinitialize_steps=self.reinitialize_steps,
761762
lock=self._lock,
763+
retry=self.retry,
762764
)
763765

764766
def lock(
@@ -1764,6 +1766,7 @@ def __init__(
17641766
cluster_error_retry_attempts: int = 3,
17651767
reinitialize_steps: int = 5,
17661768
lock=None,
1769+
retry: Optional["Retry"] = None,
17671770
**kwargs,
17681771
):
17691772
""" """
@@ -1789,6 +1792,7 @@ def __init__(
17891792
if lock is None:
17901793
lock = threading.Lock()
17911794
self._lock = lock
1795+
self.retry = retry
17921796

17931797
def __repr__(self):
17941798
""" """
@@ -1921,8 +1925,9 @@ def send_cluster_commands(
19211925
stack,
19221926
raise_on_error=raise_on_error,
19231927
allow_redirections=allow_redirections,
1928+
attempts_count=self.cluster_error_retry_attempts - retry_attempts
19241929
)
1925-
except (ClusterDownError, ConnectionError) as e:
1930+
except (ClusterDownError, ConnectionError, TimeoutError) as e:
19261931
if retry_attempts > 0:
19271932
# Try again with the new cluster setup. All other errors
19281933
# should be raised.
@@ -1932,7 +1937,7 @@ def send_cluster_commands(
19321937
raise e
19331938

19341939
def _send_cluster_commands(
1935-
self, stack, raise_on_error=True, allow_redirections=True
1940+
self, stack, raise_on_error=True, allow_redirections=True, attempts_count=0
19361941
):
19371942
"""
19381943
Send a bunch of cluster commands to the redis cluster.
@@ -1987,9 +1992,11 @@ def _send_cluster_commands(
19871992
redis_node = self.get_redis_connection(node)
19881993
try:
19891994
connection = get_connection(redis_node, c.args)
1990-
except ConnectionError:
1991-
# Connection retries are being handled in the node's
1992-
# Retry object. Reinitialize the node -> slot table.
1995+
except (ConnectionError, TimeoutError) as e:
1996+
if self.retry and isinstance(e, self.retry._supported_errors):
1997+
backoff = self.retry._backoff.compute(attempts_count)
1998+
if backoff > 0:
1999+
time.sleep(backoff)
19932000
self.nodes_manager.initialize()
19942001
if is_default_node:
19952002
self.replace_default_node()

0 commit comments

Comments
 (0)