Skip to content

Commit 7a36e8b

Browse files
authored
Adding load balancing strategy configuration to cluster clients(replacement for 'read_from_replicas' config) (#3563)
* Adding laod balancing strategy configuration to cluster clients(replacement for 'read_from_replicas' config) * Fixing linter errors * Changing the LoadBalancingStrategy type hints to be defined as optional. Fixed wording in pydocs * Adding integration tests with the different load balancing strategies for read operation * Fixing linters
1 parent 6317ef5 commit 7a36e8b

File tree

5 files changed

+400
-43
lines changed

5 files changed

+400
-43
lines changed

redis/asyncio/cluster.py

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
SLOT_ID,
3939
AbstractRedisCluster,
4040
LoadBalancer,
41+
LoadBalancingStrategy,
4142
block_pipeline_command,
4243
get_node_name,
4344
parse_cluster_slots,
@@ -65,6 +66,7 @@
6566
from redis.typing import AnyKeyT, EncodableT, KeyT
6667
from redis.utils import (
6768
SSL_AVAILABLE,
69+
deprecated_args,
6870
deprecated_function,
6971
get_lib_version,
7072
safe_str,
@@ -121,9 +123,15 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
121123
| See:
122124
https://redis.io/docs/manual/scaling/#redis-cluster-configuration-parameters
123125
:param read_from_replicas:
124-
| Enable read from replicas in READONLY mode. You can read possibly stale data.
126+
| @deprecated - please use load_balancing_strategy instead
127+
| Enable read from replicas in READONLY mode.
125128
When set to true, read commands will be assigned between the primary and
126129
its replications in a Round-Robin manner.
130+
The data read from replicas is eventually consistent with the data in primary nodes.
131+
:param load_balancing_strategy:
132+
| Enable read from replicas in READONLY mode and defines the load balancing
133+
strategy that will be used for cluster node selection.
134+
The data read from replicas is eventually consistent with the data in primary nodes.
127135
:param reinitialize_steps:
128136
| Specifies the number of MOVED errors that need to occur before reinitializing
129137
the whole cluster topology. If a MOVED error occurs and the cluster does not
@@ -216,6 +224,11 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
216224
"result_callbacks",
217225
)
218226

227+
@deprecated_args(
228+
args_to_warn=["read_from_replicas"],
229+
reason="Please configure the 'load_balancing_strategy' instead",
230+
version="5.0.3",
231+
)
219232
def __init__(
220233
self,
221234
host: Optional[str] = None,
@@ -224,6 +237,7 @@ def __init__(
224237
startup_nodes: Optional[List["ClusterNode"]] = None,
225238
require_full_coverage: bool = True,
226239
read_from_replicas: bool = False,
240+
load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
227241
reinitialize_steps: int = 5,
228242
cluster_error_retry_attempts: int = 3,
229243
connection_error_retry_attempts: int = 3,
@@ -322,7 +336,7 @@ def __init__(
322336
}
323337
)
324338

325-
if read_from_replicas:
339+
if read_from_replicas or load_balancing_strategy:
326340
# Call our on_connect function to configure READONLY mode
327341
kwargs["redis_connect_func"] = self.on_connect
328342

@@ -371,6 +385,7 @@ def __init__(
371385
)
372386
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
373387
self.read_from_replicas = read_from_replicas
388+
self.load_balancing_strategy = load_balancing_strategy
374389
self.reinitialize_steps = reinitialize_steps
375390
self.cluster_error_retry_attempts = cluster_error_retry_attempts
376391
self.connection_error_retry_attempts = connection_error_retry_attempts
@@ -589,6 +604,7 @@ async def _determine_nodes(
589604
self.nodes_manager.get_node_from_slot(
590605
await self._determine_slot(command, *args),
591606
self.read_from_replicas and command in READ_COMMANDS,
607+
self.load_balancing_strategy if command in READ_COMMANDS else None,
592608
)
593609
]
594610

@@ -769,7 +785,11 @@ async def _execute_command(
769785
# refresh the target node
770786
slot = await self._determine_slot(*args)
771787
target_node = self.nodes_manager.get_node_from_slot(
772-
slot, self.read_from_replicas and args[0] in READ_COMMANDS
788+
slot,
789+
self.read_from_replicas and args[0] in READ_COMMANDS,
790+
self.load_balancing_strategy
791+
if args[0] in READ_COMMANDS
792+
else None,
773793
)
774794
moved = False
775795

@@ -1231,17 +1251,23 @@ def _update_moved_slots(self) -> None:
12311251
self._moved_exception = None
12321252

12331253
def get_node_from_slot(
1234-
self, slot: int, read_from_replicas: bool = False
1254+
self,
1255+
slot: int,
1256+
read_from_replicas: bool = False,
1257+
load_balancing_strategy=None,
12351258
) -> "ClusterNode":
12361259
if self._moved_exception:
12371260
self._update_moved_slots()
12381261

1262+
if read_from_replicas is True and load_balancing_strategy is None:
1263+
load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN
1264+
12391265
try:
1240-
if read_from_replicas:
1241-
# get the server index in a Round-Robin manner
1266+
if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
1267+
# get the server index using the strategy defined in load_balancing_strategy
12421268
primary_name = self.slots_cache[slot][0].name
12431269
node_idx = self.read_load_balancer.get_server_index(
1244-
primary_name, len(self.slots_cache[slot])
1270+
primary_name, len(self.slots_cache[slot]), load_balancing_strategy
12451271
)
12461272
return self.slots_cache[slot][node_idx]
12471273
return self.slots_cache[slot][0]

redis/cluster.py

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import threading
55
import time
66
from collections import OrderedDict
7+
from enum import Enum
78
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
89

910
from redis._parsers import CommandsParser, Encoder
@@ -482,6 +483,11 @@ class initializer. In the case of conflicting arguments, querystring
482483
"""
483484
return cls(url=url, **kwargs)
484485

486+
@deprecated_args(
487+
args_to_warn=["read_from_replicas"],
488+
reason="Please configure the 'load_balancing_strategy' instead",
489+
version="5.0.3",
490+
)
485491
def __init__(
486492
self,
487493
host: Optional[str] = None,
@@ -492,6 +498,7 @@ def __init__(
492498
require_full_coverage: bool = False,
493499
reinitialize_steps: int = 5,
494500
read_from_replicas: bool = False,
501+
load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
495502
dynamic_startup_nodes: bool = True,
496503
url: Optional[str] = None,
497504
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
@@ -520,11 +527,16 @@ def __init__(
520527
cluster client. If not all slots are covered, RedisClusterException
521528
will be thrown.
522529
:param read_from_replicas:
530+
@deprecated - please use load_balancing_strategy instead
523531
Enable read from replicas in READONLY mode. You can read possibly
524532
stale data.
525533
When set to true, read commands will be assigned between the
526534
primary and its replications in a Round-Robin manner.
527-
:param dynamic_startup_nodes:
535+
:param load_balancing_strategy:
536+
Enable read from replicas in READONLY mode and defines the load balancing
537+
strategy that will be used for cluster node selection.
538+
The data read from replicas is eventually consistent with the data in primary nodes.
539+
:param dynamic_startup_nodes:
528540
Set the RedisCluster's startup nodes to all of the discovered nodes.
529541
If true (default value), the cluster's discovered nodes will be used to
530542
determine the cluster nodes-slots mapping in the next topology refresh.
@@ -629,6 +641,7 @@ def __init__(
629641
self.command_flags = self.__class__.COMMAND_FLAGS.copy()
630642
self.node_flags = self.__class__.NODE_FLAGS.copy()
631643
self.read_from_replicas = read_from_replicas
644+
self.load_balancing_strategy = load_balancing_strategy
632645
self.reinitialize_counter = 0
633646
self.reinitialize_steps = reinitialize_steps
634647
if event_dispatcher is None:
@@ -683,7 +696,7 @@ def on_connect(self, connection):
683696
"""
684697
connection.on_connect()
685698

686-
if self.read_from_replicas:
699+
if self.read_from_replicas or self.load_balancing_strategy:
687700
# Sending READONLY command to server to configure connection as
688701
# readonly. Since each cluster node may change its server type due
689702
# to a failover, we should establish a READONLY connection
@@ -810,6 +823,7 @@ def pipeline(self, transaction=None, shard_hint=None):
810823
cluster_response_callbacks=self.cluster_response_callbacks,
811824
cluster_error_retry_attempts=self.cluster_error_retry_attempts,
812825
read_from_replicas=self.read_from_replicas,
826+
load_balancing_strategy=self.load_balancing_strategy,
813827
reinitialize_steps=self.reinitialize_steps,
814828
lock=self._lock,
815829
)
@@ -934,7 +948,9 @@ def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
934948
# get the node that holds the key's slot
935949
slot = self.determine_slot(*args)
936950
node = self.nodes_manager.get_node_from_slot(
937-
slot, self.read_from_replicas and command in READ_COMMANDS
951+
slot,
952+
self.read_from_replicas and command in READ_COMMANDS,
953+
self.load_balancing_strategy if command in READ_COMMANDS else None,
938954
)
939955
return [node]
940956

@@ -1158,7 +1174,11 @@ def _execute_command(self, target_node, *args, **kwargs):
11581174
# refresh the target node
11591175
slot = self.determine_slot(*args)
11601176
target_node = self.nodes_manager.get_node_from_slot(
1161-
slot, self.read_from_replicas and command in READ_COMMANDS
1177+
slot,
1178+
self.read_from_replicas and command in READ_COMMANDS,
1179+
self.load_balancing_strategy
1180+
if command in READ_COMMANDS
1181+
else None,
11621182
)
11631183
moved = False
11641184

@@ -1307,6 +1327,12 @@ def __del__(self):
13071327
self.redis_connection.close()
13081328

13091329

1330+
class LoadBalancingStrategy(Enum):
1331+
ROUND_ROBIN = "round_robin"
1332+
ROUND_ROBIN_REPLICAS = "round_robin_replicas"
1333+
RANDOM_REPLICA = "random_replica"
1334+
1335+
13101336
class LoadBalancer:
13111337
"""
13121338
Round-Robin Load Balancing
@@ -1316,15 +1342,38 @@ def __init__(self, start_index: int = 0) -> None:
13161342
self.primary_to_idx = {}
13171343
self.start_index = start_index
13181344

1319-
def get_server_index(self, primary: str, list_size: int) -> int:
1320-
server_index = self.primary_to_idx.setdefault(primary, self.start_index)
1321-
# Update the index
1322-
self.primary_to_idx[primary] = (server_index + 1) % list_size
1323-
return server_index
1345+
def get_server_index(
1346+
self,
1347+
primary: str,
1348+
list_size: int,
1349+
load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
1350+
) -> int:
1351+
if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
1352+
return self._get_random_replica_index(list_size)
1353+
else:
1354+
return self._get_round_robin_index(
1355+
primary,
1356+
list_size,
1357+
load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
1358+
)
13241359

13251360
def reset(self) -> None:
13261361
self.primary_to_idx.clear()
13271362

1363+
def _get_random_replica_index(self, list_size: int) -> int:
1364+
return random.randint(1, list_size - 1)
1365+
1366+
def _get_round_robin_index(
1367+
self, primary: str, list_size: int, replicas_only: bool
1368+
) -> int:
1369+
server_index = self.primary_to_idx.setdefault(primary, self.start_index)
1370+
if replicas_only and server_index == 0:
1371+
# skip the primary node index
1372+
server_index = 1
1373+
# Update the index for the next round
1374+
self.primary_to_idx[primary] = (server_index + 1) % list_size
1375+
return server_index
1376+
13281377

13291378
class NodesManager:
13301379
def __init__(
@@ -1428,7 +1477,21 @@ def _update_moved_slots(self):
14281477
# Reset moved_exception
14291478
self._moved_exception = None
14301479

1431-
def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
1480+
@deprecated_args(
1481+
args_to_warn=["server_type"],
1482+
reason=(
1483+
"In case you need select some load balancing strategy "
1484+
"that will use replicas, please set it through 'load_balancing_strategy'"
1485+
),
1486+
version="5.0.3",
1487+
)
1488+
def get_node_from_slot(
1489+
self,
1490+
slot,
1491+
read_from_replicas=False,
1492+
load_balancing_strategy=None,
1493+
server_type=None,
1494+
):
14321495
"""
14331496
Gets a node that servers this hash slot
14341497
"""
@@ -1443,11 +1506,14 @@ def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
14431506
f'"require_full_coverage={self._require_full_coverage}"'
14441507
)
14451508

1446-
if read_from_replicas is True:
1447-
# get the server index in a Round-Robin manner
1509+
if read_from_replicas is True and load_balancing_strategy is None:
1510+
load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN
1511+
1512+
if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
1513+
# get the server index using the strategy defined in load_balancing_strategy
14481514
primary_name = self.slots_cache[slot][0].name
14491515
node_idx = self.read_load_balancer.get_server_index(
1450-
primary_name, len(self.slots_cache[slot])
1516+
primary_name, len(self.slots_cache[slot]), load_balancing_strategy
14511517
)
14521518
elif (
14531519
server_type is None
@@ -1730,7 +1796,7 @@ def __init__(
17301796
first command execution. The node will be determined by:
17311797
1. Hashing the channel name in the request to find its keyslot
17321798
2. Selecting a node that handles the keyslot: If read_from_replicas is
1733-
set to true, a replica can be selected.
1799+
set to true or load_balancing_strategy is set, a replica can be selected.
17341800
17351801
:type redis_cluster: RedisCluster
17361802
:type node: ClusterNode
@@ -1826,7 +1892,9 @@ def execute_command(self, *args):
18261892
channel = args[1]
18271893
slot = self.cluster.keyslot(channel)
18281894
node = self.cluster.nodes_manager.get_node_from_slot(
1829-
slot, self.cluster.read_from_replicas
1895+
slot,
1896+
self.cluster.read_from_replicas,
1897+
self.cluster.load_balancing_strategy,
18301898
)
18311899
else:
18321900
# Get a random node
@@ -1969,6 +2037,7 @@ def __init__(
19692037
cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
19702038
startup_nodes: Optional[List["ClusterNode"]] = None,
19712039
read_from_replicas: bool = False,
2040+
load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
19722041
cluster_error_retry_attempts: int = 3,
19732042
reinitialize_steps: int = 5,
19742043
lock=None,
@@ -1984,6 +2053,7 @@ def __init__(
19842053
)
19852054
self.startup_nodes = startup_nodes if startup_nodes else []
19862055
self.read_from_replicas = read_from_replicas
2056+
self.load_balancing_strategy = load_balancing_strategy
19872057
self.command_flags = self.__class__.COMMAND_FLAGS.copy()
19882058
self.cluster_response_callbacks = cluster_response_callbacks
19892059
self.cluster_error_retry_attempts = cluster_error_retry_attempts

0 commit comments

Comments
 (0)