Skip to content

Commit 41016ff

Browse files
commands/cluster: use pipeline to execute split commands
- allow passing target_nodes to pipeline commands - move READ_COMMANDS to commands/cluster to avoid import cycle - add types to list_or_args
1 parent bedf3c8 commit 41016ff

File tree

5 files changed

+151
-153
lines changed

5 files changed

+151
-153
lines changed

redis/asyncio/cluster.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
from redis.cluster import (
2424
PIPELINE_BLOCKED_COMMANDS,
2525
PRIMARY,
26-
READ_COMMANDS,
2726
REPLICA,
2827
SLOT_ID,
2928
AbstractRedisCluster,
@@ -32,7 +31,7 @@
3231
get_node_name,
3332
parse_cluster_slots,
3433
)
35-
from redis.commands import AsyncRedisClusterCommands
34+
from redis.commands import READ_COMMANDS, AsyncRedisClusterCommands
3635
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
3736
from redis.exceptions import (
3837
AskError,
@@ -1350,11 +1349,17 @@ async def _execute(
13501349

13511350
nodes = {}
13521351
for cmd in todo:
1353-
target_nodes = await client._determine_nodes(*cmd.args)
1354-
if not target_nodes:
1355-
raise RedisClusterException(
1356-
f"No targets were found to execute {cmd.args} command on"
1352+
passed_targets = cmd.kwargs.pop("target_nodes", None)
1353+
if passed_targets and not client._is_node_flag(passed_targets):
1354+
target_nodes = client._parse_target_nodes(passed_targets)
1355+
else:
1356+
target_nodes = await client._determine_nodes(
1357+
*cmd.args, node_flag=passed_targets
13571358
)
1359+
if not target_nodes:
1360+
raise RedisClusterException(
1361+
f"No targets were found to execute {cmd.args} command on"
1362+
)
13581363
if len(target_nodes) > 1:
13591364
raise RedisClusterException(f"Too many targets for command {cmd.args}")
13601365

redis/cluster.py

Lines changed: 15 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from typing import Any, Callable, Dict, Tuple
1010

1111
from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan
12-
from redis.commands import CommandsParser, RedisClusterCommands
12+
from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands
1313
from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url
1414
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
1515
from redis.exceptions import (
@@ -153,52 +153,6 @@ def parse_cluster_shards(resp, **options):
153153
)
154154
KWARGS_DISABLED_KEYS = ("host", "port")
155155

156-
# Not complete, but covers the major ones
157-
# https://redis.io/commands
158-
READ_COMMANDS = frozenset(
159-
[
160-
"BITCOUNT",
161-
"BITPOS",
162-
"EXISTS",
163-
"GEODIST",
164-
"GEOHASH",
165-
"GEOPOS",
166-
"GEORADIUS",
167-
"GEORADIUSBYMEMBER",
168-
"GET",
169-
"GETBIT",
170-
"GETRANGE",
171-
"HEXISTS",
172-
"HGET",
173-
"HGETALL",
174-
"HKEYS",
175-
"HLEN",
176-
"HMGET",
177-
"HSTRLEN",
178-
"HVALS",
179-
"KEYS",
180-
"LINDEX",
181-
"LLEN",
182-
"LRANGE",
183-
"MGET",
184-
"PTTL",
185-
"RANDOMKEY",
186-
"SCARD",
187-
"SDIFF",
188-
"SINTER",
189-
"SISMEMBER",
190-
"SMEMBERS",
191-
"SRANDMEMBER",
192-
"STRLEN",
193-
"SUNION",
194-
"TTL",
195-
"ZCARD",
196-
"ZCOUNT",
197-
"ZRANGE",
198-
"ZSCORE",
199-
]
200-
)
201-
202156

203157
def cleanup_kwargs(**kwargs):
204158
"""
@@ -1965,14 +1919,25 @@ def _send_cluster_commands(
19651919
# refer to our internal node -> slot table that
19661920
# tells us where a given
19671921
# command should route to.
1968-
node = self._determine_nodes(*c.args)
1922+
passed_targets = c.options.pop("target_nodes", None)
1923+
if passed_targets and not self._is_nodes_flag(passed_targets):
1924+
target_nodes = self._parse_target_nodes(passed_targets)
1925+
else:
1926+
target_nodes = self._determine_nodes(*c.args, node_flag=passed_targets)
1927+
if not target_nodes:
1928+
raise RedisClusterException(
1929+
f"No targets were found to execute {c.args} command on"
1930+
)
1931+
if len(target_nodes) > 1:
1932+
raise RedisClusterException(f"Too many targets for command {c.args}")
19691933

1934+
node = target_nodes[0]
19701935
# now that we know the name of the node
19711936
# ( it's just a string in the form of host:port )
19721937
# we can build a list of commands for each node.
1973-
node_name = node[0].name
1938+
node_name = node.name
19741939
if node_name not in nodes:
1975-
redis_node = self.get_redis_connection(node[0])
1940+
redis_node = self.get_redis_connection(node)
19761941
connection = get_connection(redis_node, c.args)
19771942
nodes[node_name] = NodeCommands(
19781943
redis_node.parse_response, redis_node.connection_pool, connection

redis/commands/__init__.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
1-
from .cluster import AsyncRedisClusterCommands, RedisClusterCommands
1+
from .cluster import READ_COMMANDS, AsyncRedisClusterCommands, RedisClusterCommands
22
from .core import AsyncCoreCommands, CoreCommands
33
from .helpers import list_or_args
44
from .parser import CommandsParser
55
from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands
66
from .sentinel import AsyncSentinelCommands, SentinelCommands
77

88
__all__ = [
9+
"AsyncCoreCommands",
910
"AsyncRedisClusterCommands",
10-
"RedisClusterCommands",
11+
"AsyncRedisModuleCommands",
12+
"AsyncSentinelCommands",
1113
"CommandsParser",
12-
"AsyncCoreCommands",
1314
"CoreCommands",
14-
"list_or_args",
15-
"AsyncRedisModuleCommands",
15+
"READ_COMMANDS",
16+
"RedisClusterCommands",
1617
"RedisModuleCommands",
17-
"AsyncSentinelCommands",
1818
"SentinelCommands",
19+
"list_or_args",
1920
]

0 commit comments

Comments
 (0)