Skip to content

Commit 179e924

Browse files
committed
[GROW-2938] recover from SlotNotCoveredError
1 parent c2a62b2 commit 179e924

File tree

2 files changed

+65
-36
lines changed

2 files changed

+65
-36
lines changed

redis/cluster.py

Lines changed: 50 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -858,41 +858,49 @@ def set_response_callback(self, command, callback):
858858
def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
    """Return the list of nodes the given command should be executed on.

    ``args`` holds the command name followed by its arguments. When the
    first two arguments, joined by a space and upper-cased, form a key of
    ``self.command_flags`` (a two-word command), that joined name is used
    as the command. A ``nodes_flag`` keyword argument that is not ``None``
    takes precedence over the flag predefined in ``self.command_flags``.

    Raises:
        SlotNotCoveredError: re-raised unchanged after the recovery
            bookkeeping in the handler below has run, so the caller still
            observes the failure.
    """
    try:
        command = args[0].upper()
        if len(args) >= 2:
            # Build the two-word form once and reuse it.
            two_word_command = f"{args[0]} {args[1]}".upper()
            if two_word_command in self.command_flags:
                command = two_word_command

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self.command_flags.get(command)

        cls = self.__class__
        if command_flag == cls.RANDOM:
            return [self.get_random_node()]
        if command_flag == cls.PRIMARIES:
            return self.get_primaries()
        if command_flag == cls.REPLICAS:
            return self.get_replicas()
        if command_flag == cls.ALL_NODES:
            return self.get_nodes()
        if command_flag == cls.DEFAULT_NODE:
            return [self.nodes_manager.default_node]
        if command in cls.SEARCH_COMMANDS[0]:
            return [self.nodes_manager.default_node]

        # No node group applies: route to the node that holds the key's slot.
        slot = self.determine_slot(*args)
        node = self.nodes_manager.get_node_from_slot(
            slot, self.read_from_replicas and command in READ_COMMANDS
        )
        return [node]
    except SlotNotCoveredError:
        # Count the failure; once _should_reinitialized() agrees, rebuild the
        # nodes manager state and reset the counter. The error itself is
        # never swallowed here.
        self.reinitialize_counter += 1
        if self._should_reinitialized():
            self.nodes_manager.initialize()
            # Reset the counter
            self.reinitialize_counter = 0
        # Bare raise keeps the original traceback without adding a frame.
        raise
896904

897905
def _should_reinitialized(self):
898906
# To reinitialize the cluster on every MOVED error,
@@ -1166,6 +1174,13 @@ def _execute_command(self, target_node, *args, **kwargs):
11661174
else:
11671175
self.nodes_manager.update_moved_exception(e)
11681176
moved = True
1177+
except SlotNotCoveredError as e:
1178+
self.reinitialize_counter += 1
1179+
if self._should_reinitialized():
1180+
self.nodes_manager.initialize()
1181+
# Reset the counter
1182+
self.reinitialize_counter = 0
1183+
raise e
11691184
except TryAgainError:
11701185
if ttl < self.RedisClusterRequestTTL / 2:
11711186
time.sleep(0.05)

tests/test_cluster.py

Lines changed: 15 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44
import socket
55
import socketserver
66
import threading
7+
import uuid
78
import warnings
89
from queue import LifoQueue, Queue
910
from time import sleep
@@ -35,7 +36,7 @@
3536
RedisClusterException,
3637
RedisError,
3738
ResponseError,
38-
TimeoutError,
39+
TimeoutError, SlotNotCoveredError,
3940
)
4041
from redis.retry import Retry
4142
from redis.utils import str_if_bytes
@@ -922,6 +923,19 @@ def raise_error(target_node, *args, **kwargs):
922923
rc.get("bar")
923924
assert compute.call_count == rc.cluster_error_retry_attempts
924925

926+
@pytest.mark.parametrize("reinitialize_steps", [2, 10, 99])
def test_recover_slot_not_covered_error(self, request, reinitialize_steps):
    """The client recovers from an uncovered slot after repeated failures.

    The cached node list for the key's slot is emptied, then the same GET is
    issued ``reinitialize_steps`` times, each expected to raise
    ``SlotNotCoveredError``. The following GET is expected to succeed.
    """
    rc = _get_client(
        RedisCluster, request, reinitialize_steps=reinitialize_steps
    )
    key = uuid.uuid4().hex

    # Simulate an uncovered slot: no nodes cached for this key's slot.
    rc.nodes_manager.slots_cache[rc.keyslot(key)] = []

    for _ in range(reinitialize_steps):
        with pytest.raises(SlotNotCoveredError):
            rc.get(key)

    # The random key is never written in this test, so a successful lookup
    # returns None; asserting it pins the recovery instead of only checking
    # that no exception is raised.
    assert rc.get(key) is None
938+
925939

926940
@pytest.mark.onlycluster
927941
class TestClusterRedisCommands:

0 commit comments

Comments
 (0)