Skip to content

Commit 6ccf553

Browse files
committed
[GROW-3247] release connection even if an unexpected exception is thrown in cluster pipeline (#8)
* [GROW-3247] Release the connection even if an unexpected exception is thrown in the cluster pipeline.
* [GROW-3247] Fix a style issue.
* Unassign `n.connection` on every loop iteration.
1 parent 8a5dada commit 6ccf553

File tree

2 files changed

+185
-146
lines changed

2 files changed

+185
-146
lines changed

redis/cluster.py

Lines changed: 162 additions & 146 deletions
Original file line number · Diff line number · Diff line change
@@ -1965,158 +1965,174 @@ def _send_cluster_commands(
19651965
# build a list of node objects based on node names we need to
19661966
nodes = {}
19671967

1968-
# as we move through each command that still needs to be processed,
1969-
# we figure out the slot number that command maps to, then from
1970-
# the slot determine the node.
1971-
for c in attempt:
1972-
while True:
1973-
# refer to our internal node -> slot table that
1974-
# tells us where a given command should route to.
1975-
# (it might be possible we have a cached node that no longer
1976-
# exists in the cluster, which is why we do this in a loop)
1977-
passed_targets = c.options.pop("target_nodes", None)
1978-
if passed_targets and not self._is_nodes_flag(passed_targets):
1979-
target_nodes = self._parse_target_nodes(passed_targets)
1980-
else:
1981-
target_nodes = self._determine_nodes(
1982-
*c.args, node_flag=passed_targets
1983-
)
1984-
if not target_nodes:
1968+
try:
1969+
# as we move through each command that still needs to be processed,
1970+
# we figure out the slot number that command maps to, then from
1971+
# the slot determine the node.
1972+
for c in attempt:
1973+
while True:
1974+
# refer to our internal node -> slot table that
1975+
# tells us where a given command should route to.
1976+
# (it might be possible we have a cached node that no longer
1977+
# exists in the cluster, which is why we do this in a loop)
1978+
passed_targets = c.options.pop("target_nodes", None)
1979+
if passed_targets and not self._is_nodes_flag(passed_targets):
1980+
target_nodes = self._parse_target_nodes(passed_targets)
1981+
else:
1982+
target_nodes = self._determine_nodes(
1983+
*c.args, node_flag=passed_targets
1984+
)
1985+
if not target_nodes:
1986+
raise RedisClusterException(
1987+
f"No targets were found to execute {c.args} command on"
1988+
)
1989+
if len(target_nodes) > 1:
19851990
raise RedisClusterException(
1986-
f"No targets were found to execute {c.args} command on"
1991+
f"Too many targets for command {c.args}"
19871992
)
1988-
if len(target_nodes) > 1:
1989-
raise RedisClusterException(
1990-
f"Too many targets for command {c.args}"
1991-
)
19921993

1993-
node = target_nodes[0]
1994-
if node == self.get_default_node():
1995-
is_default_node = True
1994+
node = target_nodes[0]
1995+
if node == self.get_default_node():
1996+
is_default_node = True
19961997

1997-
# now that we know the name of the node
1998-
# ( it's just a string in the form of host:port )
1999-
# we can build a list of commands for each node.
2000-
node_name = node.name
2001-
if node_name not in nodes:
2002-
redis_node = self.get_redis_connection(node)
1998+
# now that we know the name of the node
1999+
# ( it's just a string in the form of host:port )
2000+
# we can build a list of commands for each node.
2001+
node_name = node.name
2002+
if node_name not in nodes:
2003+
redis_node = self.get_redis_connection(node)
2004+
try:
2005+
connection = get_connection(redis_node, c.args)
2006+
except (ConnectionError, TimeoutError) as e:
2007+
for n in nodes.values():
2008+
n.connection_pool.release(n.connection)
2009+
n.connection = None
2010+
nodes = {}
2011+
if self.retry and isinstance(
2012+
e, self.retry._supported_errors
2013+
):
2014+
backoff = self.retry._backoff.compute(attempts_count)
2015+
if backoff > 0:
2016+
time.sleep(backoff)
2017+
self.nodes_manager.initialize()
2018+
if is_default_node:
2019+
self.replace_default_node()
2020+
raise
2021+
nodes[node_name] = NodeCommands(
2022+
redis_node.parse_response,
2023+
redis_node.connection_pool,
2024+
connection,
2025+
)
2026+
nodes[node_name].append(c)
2027+
break
2028+
2029+
# send the commands in sequence.
2030+
# we write to all the open sockets for each node first,
2031+
# before reading anything
2032+
# this allows us to flush all the requests out across the
2033+
# network essentially in parallel
2034+
# so that we can read them all in parallel as they come back.
2035+
# we dont' multiplex on the sockets as they come available,
2036+
# but that shouldn't make too much difference.
2037+
node_commands = nodes.values()
2038+
for n in node_commands:
2039+
n.write()
2040+
2041+
for n in node_commands:
2042+
n.read()
2043+
2044+
# release all of the redis connections we allocated earlier
2045+
# back into the connection pool.
2046+
# we used to do this step as part of a try/finally block,
2047+
# but it is really dangerous to
2048+
# release connections back into the pool if for some
2049+
# reason the socket has data still left in it
2050+
# from a previous operation. The write and
2051+
# read operations already have try/catch around them for
2052+
# all known types of errors including connection
2053+
# and socket level errors.
2054+
# So if we hit an exception, something really bad
2055+
# happened and putting any oF
2056+
# these connections back into the pool is a very bad idea.
2057+
# the socket might have unread buffer still sitting in it,
2058+
# and then the next time we read from it we pass the
2059+
# buffered result back from a previous command and
2060+
# every single request after to that connection will always get
2061+
# a mismatched result.
2062+
for n in nodes.values():
2063+
n.connection_pool.release(n.connection)
2064+
n.connection = None
2065+
nodes = {}
2066+
2067+
# if the response isn't an exception it is a
2068+
# valid response from the node
2069+
# we're all done with that command, YAY!
2070+
# if we have more commands to attempt, we've run into problems.
2071+
# collect all the commands we are allowed to retry.
2072+
# (MOVED, ASK, or connection errors or timeout errors)
2073+
attempt = sorted(
2074+
(
2075+
c
2076+
for c in attempt
2077+
if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
2078+
),
2079+
key=lambda x: x.position,
2080+
)
2081+
if attempt and allow_redirections:
2082+
# RETRY MAGIC HAPPENS HERE!
2083+
# send these remaing commands one at a time using `execute_command`
2084+
# in the main client. This keeps our retry logic
2085+
# in one place mostly,
2086+
# and allows us to be more confident in correctness of behavior.
2087+
# at this point any speed gains from pipelining have been lost
2088+
# anyway, so we might as well make the best
2089+
# attempt to get the correct behavior.
2090+
#
2091+
# The client command will handle retries for each
2092+
# individual command sequentially as we pass each
2093+
# one into `execute_command`. Any exceptions
2094+
# that bubble out should only appear once all
2095+
# retries have been exhausted.
2096+
#
2097+
# If a lot of commands have failed, we'll be setting the
2098+
# flag to rebuild the slots table from scratch.
2099+
# So MOVED errors should correct themselves fairly quickly.
2100+
self.reinitialize_counter += 1
2101+
if self._should_reinitialized():
2102+
self.nodes_manager.initialize()
2103+
if is_default_node:
2104+
self.replace_default_node()
2105+
for c in attempt:
20032106
try:
2004-
connection = get_connection(redis_node, c.args)
2005-
except (ConnectionError, TimeoutError) as e:
2006-
for n in nodes.values():
2007-
n.connection_pool.release(n.connection)
2008-
if self.retry and isinstance(e, self.retry._supported_errors):
2009-
backoff = self.retry._backoff.compute(attempts_count)
2010-
if backoff > 0:
2011-
time.sleep(backoff)
2012-
self.nodes_manager.initialize()
2013-
if is_default_node:
2014-
self.replace_default_node()
2015-
raise
2016-
nodes[node_name] = NodeCommands(
2017-
redis_node.parse_response,
2018-
redis_node.connection_pool,
2019-
connection,
2020-
)
2021-
nodes[node_name].append(c)
2022-
break
2023-
2024-
# send the commands in sequence.
2025-
# we write to all the open sockets for each node first,
2026-
# before reading anything
2027-
# this allows us to flush all the requests out across the
2028-
# network essentially in parallel
2029-
# so that we can read them all in parallel as they come back.
2030-
# we dont' multiplex on the sockets as they come available,
2031-
# but that shouldn't make too much difference.
2032-
node_commands = nodes.values()
2033-
for n in node_commands:
2034-
n.write()
2035-
2036-
for n in node_commands:
2037-
n.read()
2038-
2039-
# release all of the redis connections we allocated earlier
2040-
# back into the connection pool.
2041-
# we used to do this step as part of a try/finally block,
2042-
# but it is really dangerous to
2043-
# release connections back into the pool if for some
2044-
# reason the socket has data still left in it
2045-
# from a previous operation. The write and
2046-
# read operations already have try/catch around them for
2047-
# all known types of errors including connection
2048-
# and socket level errors.
2049-
# So if we hit an exception, something really bad
2050-
# happened and putting any oF
2051-
# these connections back into the pool is a very bad idea.
2052-
# the socket might have unread buffer still sitting in it,
2053-
# and then the next time we read from it we pass the
2054-
# buffered result back from a previous command and
2055-
# every single request after to that connection will always get
2056-
# a mismatched result.
2057-
for n in nodes.values():
2058-
n.connection_pool.release(n.connection)
2059-
2060-
# if the response isn't an exception it is a
2061-
# valid response from the node
2062-
# we're all done with that command, YAY!
2063-
# if we have more commands to attempt, we've run into problems.
2064-
# collect all the commands we are allowed to retry.
2065-
# (MOVED, ASK, or connection errors or timeout errors)
2066-
attempt = sorted(
2067-
(
2068-
c
2069-
for c in attempt
2070-
if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
2071-
),
2072-
key=lambda x: x.position,
2073-
)
2074-
if attempt and allow_redirections:
2075-
# RETRY MAGIC HAPPENS HERE!
2076-
# send these remaing commands one at a time using `execute_command`
2077-
# in the main client. This keeps our retry logic
2078-
# in one place mostly,
2079-
# and allows us to be more confident in correctness of behavior.
2080-
# at this point any speed gains from pipelining have been lost
2081-
# anyway, so we might as well make the best
2082-
# attempt to get the correct behavior.
2083-
#
2084-
# The client command will handle retries for each
2085-
# individual command sequentially as we pass each
2086-
# one into `execute_command`. Any exceptions
2087-
# that bubble out should only appear once all
2088-
# retries have been exhausted.
2089-
#
2090-
# If a lot of commands have failed, we'll be setting the
2091-
# flag to rebuild the slots table from scratch.
2092-
# So MOVED errors should correct themselves fairly quickly.
2093-
self.reinitialize_counter += 1
2094-
if self._should_reinitialized():
2095-
self.nodes_manager.initialize()
2096-
if is_default_node:
2097-
self.replace_default_node()
2098-
for c in attempt:
2099-
try:
2100-
# send each command individually like we
2101-
# do in the main client.
2102-
c.result = super().execute_command(*c.args, **c.options)
2103-
except RedisError as e:
2104-
c.result = e
2105-
2106-
# turn the response back into a simple flat array that corresponds
2107-
# to the sequence of commands issued in the stack in pipeline.execute()
2108-
response = []
2109-
for c in sorted(stack, key=lambda x: x.position):
2110-
if c.args[0] in self.cluster_response_callbacks:
2111-
c.result = self.cluster_response_callbacks[c.args[0]](
2112-
c.result, **c.options
2113-
)
2114-
response.append(c.result)
2115-
2116-
if raise_on_error:
2117-
self.raise_first_error(stack)
2107+
# send each command individually like we
2108+
# do in the main client.
2109+
c.result = super().execute_command(*c.args, **c.options)
2110+
except RedisError as e:
2111+
c.result = e
21182112

2119-
return response
2113+
# turn the response back into a simple flat array that corresponds
2114+
# to the sequence of commands issued in the stack in pipeline.execute()
2115+
response = []
2116+
for c in sorted(stack, key=lambda x: x.position):
2117+
if c.args[0] in self.cluster_response_callbacks:
2118+
c.result = self.cluster_response_callbacks[c.args[0]](
2119+
c.result, **c.options
2120+
)
2121+
response.append(c.result)
2122+
2123+
if raise_on_error:
2124+
self.raise_first_error(stack)
2125+
2126+
return response
2127+
except BaseException:
2128+
# if nodes is not empty, a problem must have occurred
2129+
# since we cant guarantee the state of the connections,
2130+
# disconnect before returning it to the connection pool
2131+
for n in nodes.values():
2132+
if n.connection:
2133+
n.connection.disconnect()
2134+
n.connection_pool.release(n.connection)
2135+
raise
21202136

21212137
def _fail_on_redirect(self, allow_redirections):
21222138
""" """

tests/test_cluster.py

Lines changed: 23 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -23,6 +23,7 @@
2323
REPLICA,
2424
ClusterNode,
2525
LoadBalancer,
26+
NodeCommands,
2627
NodesManager,
2728
RedisCluster,
2829
get_node_name,
@@ -2654,6 +2655,28 @@ class TestClusterPipeline:
26542655
Tests for the ClusterPipeline class
26552656
"""
26562657

2658+
@pytest.mark.parametrize("function", ["write", "read"])
2659+
def test_connection_release_with_unexpected_error_in_node_commands(
2660+
self, r, function
2661+
):
2662+
"""
2663+
Test that connection is released to the pool, even with an unexpected error
2664+
"""
2665+
with patch.object(NodeCommands, function) as m:
2666+
2667+
def raise_error():
2668+
raise Exception("unexpected error")
2669+
2670+
m.side_effect = raise_error
2671+
2672+
with pytest.raises(Exception, match="unexpected error"):
2673+
r.pipeline().get("a").execute()
2674+
2675+
for cluster_node in r.nodes_manager.nodes_cache.values():
2676+
connection_pool = cluster_node.redis_connection.connection_pool
2677+
num_of_conns = len(connection_pool._available_connections)
2678+
assert num_of_conns == connection_pool._created_connections
2679+
26572680
def test_blocked_methods(self, r):
26582681
"""
26592682
Currently some method calls on a Cluster pipeline

0 commit comments

Comments
 (0)