Skip to content

Commit 761fdec

Browse files
committed
[GROW-2938] recover from SlotNotCoveredError
1 parent daa900a commit 761fdec

File tree

2 files changed

+65
-36
lines changed

2 files changed

+65
-36
lines changed

redis/cluster.py

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -842,41 +842,49 @@ def set_response_callback(self, command, callback):
842842
def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
843843
# Determine which nodes should be executed the command on.
844844
# Returns a list of target nodes.
845-
command = args[0].upper()
846-
if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
847-
command = f"{args[0]} {args[1]}".upper()
848-
849-
nodes_flag = kwargs.pop("nodes_flag", None)
850-
if nodes_flag is not None:
851-
# nodes flag passed by the user
852-
command_flag = nodes_flag
853-
else:
854-
# get the nodes group for this command if it was predefined
855-
command_flag = self.command_flags.get(command)
856-
if command_flag == self.__class__.RANDOM:
857-
# return a random node
858-
return [self.get_random_node()]
859-
elif command_flag == self.__class__.PRIMARIES:
860-
# return all primaries
861-
return self.get_primaries()
862-
elif command_flag == self.__class__.REPLICAS:
863-
# return all replicas
864-
return self.get_replicas()
865-
elif command_flag == self.__class__.ALL_NODES:
866-
# return all nodes
867-
return self.get_nodes()
868-
elif command_flag == self.__class__.DEFAULT_NODE:
869-
# return the cluster's default node
870-
return [self.nodes_manager.default_node]
871-
elif command in self.__class__.SEARCH_COMMANDS[0]:
872-
return [self.nodes_manager.default_node]
873-
else:
874-
# get the node that holds the key's slot
875-
slot = self.determine_slot(*args)
876-
node = self.nodes_manager.get_node_from_slot(
877-
slot, self.read_from_replicas and command in READ_COMMANDS
878-
)
879-
return [node]
845+
try:
846+
command = args[0].upper()
847+
if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
848+
command = f"{args[0]} {args[1]}".upper()
849+
850+
nodes_flag = kwargs.pop("nodes_flag", None)
851+
if nodes_flag is not None:
852+
# nodes flag passed by the user
853+
command_flag = nodes_flag
854+
else:
855+
# get the nodes group for this command if it was predefined
856+
command_flag = self.command_flags.get(command)
857+
if command_flag == self.__class__.RANDOM:
858+
# return a random node
859+
return [self.get_random_node()]
860+
elif command_flag == self.__class__.PRIMARIES:
861+
# return all primaries
862+
return self.get_primaries()
863+
elif command_flag == self.__class__.REPLICAS:
864+
# return all replicas
865+
return self.get_replicas()
866+
elif command_flag == self.__class__.ALL_NODES:
867+
# return all nodes
868+
return self.get_nodes()
869+
elif command_flag == self.__class__.DEFAULT_NODE:
870+
# return the cluster's default node
871+
return [self.nodes_manager.default_node]
872+
elif command in self.__class__.SEARCH_COMMANDS[0]:
873+
return [self.nodes_manager.default_node]
874+
else:
875+
# get the node that holds the key's slot
876+
slot = self.determine_slot(*args)
877+
node = self.nodes_manager.get_node_from_slot(
878+
slot, self.read_from_replicas and command in READ_COMMANDS
879+
)
880+
return [node]
881+
except SlotNotCoveredError as e:
882+
self.reinitialize_counter += 1
883+
if self._should_reinitialized():
884+
self.nodes_manager.initialize()
885+
# Reset the counter
886+
self.reinitialize_counter = 0
887+
raise e
880888

881889
def _should_reinitialized(self):
882890
# To reinitialize the cluster on every MOVED error,
@@ -1150,6 +1158,13 @@ def _execute_command(self, target_node, *args, **kwargs):
11501158
else:
11511159
self.nodes_manager.update_moved_exception(e)
11521160
moved = True
1161+
except SlotNotCoveredError as e:
1162+
self.reinitialize_counter += 1
1163+
if self._should_reinitialized():
1164+
self.nodes_manager.initialize()
1165+
# Reset the counter
1166+
self.reinitialize_counter = 0
1167+
raise e
11531168
except TryAgainError:
11541169
if ttl < self.RedisClusterRequestTTL / 2:
11551170
time.sleep(0.05)

tests/test_cluster.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import binascii
22
import datetime
3+
import uuid
34
import warnings
45
from queue import LifoQueue, Queue
56
from time import sleep
@@ -31,7 +32,7 @@
3132
RedisClusterException,
3233
RedisError,
3334
ResponseError,
34-
TimeoutError,
35+
TimeoutError, SlotNotCoveredError,
3536
)
3637
from redis.retry import Retry
3738
from redis.utils import str_if_bytes
@@ -806,6 +807,19 @@ def raise_error(target_node, *args, **kwargs):
806807
rc.get("bar")
807808
assert compute.call_count == rc.cluster_error_retry_attempts
808809

810+
@pytest.mark.parametrize('reinitialize_steps', [2, 10, 99])
811+
def test_recover_slot_not_covered_error(self, request, reinitialize_steps):
812+
rc = _get_client(RedisCluster, request, reinitialize_steps=reinitialize_steps)
813+
key = uuid.uuid4().hex
814+
815+
rc.nodes_manager.slots_cache[rc.keyslot(key)] = []
816+
817+
for _ in range(0, reinitialize_steps):
818+
with pytest.raises(SlotNotCoveredError):
819+
rc.get(key)
820+
821+
rc.get(key)
822+
809823

810824
@pytest.mark.onlycluster
811825
class TestClusterRedisCommands:

0 commit comments

Comments
 (0)