-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Failover handling improvements for RedisCluster and Async RedisCluster #2377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
dad144b
Cluster&AsyncCluster: Removed handling of timeouts/connection errors …
barshaul 7ece39e
Fixed linters
barshaul 769f097
Type fixes
barshaul d852bc9
Added to CHANGES
barshaul 1415633
Added getter and setter for the client's retry object and added more …
barshaul c0b57ae
Fixed linters
barshaul 513f913
Fixed test
barshaul 1aa3e7c
Fixed test_client_kill test
barshaul e100923
Changed get_default_backoff to default_backoff, removed retry_on_erro…
barshaul eebf4b0
Fixing linters
barshaul 8ca002d
Reverting deletion of connection_error_retry_attempts to maintain bac…
barshaul 04dfd25
Updating retry object for existing and new connections
barshaul b43b1dc
Changed the default value of reinitialize_steps from 10 to 5
barshaul 9d1d6da
Merge branch 'master' into cluster-fixes
chayim c306b37
fix review comments
dvora-h dc9808a
Merge branch 'master' into cluster-fixes
dvora-h c19b545
Merge branch 'master' into cluster-fixes
dvora-h File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,8 @@ | |
) | ||
from redis.asyncio.lock import Lock | ||
from redis.asyncio.parser import CommandsParser | ||
from redis.asyncio.retry import Retry | ||
from redis.backoff import default_backoff | ||
from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis | ||
from redis.cluster import ( | ||
PIPELINE_BLOCKED_COMMANDS, | ||
|
@@ -110,10 +112,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand | |
:param startup_nodes: | ||
| :class:`~.ClusterNode` to used as a startup node | ||
:param require_full_coverage: | ||
| When set to ``False``: the client will not require a full coverage of the | ||
slots. However, if not all slots are covered, and at least one node has | ||
``cluster-require-full-coverage`` set to ``yes``, the server will throw a | ||
:class:`~.ClusterDownError` for some key-based commands. | ||
| When set to ``False``: the client will not require a full coverage of | ||
the slots. However, if not all slots are covered, and at least one node | ||
has ``cluster-require-full-coverage`` set to ``yes``, the server will throw | ||
a :class:`~.ClusterDownError` for some key-based commands. | ||
| When set to ``True``: all slots must be covered to construct the cluster | ||
client. If not all slots are covered, :class:`~.RedisClusterException` will be | ||
thrown. | ||
|
@@ -136,7 +138,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand | |
or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered | ||
:param connection_error_retry_attempts: | ||
| Number of times to retry before reinitializing when :class:`~.TimeoutError` | ||
or :class:`~.ConnectionError` are encountered | ||
or :class:`~.ConnectionError` are encountered. | ||
The default backoff strategy will be set if Retry object is not passed (see | ||
default_backoff in backoff.py). To change it, pass a custom Retry object | ||
using the "retry" keyword. | ||
:param max_connections: | ||
| Maximum number of connections per node. If there are no free connections & the | ||
maximum number of connections are already created, a | ||
|
@@ -214,9 +219,9 @@ def __init__( | |
startup_nodes: Optional[List["ClusterNode"]] = None, | ||
require_full_coverage: bool = True, | ||
read_from_replicas: bool = False, | ||
reinitialize_steps: int = 10, | ||
reinitialize_steps: int = 5, | ||
cluster_error_retry_attempts: int = 3, | ||
connection_error_retry_attempts: int = 5, | ||
connection_error_retry_attempts: int = 3, | ||
max_connections: int = 2**31, | ||
# Client related kwargs | ||
db: Union[str, int] = 0, | ||
|
@@ -235,6 +240,8 @@ def __init__( | |
socket_keepalive: bool = False, | ||
socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None, | ||
socket_timeout: Optional[float] = None, | ||
retry: Optional["Retry"] = None, | ||
retry_on_error: Optional[List[Exception]] = None, | ||
# SSL related kwargs | ||
ssl: bool = False, | ||
ssl_ca_certs: Optional[str] = None, | ||
|
@@ -282,6 +289,7 @@ def __init__( | |
"socket_keepalive": socket_keepalive, | ||
"socket_keepalive_options": socket_keepalive_options, | ||
"socket_timeout": socket_timeout, | ||
"retry": retry, | ||
} | ||
|
||
if ssl: | ||
|
@@ -302,6 +310,18 @@ def __init__( | |
# Call our on_connect function to configure READONLY mode | ||
kwargs["redis_connect_func"] = self.on_connect | ||
|
||
self.retry = retry | ||
if retry or retry_on_error or connection_error_retry_attempts > 0: | ||
chayim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# Set a retry object for all cluster nodes | ||
self.retry = retry or Retry( | ||
default_backoff(), connection_error_retry_attempts | ||
) | ||
if not retry_on_error: | ||
# Default errors for retrying | ||
retry_on_error = [ConnectionError, TimeoutError] | ||
self.retry.update_supported_errors(retry_on_error) | ||
kwargs.update({"retry": self.retry}) | ||
|
||
kwargs["response_callbacks"] = self.__class__.RESPONSE_CALLBACKS.copy() | ||
self.connection_kwargs = kwargs | ||
|
||
|
@@ -323,7 +343,6 @@ def __init__( | |
self.reinitialize_steps = reinitialize_steps | ||
self.cluster_error_retry_attempts = cluster_error_retry_attempts | ||
self.connection_error_retry_attempts = connection_error_retry_attempts | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpick: Don't delete this line. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't delete it, just moved it to line 309 |
||
self.reinitialize_counter = 0 | ||
self.commands_parser = CommandsParser() | ||
self.node_flags = self.__class__.NODE_FLAGS.copy() | ||
|
@@ -481,6 +500,16 @@ def get_connection_kwargs(self) -> Dict[str, Optional[Any]]: | |
"""Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`.""" | ||
return self.connection_kwargs | ||
|
||
def get_retry(self) -> Optional["Retry"]: | ||
return self.retry | ||
|
||
def set_retry(self, retry: "Retry") -> None: | ||
self.retry = retry | ||
for node in self.get_nodes(): | ||
node.connection_kwargs.update({"retry": retry}) | ||
barshaul marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for conn in node._connections: | ||
conn.retry = retry | ||
|
||
def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None: | ||
"""Set a custom response callback.""" | ||
self.response_callbacks[command] = callback | ||
|
@@ -618,9 +647,11 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: | |
if passed_targets and not self._is_node_flag(passed_targets): | ||
target_nodes = self._parse_target_nodes(passed_targets) | ||
target_nodes_specified = True | ||
retry_attempts = 1 | ||
retry_attempts = 0 | ||
|
||
for _ in range(retry_attempts): | ||
# Add one for the first execution | ||
execute_attempts = 1 + retry_attempts | ||
for _ in range(execute_attempts): | ||
if self._initialize: | ||
await self.initialize() | ||
try: | ||
|
@@ -658,25 +689,21 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: | |
) | ||
return dict(zip(keys, values)) | ||
except Exception as e: | ||
if type(e) in self.__class__.ERRORS_ALLOW_RETRY: | ||
# The nodes and slots cache were reinitialized. | ||
if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: | ||
# The nodes and slots cache were should be reinitialized. | ||
# Try again with the new cluster setup. | ||
exception = e | ||
retry_attempts -= 1 | ||
continue | ||
else: | ||
# All other errors should be raised. | ||
raise | ||
|
||
# If it fails the configured number of times then raise exception back | ||
# to caller of this method | ||
raise exception | ||
# raise the exception | ||
raise e | ||
|
||
async def _execute_command( | ||
self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any | ||
) -> Any: | ||
asking = moved = False | ||
redirect_addr = None | ||
ttl = self.RedisClusterRequestTTL | ||
connection_error_retry_counter = 0 | ||
|
||
while ttl > 0: | ||
ttl -= 1 | ||
|
@@ -695,25 +722,18 @@ async def _execute_command( | |
moved = False | ||
|
||
return await target_node.execute_command(*args, **kwargs) | ||
except BusyLoadingError: | ||
except (BusyLoadingError, MaxConnectionsError): | ||
raise | ||
except (ConnectionError, TimeoutError): | ||
# Connection retries are being handled in the node's | ||
# Retry object. | ||
# Remove the failed node from the startup nodes before we try | ||
# to reinitialize the cluster | ||
self.nodes_manager.startup_nodes.pop(target_node.name, None) | ||
# Hard force of reinitialize of the node/slots setup | ||
# and try again with the new setup | ||
chayim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
await self.close() | ||
raise | ||
except (ConnectionError, TimeoutError) as e: | ||
# Give the node 0.25 seconds to get back up and retry again with the | ||
# same node and configuration. After the defined number of attempts, try | ||
# to reinitialize the cluster and try again. | ||
connection_error_retry_counter += 1 | ||
if ( | ||
connection_error_retry_counter | ||
< self.connection_error_retry_attempts | ||
): | ||
await asyncio.sleep(0.25) | ||
else: | ||
if isinstance(e, MaxConnectionsError): | ||
raise | ||
# Hard force of reinitialize of the node/slots setup | ||
# and try again with the new setup | ||
await self.close() | ||
raise | ||
except ClusterDownError: | ||
# ClusterDownError can occur during a failover and to get | ||
# self-healed, we will try to reinitialize the cluster layout | ||
|
@@ -1145,26 +1165,11 @@ async def initialize(self) -> None: | |
) | ||
cluster_slots = await startup_node.execute_command("CLUSTER SLOTS") | ||
startup_nodes_reachable = True | ||
except (ConnectionError, TimeoutError) as e: | ||
except Exception as e: | ||
# Try the next startup node. | ||
# The exception is saved and raised only if we have no more nodes. | ||
exception = e | ||
continue | ||
except ResponseError as e: | ||
# Isn't a cluster connection, so it won't parse these | ||
# exceptions automatically | ||
message = e.__str__() | ||
if "CLUSTERDOWN" in message or "MASTERDOWN" in message: | ||
continue | ||
else: | ||
raise RedisClusterException( | ||
'ERROR sending "cluster slots" command to redis ' | ||
f"server: {startup_node}. error: {message}" | ||
) | ||
except Exception as e: | ||
message = e.__str__() | ||
raise RedisClusterException( | ||
'ERROR sending "cluster slots" command to redis ' | ||
f"server {startup_node.name}. error: {message}" | ||
) | ||
|
||
# CLUSTER SLOTS command results in the following output: | ||
# [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] | ||
|
@@ -1245,8 +1250,8 @@ async def initialize(self) -> None: | |
|
||
if not startup_nodes_reachable: | ||
raise RedisClusterException( | ||
"Redis Cluster cannot be connected. Please provide at least " | ||
"one reachable node. " | ||
f"Redis Cluster cannot be connected. Please provide at least " | ||
f"one reachable node: {str(exception)}" | ||
) from exception | ||
|
||
# Check if the slots are not fully covered | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.