Skip to content

Commit 616cb31

Browse files
committed
Merge existing changes to the forked version (#1)
* [GROW-2938] do not reset redis_connection on an error
* [GROW-2938] add backoff to more errors
* [GROW-2938] recover from SlotNotCoveredError
* [GROW-2938] prevent get_node_from_slot from failing due to concurrent cluster slots refresh
* [GROW-2938] add retry to ClusterPipeline

(cherry picked from commit 63e06dd)
1 parent adc5116 commit 616cb31

File tree

2 files changed

+120
-84
lines changed

2 files changed

+120
-84
lines changed

redis/cluster.py

Lines changed: 73 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -584,7 +584,8 @@ def __init__(
584584
self.retry = retry
585585
kwargs.update({"retry": self.retry})
586586
else:
587-
kwargs.update({"retry": Retry(default_backoff(), 0)})
587+
self.retry = Retry(default_backoff(), 0)
588+
kwargs["retry"] = self.retry
588589

589590
self.encoder = Encoder(
590591
kwargs.get("encoding", "utf-8"),
@@ -767,6 +768,7 @@ def pipeline(self, transaction=None, shard_hint=None):
767768
read_from_replicas=self.read_from_replicas,
768769
reinitialize_steps=self.reinitialize_steps,
769770
lock=self._lock,
771+
retry=self.retry,
770772
)
771773

772774
def lock(
@@ -850,41 +852,49 @@ def set_response_callback(self, command, callback):
850852
def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
851853
# Determine which nodes should be executed the command on.
852854
# Returns a list of target nodes.
853-
command = args[0].upper()
854-
if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
855-
command = f"{args[0]} {args[1]}".upper()
856-
857-
nodes_flag = kwargs.pop("nodes_flag", None)
858-
if nodes_flag is not None:
859-
# nodes flag passed by the user
860-
command_flag = nodes_flag
861-
else:
862-
# get the nodes group for this command if it was predefined
863-
command_flag = self.command_flags.get(command)
864-
if command_flag == self.__class__.RANDOM:
865-
# return a random node
866-
return [self.get_random_node()]
867-
elif command_flag == self.__class__.PRIMARIES:
868-
# return all primaries
869-
return self.get_primaries()
870-
elif command_flag == self.__class__.REPLICAS:
871-
# return all replicas
872-
return self.get_replicas()
873-
elif command_flag == self.__class__.ALL_NODES:
874-
# return all nodes
875-
return self.get_nodes()
876-
elif command_flag == self.__class__.DEFAULT_NODE:
877-
# return the cluster's default node
878-
return [self.nodes_manager.default_node]
879-
elif command in self.__class__.SEARCH_COMMANDS[0]:
880-
return [self.nodes_manager.default_node]
881-
else:
882-
# get the node that holds the key's slot
883-
slot = self.determine_slot(*args)
884-
node = self.nodes_manager.get_node_from_slot(
885-
slot, self.read_from_replicas and command in READ_COMMANDS
886-
)
887-
return [node]
855+
try:
856+
command = args[0].upper()
857+
if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
858+
command = f"{args[0]} {args[1]}".upper()
859+
860+
nodes_flag = kwargs.pop("nodes_flag", None)
861+
if nodes_flag is not None:
862+
# nodes flag passed by the user
863+
command_flag = nodes_flag
864+
else:
865+
# get the nodes group for this command if it was predefined
866+
command_flag = self.command_flags.get(command)
867+
if command_flag == self.__class__.RANDOM:
868+
# return a random node
869+
return [self.get_random_node()]
870+
elif command_flag == self.__class__.PRIMARIES:
871+
# return all primaries
872+
return self.get_primaries()
873+
elif command_flag == self.__class__.REPLICAS:
874+
# return all replicas
875+
return self.get_replicas()
876+
elif command_flag == self.__class__.ALL_NODES:
877+
# return all nodes
878+
return self.get_nodes()
879+
elif command_flag == self.__class__.DEFAULT_NODE:
880+
# return the cluster's default node
881+
return [self.nodes_manager.default_node]
882+
elif command in self.__class__.SEARCH_COMMANDS[0]:
883+
return [self.nodes_manager.default_node]
884+
else:
885+
# get the node that holds the key's slot
886+
slot = self.determine_slot(*args)
887+
node = self.nodes_manager.get_node_from_slot(
888+
slot, self.read_from_replicas and command in READ_COMMANDS
889+
)
890+
return [node]
891+
except SlotNotCoveredError as e:
892+
self.reinitialize_counter += 1
893+
if self._should_reinitialized():
894+
self.nodes_manager.initialize()
895+
# Reset the counter
896+
self.reinitialize_counter = 0
897+
raise e
888898

889899
def _should_reinitialized(self):
890900
# To reinitialize the cluster on every MOVED error,
@@ -1076,6 +1086,12 @@ def execute_command(self, *args, **kwargs):
10761086
# The nodes and slots cache were reinitialized.
10771087
# Try again with the new cluster setup.
10781088
retry_attempts -= 1
1089+
if self.retry and isinstance(e, self.retry._supported_errors):
1090+
backoff = self.retry._backoff.compute(
1091+
self.cluster_error_retry_attempts - retry_attempts
1092+
)
1093+
if backoff > 0:
1094+
time.sleep(backoff)
10791095
continue
10801096
else:
10811097
# raise the exception
@@ -1135,8 +1151,6 @@ def _execute_command(self, target_node, *args, **kwargs):
11351151
# Remove the failed node from the startup nodes before we try
11361152
# to reinitialize the cluster
11371153
self.nodes_manager.startup_nodes.pop(target_node.name, None)
1138-
# Reset the cluster node's connection
1139-
target_node.redis_connection = None
11401154
self.nodes_manager.initialize()
11411155
raise e
11421156
except MovedError as e:
@@ -1156,6 +1170,13 @@ def _execute_command(self, target_node, *args, **kwargs):
11561170
else:
11571171
self.nodes_manager.update_moved_exception(e)
11581172
moved = True
1173+
except SlotNotCoveredError as e:
1174+
self.reinitialize_counter += 1
1175+
if self._should_reinitialized():
1176+
self.nodes_manager.initialize()
1177+
# Reset the counter
1178+
self.reinitialize_counter = 0
1179+
raise e
11591180
except TryAgainError:
11601181
if ttl < self.RedisClusterRequestTTL / 2:
11611182
time.sleep(0.05)
@@ -1387,7 +1408,10 @@ def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
13871408
# randomly choose one of the replicas
13881409
node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)
13891410

1390-
return self.slots_cache[slot][node_idx]
1411+
try:
1412+
return self.slots_cache[slot][node_idx]
1413+
except IndexError:
1414+
return self.slots_cache[slot][0]
13911415

13921416
def get_nodes_by_server_type(self, server_type):
13931417
"""
@@ -1859,6 +1883,7 @@ def __init__(
18591883
cluster_error_retry_attempts: int = 3,
18601884
reinitialize_steps: int = 5,
18611885
lock=None,
1886+
retry: Optional["Retry"] = None,
18621887
**kwargs,
18631888
):
18641889
""" """
@@ -1884,6 +1909,7 @@ def __init__(
18841909
if lock is None:
18851910
lock = threading.Lock()
18861911
self._lock = lock
1912+
self.retry = retry
18871913

18881914
def __repr__(self):
18891915
""" """
@@ -2016,8 +2042,9 @@ def send_cluster_commands(
20162042
stack,
20172043
raise_on_error=raise_on_error,
20182044
allow_redirections=allow_redirections,
2045+
attempts_count=self.cluster_error_retry_attempts - retry_attempts,
20192046
)
2020-
except (ClusterDownError, ConnectionError) as e:
2047+
except (ClusterDownError, ConnectionError, TimeoutError) as e:
20212048
if retry_attempts > 0:
20222049
# Try again with the new cluster setup. All other errors
20232050
# should be raised.
@@ -2027,7 +2054,7 @@ def send_cluster_commands(
20272054
raise e
20282055

20292056
def _send_cluster_commands(
2030-
self, stack, raise_on_error=True, allow_redirections=True
2057+
self, stack, raise_on_error=True, allow_redirections=True, attempts_count=0
20312058
):
20322059
"""
20332060
Send a bunch of cluster commands to the redis cluster.
@@ -2082,9 +2109,11 @@ def _send_cluster_commands(
20822109
redis_node = self.get_redis_connection(node)
20832110
try:
20842111
connection = get_connection(redis_node, c.args)
2085-
except ConnectionError:
2086-
# Connection retries are being handled in the node's
2087-
# Retry object. Reinitialize the node -> slot table.
2112+
except (ConnectionError, TimeoutError) as e:
2113+
if self.retry and isinstance(e, self.retry._supported_errors):
2114+
backoff = self.retry._backoff.compute(attempts_count)
2115+
if backoff > 0:
2116+
time.sleep(backoff)
20882117
self.nodes_manager.initialize()
20892118
if is_default_node:
20902119
self.replace_default_node()

tests/test_cluster.py

Lines changed: 47 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
import binascii
22
import datetime
3+
import uuid
34
import warnings
45
from queue import LifoQueue, Queue
56
from time import sleep
@@ -8,7 +9,12 @@
89
import pytest
910

1011
from redis import Redis
11-
from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff
12+
from redis.backoff import (
13+
ConstantBackoff,
14+
ExponentialBackoff,
15+
NoBackoff,
16+
default_backoff,
17+
)
1218
from redis.cluster import (
1319
PRIMARY,
1420
REDIS_CLUSTER_HASH_SLOTS,
@@ -30,6 +36,7 @@
3036
RedisClusterException,
3137
RedisError,
3238
ResponseError,
39+
SlotNotCoveredError,
3340
TimeoutError,
3441
)
3542
from redis.parsers import CommandsParser
@@ -718,45 +725,6 @@ def test_not_require_full_coverage_cluster_down_error(self, r):
718725
else:
719726
raise e
720727

721-
def test_timeout_error_topology_refresh_reuse_connections(self, r):
722-
"""
723-
By mucking TIMEOUT errors, we'll force the cluster topology to be reinitialized,
724-
and then ensure that only the impacted connection is replaced
725-
"""
726-
node = r.get_node_from_key("key")
727-
r.set("key", "value")
728-
node_conn_origin = {}
729-
for n in r.get_nodes():
730-
node_conn_origin[n.name] = n.redis_connection
731-
real_func = r.get_redis_connection(node).parse_response
732-
733-
class counter:
734-
def __init__(self, val=0):
735-
self.val = int(val)
736-
737-
count = counter(0)
738-
with patch.object(Redis, "parse_response") as parse_response:
739-
740-
def moved_redirect_effect(connection, *args, **options):
741-
# raise a timeout for 5 times so we'll need to reinitialize the topology
742-
if count.val == 4:
743-
parse_response.side_effect = real_func
744-
count.val += 1
745-
raise TimeoutError()
746-
747-
parse_response.side_effect = moved_redirect_effect
748-
assert r.get("key") == b"value"
749-
for node_name, conn in node_conn_origin.items():
750-
if node_name == node.name:
751-
# The old redis connection of the timed out node should have been
752-
# deleted and replaced
753-
assert conn != r.get_redis_connection(node)
754-
else:
755-
# other nodes' redis connection should have been reused during the
756-
# topology refresh
757-
cur_node = r.get_node(node_name=node_name)
758-
assert conn == r.get_redis_connection(cur_node)
759-
760728
def test_cluster_get_set_retry_object(self, request):
761729
retry = Retry(NoBackoff(), 2)
762730
r = _get_client(RedisCluster, request, retry=retry)
@@ -824,6 +792,45 @@ def raise_connection_error():
824792
assert "myself" not in nodes.get(curr_default_node.name).get("flags")
825793
assert r.get_default_node() != curr_default_node
826794

795+
@pytest.mark.parametrize("error", [ConnectionError, TimeoutError])
796+
def test_additional_backoff_redis_cluster(self, error):
797+
with patch.object(ConstantBackoff, "compute") as compute:
798+
799+
def _compute(target_node, *args, **kwargs):
800+
return 1
801+
802+
compute.side_effect = _compute
803+
with patch.object(RedisCluster, "_execute_command") as execute_command:
804+
805+
def raise_error(target_node, *args, **kwargs):
806+
execute_command.failed_calls += 1
807+
raise error("mocked error")
808+
809+
execute_command.side_effect = raise_error
810+
811+
rc = get_mocked_redis_client(
812+
host=default_host,
813+
port=default_port,
814+
retry=Retry(ConstantBackoff(1), 3),
815+
)
816+
817+
with pytest.raises(error):
818+
rc.get("bar")
819+
assert compute.call_count == rc.cluster_error_retry_attempts
820+
821+
@pytest.mark.parametrize("reinitialize_steps", [2, 10, 99])
822+
def test_recover_slot_not_covered_error(self, request, reinitialize_steps):
823+
rc = _get_client(RedisCluster, request, reinitialize_steps=reinitialize_steps)
824+
key = uuid.uuid4().hex
825+
826+
rc.nodes_manager.slots_cache[rc.keyslot(key)] = []
827+
828+
for _ in range(0, reinitialize_steps):
829+
with pytest.raises(SlotNotCoveredError):
830+
rc.get(key)
831+
832+
rc.get(key)
833+
827834

828835
@pytest.mark.onlycluster
829836
class TestClusterRedisCommands:

0 commit comments

Comments
 (0)