@@ -576,7 +576,8 @@ def __init__(
576
576
self .retry = retry
577
577
kwargs .update ({"retry" : self .retry })
578
578
else :
579
- kwargs .update ({"retry" : Retry (default_backoff (), 0 )})
579
+ self .retry = Retry (default_backoff (), 0 )
580
+ kwargs ["retry" ] = self .retry
580
581
581
582
self .encoder = Encoder (
582
583
kwargs .get ("encoding" , "utf-8" ),
@@ -759,6 +760,7 @@ def pipeline(self, transaction=None, shard_hint=None):
759
760
read_from_replicas = self .read_from_replicas ,
760
761
reinitialize_steps = self .reinitialize_steps ,
761
762
lock = self ._lock ,
763
+ retry = self .retry ,
762
764
)
763
765
764
766
def lock (
@@ -842,41 +844,49 @@ def set_response_callback(self, command, callback):
842
844
def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
    """
    Determine which nodes the command should be executed on.

    The command name is ``args[0]`` upper-cased, or the upper-cased
    ``"<args[0]> <args[1]>"`` pair when that two-word form is a key of
    ``self.command_flags``.

    :param args: the command and its arguments; ``args[0]`` must exist.
    :param kwargs: an optional ``nodes_flag`` entry is popped from this
        dict and, when it is not None, overrides the flag registered for
        the command in ``self.command_flags``.
    :return: a list of target nodes.
    :raises SlotNotCoveredError: re-raised when resolving the key's slot
        fails. Before re-raising, ``reinitialize_counter`` is incremented
        and, if ``_should_reinitialized()`` is true,
        ``nodes_manager.initialize()`` is called and the counter is reset.
    """
    command = args[0].upper()
    if len(args) >= 2:
        # Some commands are registered under a two-word name.
        two_word_command = f"{args[0]} {args[1]}".upper()
        if two_word_command in self.command_flags:
            command = two_word_command

    nodes_flag = kwargs.pop("nodes_flag", None)
    if nodes_flag is not None:
        # nodes flag passed by the user
        command_flag = nodes_flag
    else:
        # get the nodes group for this command if it was predefined
        command_flag = self.command_flags.get(command)

    cls = self.__class__
    if command_flag == cls.RANDOM:
        # return a random node
        return [self.get_random_node()]
    if command_flag == cls.PRIMARIES:
        # return all primaries
        return self.get_primaries()
    if command_flag == cls.REPLICAS:
        # return all replicas
        return self.get_replicas()
    if command_flag == cls.ALL_NODES:
        # return all nodes
        return self.get_nodes()
    if command_flag == cls.DEFAULT_NODE:
        # return the cluster's default node
        return [self.nodes_manager.default_node]
    if command in cls.SEARCH_COMMANDS[0]:
        return [self.nodes_manager.default_node]

    # Only the key-slot lookup is guarded: it is the one branch that
    # resolves a slot, so the try body is kept to exactly that work.
    try:
        # get the node that holds the key's slot
        slot = self.determine_slot(*args)
        node = self.nodes_manager.get_node_from_slot(
            slot, self.read_from_replicas and command in READ_COMMANDS
        )
    except SlotNotCoveredError:
        self.reinitialize_counter += 1
        if self._should_reinitialized():
            self.nodes_manager.initialize()
            # Reset the counter
            self.reinitialize_counter = 0
        # Bare raise re-raises the active exception unchanged.
        raise
    return [node]
880
890
881
891
def _should_reinitialized (self ):
882
892
# To reinitialize the cluster on every MOVED error,
@@ -1068,6 +1078,12 @@ def execute_command(self, *args, **kwargs):
1068
1078
# The nodes and slots cache were reinitialized.
1069
1079
# Try again with the new cluster setup.
1070
1080
retry_attempts -= 1
1081
+ if self .retry and isinstance (e , self .retry ._supported_errors ):
1082
+ backoff = self .retry ._backoff .compute (
1083
+ self .cluster_error_retry_attempts - retry_attempts
1084
+ )
1085
+ if backoff > 0 :
1086
+ time .sleep (backoff )
1071
1087
continue
1072
1088
else :
1073
1089
# raise the exception
@@ -1127,8 +1143,6 @@ def _execute_command(self, target_node, *args, **kwargs):
1127
1143
# Remove the failed node from the startup nodes before we try
1128
1144
# to reinitialize the cluster
1129
1145
self .nodes_manager .startup_nodes .pop (target_node .name , None )
1130
- # Reset the cluster node's connection
1131
- target_node .redis_connection = None
1132
1146
self .nodes_manager .initialize ()
1133
1147
raise e
1134
1148
except MovedError as e :
@@ -1148,6 +1162,13 @@ def _execute_command(self, target_node, *args, **kwargs):
1148
1162
else :
1149
1163
self .nodes_manager .update_moved_exception (e )
1150
1164
moved = True
1165
+ except SlotNotCoveredError as e :
1166
+ self .reinitialize_counter += 1
1167
+ if self ._should_reinitialized ():
1168
+ self .nodes_manager .initialize ()
1169
+ # Reset the counter
1170
+ self .reinitialize_counter = 0
1171
+ raise e
1151
1172
except TryAgainError :
1152
1173
if ttl < self .RedisClusterRequestTTL / 2 :
1153
1174
time .sleep (0.05 )
@@ -1379,7 +1400,10 @@ def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
1379
1400
# randomly choose one of the replicas
1380
1401
node_idx = random .randint (1 , len (self .slots_cache [slot ]) - 1 )
1381
1402
1382
- return self .slots_cache [slot ][node_idx ]
1403
+ try :
1404
+ return self .slots_cache [slot ][node_idx ]
1405
+ except IndexError :
1406
+ return self .slots_cache [slot ][0 ]
1383
1407
1384
1408
def get_nodes_by_server_type (self , server_type ):
1385
1409
"""
@@ -1744,6 +1768,7 @@ def __init__(
1744
1768
cluster_error_retry_attempts : int = 3 ,
1745
1769
reinitialize_steps : int = 5 ,
1746
1770
lock = None ,
1771
+ retry : Optional ["Retry" ] = None ,
1747
1772
** kwargs ,
1748
1773
):
1749
1774
""" """
@@ -1769,6 +1794,7 @@ def __init__(
1769
1794
if lock is None :
1770
1795
lock = threading .Lock ()
1771
1796
self ._lock = lock
1797
+ self .retry = retry
1772
1798
1773
1799
def __repr__ (self ):
1774
1800
""" """
@@ -1901,8 +1927,9 @@ def send_cluster_commands(
1901
1927
stack ,
1902
1928
raise_on_error = raise_on_error ,
1903
1929
allow_redirections = allow_redirections ,
1930
+ attempts_count = self .cluster_error_retry_attempts - retry_attempts ,
1904
1931
)
1905
- except (ClusterDownError , ConnectionError ) as e :
1932
+ except (ClusterDownError , ConnectionError , TimeoutError ) as e :
1906
1933
if retry_attempts > 0 :
1907
1934
# Try again with the new cluster setup. All other errors
1908
1935
# should be raised.
@@ -1912,7 +1939,7 @@ def send_cluster_commands(
1912
1939
raise e
1913
1940
1914
1941
def _send_cluster_commands (
1915
- self , stack , raise_on_error = True , allow_redirections = True
1942
+ self , stack , raise_on_error = True , allow_redirections = True , attempts_count = 0
1916
1943
):
1917
1944
"""
1918
1945
Send a bunch of cluster commands to the redis cluster.
@@ -1967,9 +1994,11 @@ def _send_cluster_commands(
1967
1994
redis_node = self .get_redis_connection (node )
1968
1995
try :
1969
1996
connection = get_connection (redis_node , c .args )
1970
- except ConnectionError :
1971
- # Connection retries are being handled in the node's
1972
- # Retry object. Reinitialize the node -> slot table.
1997
+ except (ConnectionError , TimeoutError ) as e :
1998
+ if self .retry and isinstance (e , self .retry ._supported_errors ):
1999
+ backoff = self .retry ._backoff .compute (attempts_count )
2000
+ if backoff > 0 :
2001
+ time .sleep (backoff )
1973
2002
self .nodes_manager .initialize ()
1974
2003
if is_default_node :
1975
2004
self .replace_default_node ()
0 commit comments