Overwrite rabbit_mqtt_qos0_queue record from crashed node #10203
Merged
Conversation
When a node is shut down cleanly, the rabbit_mqtt_qos0_queue record is removed from Mnesia.

When a node crashes and subsequently reboots, the new node incarnation removes the old rabbit_mqtt_qos0_queue record from Mnesia (via rabbit_mqtt_qos0_queue:recover/2).

However, when a node crashes, the rabbit_mqtt_qos0_queue record is removed from Mnesia table rabbit_queue, but is still present in table rabbit_durable_queue on the other live nodes.

Prior to this commit, when the same MQTT client (i.e. the same MQTT client ID) re-connected from the crashed node to another live node and re-subscribed, the following error occurred:
```
[info] <0.43155.0> Accepted MQTT connection 10.105.0.18:60508 -> 10.105.0.10:1883 for client ID nodered_24e214feb018a232
[debug] <0.43155.0> Received a SUBSCRIBE for topic(s) [{mqtt_topic,
[debug] <0.43155.0>     <<"as923/gateway/+/command/#">>,0}]
[error] <0.43155.0> Failed to declare queue 'mqtt-subscription-nodered_24e214feb018a232qos0' in vhost '/': {absent,
[error] <0.43155.0>     {amqqueue,
[error] <0.43155.0>         {resource,
[error] <0.43155.0>             <<"/">>,
[error] <0.43155.0>             queue,
[error] <0.43155.0>             <<"mqtt-subscription-nodered_24e214feb018a232qos0">>},
[error] <0.43155.0>         true,
[error] <0.43155.0>         false,
[error] <0.43155.0>         <15486.32690.0>,
[error] <0.43155.0>         [],
[error] <0.43155.0>         <15486.32690.0>,
[error] <0.43155.0>         [],
[error] <0.43155.0>         [],
[error] <0.43155.0>         [],
[error] <0.43155.0>         [{vhost, <<"/">>},
[error] <0.43155.0>          {name, <<"ha-all-mqtt">>},
[error] <0.43155.0>          {pattern, <<"^mqtt-">>},
[error] <0.43155.0>          {'apply-to', <<"all">>},
[error] <0.43155.0>          {definition, [{<<"ha-mode">>, <<"all">>}]},
[error] <0.43155.0>          {priority, 0}],
[error] <0.43155.0>         undefined,
[error] <0.43155.0>         [],
[error] <0.43155.0>         undefined,
[error] <0.43155.0>         live,
[error] <0.43155.0>         0,
[error] <0.43155.0>         [],
[error] <0.43155.0>         <<"/">>,
[error] <0.43155.0>         #{user => <<"iottester">>},
[error] <0.43155.0>         rabbit_mqtt_qos0_queue,
[error] <0.43155.0>         #{}},
[error] <0.43155.0>     nodedown}
[error] <0.43155.0> MQTT protocol error on connection 10.105.0.18:60508 -> 10.105.0.10:1883: subscribe_error
```
This commit fixes this error, allowing an MQTT client that connects with CleanSession=true and subscribes with QoS 0 to re-connect and re-subscribe to another live node if the original Rabbit node crashes.

Reported in https://groups.google.com/g/rabbitmq-users/c/pxgy0QiwilM/m/LkJQ-3DyBgAJ
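The fix takes effect in the MQTT QoS 0 queue's declare path: when a durable record for the queue name still exists but its owning process lives on a node that is down, the stale record is overwritten instead of being reported as `{absent, Q, nodedown}`. A minimal sketch of that idea (illustrative only; `lookup_durable/1` is a hypothetical helper, and the actual patch in rabbit_mqtt_qos0_queue.erl differs in detail):
```erlang
%% Illustrative sketch, not the actual patch.
declare(Q, _Node) ->
    Name = amqqueue:get_name(Q),
    case lookup_durable(Name) of  %% hypothetical lookup into rabbit_durable_queue
        {ok, OldQ} ->
            case rabbit_mnesia:is_process_alive(amqqueue:get_pid(OldQ)) of
                false ->
                    %% Record left over by a crashed node: overwrite it
                    %% rather than failing with {absent, OldQ, nodedown}.
                    rabbit_amqqueue:internal_declare(Q, false);
                true ->
                    %% The queue process is alive elsewhere; keep the record.
                    {existing, OldQ}
            end;
        {error, not_found} ->
            rabbit_amqqueue:internal_declare(Q, false)
    end.
```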
michaelklishin approved these changes Dec 21, 2023
Excellent catch.
A backport will make it into …
michaelklishin added a commit that referenced this pull request Dec 21, 2023
Overwrite rabbit_mqtt_qos0_queue record from crashed node (backport #10203)
ansd added a commit that referenced this pull request Dec 27, 2023
The solution in #10203 has the following issues:

1. Bindings can be left over in Mnesia table rabbit_durable_queue.

One solution to 1. would be to first delete the old queue via `rabbit_amqqueue:internal_delete(Q, User, missing_owner)` and subsequently declare the new queue via `rabbit_amqqueue:internal_declare(Q, false)`. However, even then, it suffers from:

2. Race conditions between `rabbit_amqqueue:on_node_down/1` and `rabbit_mqtt_qos0_queue:declare/2`: `rabbit_amqqueue:on_node_down/1` could first read the queue records that need to be deleted, thereafter `rabbit_mqtt_qos0_queue:declare/2` could re-create the queue owned by the new connection PID, and `rabbit_amqqueue:on_node_down/1` could subsequently delete the re-created queue.

Unfortunately, `rabbit_amqqueue:on_node_down/1` does not delete transient queues in one isolated transaction. Instead, it first reads queues and subsequently deletes them in batches, making it prone to race conditions.

Ideally, this commit would delete all rabbit_mqtt_qos0_queue queues of the crashed node, including their bindings. However, doing so in one transaction is risky, as there may be millions of such queues, and the current code path applies the same logic on all live nodes, resulting in conflicting transactions and therefore a long database operation.

Hence, this commit uses the simplest approach, which should still be safe: do not remove rabbit_mqtt_qos0_queue queues if a node crashes. Other live nodes will continue to route to these dead queues. That should be okay, given that rabbit_mqtt_qos0_queue clients auto-confirm. Continued routing does, however, count as a routing result for the AMQP 0.9.1 `mandatory` property.

If an MQTT client re-connects to a live node with the same client ID, the new node will delete and then re-create the queue. Once the crashed node comes back online, it will clean up its leftover queues and bindings.
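In code terms, the behaviour this message describes boils down to two pieces. The sketch below is illustrative (function names and placement are assumptions; only `rabbit_amqqueue:internal_delete/3` and `rabbit_amqqueue:internal_declare/2` are quoted from the message above):
```erlang
%% 1. On node down, queue cleanup skips rabbit_mqtt_qos0_queue queues,
%%    leaving them for the crashed node to clean up when it restarts.
%%    (Other selection criteria for deletable queues are omitted here.)
queues_to_delete(DownNode) ->
    [Q || Q <- rabbit_amqqueue:list(),
          node(amqqueue:get_pid(Q)) =:= DownNode,
          amqqueue:get_type(Q) =/= rabbit_mqtt_qos0_queue].

%% 2. When the same client ID re-connects to a live node, the stale queue
%%    is deleted together with its bindings and re-declared for the new
%%    connection.
redeclare_stale(Q, User) ->
    ok = rabbit_amqqueue:internal_delete(Q, User, missing_owner),
    rabbit_amqqueue:internal_declare(Q, false).
```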
michaelklishin pushed a commit that referenced this pull request Dec 28, 2023 (same commit message as above)
mergify bot pushed a commit that referenced this pull request Dec 28, 2023 (same commit message as above)
(cherry picked from commit 78b4fcc)
# Conflicts:
#   deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl
michaelklishin pushed a commit that referenced this pull request Jan 31, 2024 (same commit message as above)
michaelklishin pushed a commit that referenced this pull request Feb 1, 2024 (same commit message as above)
michaelklishin pushed a commit that referenced this pull request Feb 29, 2024 (same commit message as above)