
Commit 6da8086

Reuse the old nodes' connections when a cluster topology refresh is being done (#2235)
* A fix was made to reuse the old nodes' connections when a cluster topology refresh is being done
* Fixed RedisCluster to immediately raise AuthenticationError
* Updated CHANGES
* Fixed cluster async bgsave test to ignore "bgsave already in progress" error
* Fixed linters
1 parent 23fd327 commit 6da8086
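
At the heart of the change is a reuse-or-create lookup performed while the slots cache is rebuilt: if the previous nodes cache already holds a node for the same host:port with a live connection, that node (and its connection) is kept instead of being recreated. Below is a simplified preview of the helper added in redis/cluster.py as NodesManager._get_or_create_cluster_node; the ClusterNode class and cache dicts here are stripped-down stand-ins for illustration only, not redis-py's own.

# Simplified sketch of the reuse-or-create lookup (illustration only).
class ClusterNode:
    def __init__(self, host, port, role, redis_connection=None):
        self.host = host
        self.port = port
        self.role = role
        self.redis_connection = redis_connection
        self.name = f"{host}:{port}"


def get_or_create_node(host, port, role, tmp_nodes_cache, old_nodes_cache):
    node_name = f"{host}:{port}"
    # 1. node already seen during this refresh pass
    node = tmp_nodes_cache.get(node_name)
    if node is None:
        # 2. node known from the previous topology and still holding a connection
        node = old_nodes_cache.get(node_name)
        if node is None or node.redis_connection is None:
            # 3. otherwise build a fresh node; its connection is created lazily later
            node = ClusterNode(host, port, role)
    return node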

4 files changed: 75 additions & 14 deletions


CHANGES

Lines changed: 2 additions & 0 deletions
@@ -11,6 +11,8 @@
     * Fix broken connection writer lock-up for asyncio (#2065)
     * Fix auth bug when provided with no username (#2086)
     * Fix missing ClusterPipeline._lock (#2189)
+    * Fix reusing the old nodes' connections when cluster topology refresh is being done
+    * Fix RedisCluster to immediately raise AuthenticationError without a retry
 
 * 4.1.3 (Feb 8, 2022)
     * Fix flushdb and flushall (#1926)

redis/cluster.py

Lines changed: 26 additions & 11 deletions
@@ -14,6 +14,7 @@
 from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
 from redis.exceptions import (
     AskError,
+    AuthenticationError,
     BusyLoadingError,
     ClusterCrossSlotError,
     ClusterDownError,
@@ -1113,7 +1114,7 @@ def _execute_command(self, target_node, *args, **kwargs):
                 )
             return response
 
-        except (RedisClusterException, BusyLoadingError) as e:
+        except (RedisClusterException, BusyLoadingError, AuthenticationError) as e:
             log.exception(type(e))
             raise
         except (ConnectionError, TimeoutError) as e:
@@ -1134,6 +1135,7 @@ def _execute_command(self, target_node, *args, **kwargs):
             else:
                 # Hard force of reinitialize of the node/slots setup
                 # and try again with the new setup
+                target_node.redis_connection = None
                 self.nodes_manager.initialize()
             raise
         except MovedError as e:
@@ -1443,6 +1445,21 @@ def create_redis_node(self, host, port, **kwargs):
             r = Redis(host=host, port=port, **kwargs)
         return r
 
+    def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
+        node_name = get_node_name(host, port)
+        # check if we already have this node in the tmp_nodes_cache
+        target_node = tmp_nodes_cache.get(node_name)
+        if target_node is None:
+            # before creating a new cluster node, check if the cluster node already
+            # exists in the current nodes cache and has a valid connection so we can
+            # reuse it
+            target_node = self.nodes_cache.get(node_name)
+            if target_node is None or target_node.redis_connection is None:
+                # create new cluster node for this cluster
+                target_node = ClusterNode(host, port, role)
+
+        return target_node
+
     def initialize(self):
         """
         Initializes the nodes cache, slots cache and redis connections.
@@ -1521,14 +1538,14 @@ def initialize(self):
 
             for slot in cluster_slots:
                 primary_node = slot[2]
-                host = primary_node[0]
+                host = str_if_bytes(primary_node[0])
                 if host == "":
                     host = startup_node.host
                 port = int(primary_node[1])
 
-                target_node = tmp_nodes_cache.get(get_node_name(host, port))
-                if target_node is None:
-                    target_node = ClusterNode(host, port, PRIMARY)
+                target_node = self._get_or_create_cluster_node(
+                    host, port, PRIMARY, tmp_nodes_cache
+                )
                 # add this node to the nodes cache
                 tmp_nodes_cache[target_node.name] = target_node
 
@@ -1539,14 +1556,12 @@ def initialize(self):
                         replica_nodes = [slot[j] for j in range(3, len(slot))]
 
                         for replica_node in replica_nodes:
-                            host = replica_node[0]
+                            host = str_if_bytes(replica_node[0])
                             port = replica_node[1]
 
-                            target_replica_node = tmp_nodes_cache.get(
-                                get_node_name(host, port)
+                            target_replica_node = self._get_or_create_cluster_node(
+                                host, port, REPLICA, tmp_nodes_cache
                             )
-                            if target_replica_node is None:
-                                target_replica_node = ClusterNode(host, port, REPLICA)
                             tmp_slots[i].append(target_replica_node)
                             # add this node to the nodes cache
                             tmp_nodes_cache[
@@ -1598,7 +1613,7 @@ def initialize(self):
         # Set the default node
        self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
         # Populate the startup nodes with all discovered nodes
-        self.populate_startup_nodes(self.nodes_cache.values())
+        self.startup_nodes = tmp_nodes_cache
         # If initialize was called after a MovedError, clear it
         self._moved_exception = None
 

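The host values returned by CLUSTER SLOTS can arrive as bytes when the underlying connection does not decode responses, which is why the hunks above now wrap them in str_if_bytes before building node names. A minimal demonstration of that helper from redis.utils (a sketch, assuming the redis package is installed):

from redis.utils import str_if_bytes

# bytes hosts are decoded, already-decoded strings and other values pass through
assert str_if_bytes(b"127.0.0.1") == "127.0.0.1"
assert str_if_bytes("127.0.0.1") == "127.0.0.1"
assert str_if_bytes(6379) == 6379
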
tests/test_asyncio/test_cluster.py

Lines changed: 7 additions & 3 deletions
@@ -1059,9 +1059,13 @@ async def test_readwrite(self) -> None:
 
     @skip_if_redis_enterprise()
     async def test_bgsave(self, r: RedisCluster) -> None:
-        assert await r.bgsave()
-        await asyncio.sleep(0.3)
-        assert await r.bgsave(True)
+        try:
+            assert await r.bgsave()
+            await asyncio.sleep(0.3)
+            assert await r.bgsave(True)
+        except ResponseError as e:
+            if "Background save already in progress" not in e.__str__():
+                raise
 
     async def test_info(self, r: RedisCluster) -> None:
         # Map keys to same slot
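
The guard above exists because BGSAVE raises a ResponseError containing "Background save already in progress" when a save is still running, and the test now treats that specific error as benign. The same pattern works against a standalone synchronous client; safe_bgsave below is our own illustrative helper, not part of this commit:

import redis
from redis.exceptions import ResponseError


def safe_bgsave(client: redis.Redis) -> bool:
    """Trigger BGSAVE but tolerate an already-running background save."""
    try:
        return bool(client.bgsave())
    except ResponseError as e:
        if "Background save already in progress" in str(e):
            return True  # an earlier BGSAVE is still in flight; treat as success
        raise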

tests/test_cluster.py

Lines changed: 40 additions & 0 deletions
@@ -29,6 +29,7 @@
     RedisClusterException,
     RedisError,
     ResponseError,
+    TimeoutError,
 )
 from redis.utils import str_if_bytes
 from tests.test_pubsub import wait_for_message
@@ -651,6 +652,45 @@ def test_not_require_full_coverage_cluster_down_error(self, r):
             else:
                 raise e
 
+    def test_timeout_error_topology_refresh_reuse_connections(self, r):
+        """
+        By mocking TIMEOUT errors, we'll force the cluster topology to be reinitialized,
+        and then ensure that only the impacted connection is replaced
+        """
+        node = r.get_node_from_key("key")
+        r.set("key", "value")
+        node_conn_origin = {}
+        for n in r.get_nodes():
+            node_conn_origin[n.name] = n.redis_connection
+        real_func = r.get_redis_connection(node).parse_response
+
+        class counter:
+            def __init__(self, val=0):
+                self.val = int(val)
+
+        count = counter(0)
+        with patch.object(Redis, "parse_response") as parse_response:
+
+            def moved_redirect_effect(connection, *args, **options):
+                # raise a timeout 5 times so we'll need to reinitialize the topology
+                if count.val >= 5:
+                    parse_response.side_effect = real_func
+                count.val += 1
+                raise TimeoutError()
+
+            parse_response.side_effect = moved_redirect_effect
+            assert r.get("key") == b"value"
+            for node_name, conn in node_conn_origin.items():
+                if node_name == node.name:
+                    # The old redis connection of the timed out node should have been
+                    # deleted and replaced
+                    assert conn != r.get_redis_connection(node)
+                else:
+                    # other nodes' redis connection should have been reused during the
+                    # topology refresh
+                    cur_node = r.get_node(node_name=node_name)
+                    assert conn == r.get_redis_connection(cur_node)
+
 
 @pytest.mark.onlycluster
 class TestClusterRedisCommands:
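
The new test simulates a flaky node by patching Redis.parse_response so it raises TimeoutError for the first few calls and then hands back to the real implementation, forcing a topology refresh while the other nodes stay healthy. That mocking trick generalizes; below is a small standalone sketch of a "fail n times, then delegate" side effect meant to be passed to unittest.mock's patch.object(..., side_effect=...). The helper name is ours, not redis-py's:

def fail_n_times_then(real_func, n, make_exc):
    """Build a side_effect callable: raise make_exc() for the first n calls,
    then delegate every later call to real_func (e.g. the real parse_response)."""
    calls = {"count": 0}

    def side_effect(*args, **kwargs):
        calls["count"] += 1
        if calls["count"] <= n:
            raise make_exc()
        return real_func(*args, **kwargs)

    return side_effect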
