Skip to content

Overwrite rabbit_mqtt_qos0_queue record from crashed node #10203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 21, 2023

Conversation

ansd
Copy link
Member

@ansd ansd commented Dec 21, 2023

When a node is shut down cleanly, the rabbit_mqtt_qos0_queue record is removed from Mnesia.
When a node crashes and subsequently reboots the new node incarnation removes the old rabbit_mqtt_qos0_queue record from Mnesia (via rabbit_mqtt_qos0_queue:recover/2)

However, when a node crashes, the rabbit_mqtt_qos0_queue will be removed from Mnesia table rabbit_queue, but will still be present in table rabbit_durable_queue on the other live nodes.
Prior to this commit, when the same MQTT client (i.e. same MQTT client ID) re-connects from the crashed node to another live node and re-subscribes, the following error occurred:

[info] <0.43155.0> Accepted MQTT connection 10.105.0.18:60508 -> 10.105.0.10:1883 for client ID nodered_24e214feb018a232
[debug] <0.43155.0> Received a SUBSCRIBE for topic(s) [{mqtt_topic,
[debug] <0.43155.0>                                        <<"as923/gateway/+/command/#">>,0}]
[error] <0.43155.0> Failed to declare queue 'mqtt-subscription-nodered_24e214feb018a232qos0' in vhost '/': {absent,
[error] <0.43155.0>                                                                                         {amqqueue,
[error] <0.43155.0>                                                                                          {resource,
[error] <0.43155.0>                                                                                           <<"/">>,
[error] <0.43155.0>                                                                                           queue,
[error] <0.43155.0>                                                                                           <<"mqtt-subscription-nodered_24e214feb018a232qos0">>},
[error] <0.43155.0>                                                                                          true,
[error] <0.43155.0>                                                                                          false,
[error] <0.43155.0>                                                                                          <15486.32690.0>,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          <15486.32690.0>,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          [{vhost,
[error] <0.43155.0>                                                                                            <<"/">>},
[error] <0.43155.0>                                                                                           {name,
[error] <0.43155.0>                                                                                            <<"ha-all-mqtt">>},
[error] <0.43155.0>                                                                                           {pattern,
[error] <0.43155.0>                                                                                            <<"^mqtt-">>},
[error] <0.43155.0>                                                                                           {'apply-to',
[error] <0.43155.0>                                                                                            <<"all">>},
[error] <0.43155.0>                                                                                           {definition,
[error] <0.43155.0>                                                                                            [{<<"ha-mode">>,
[error] <0.43155.0>                                                                                              <<"all">>}]},
[error] <0.43155.0>                                                                                           {priority,
[error] <0.43155.0>                                                                                            0}],
[error] <0.43155.0>                                                                                          undefined,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          undefined,
[error] <0.43155.0>                                                                                          live,
[error] <0.43155.0>                                                                                          0,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          <<"/">>,
[error] <0.43155.0>                                                                                          #{user =>
[error] <0.43155.0>                                                                                             <<"iottester">>},
[error] <0.43155.0>                                                                                          rabbit_mqtt_qos0_queue,
[error] <0.43155.0>                                                                                          #{}},
[error] <0.43155.0>                                                                                         nodedown}
[error] <0.43155.0> MQTT protocol error on connection 10.105.0.18:60508 -> 10.105.0.10:1883: subscribe_error

This commit fixes this error allowing an MQTT client that connects with CleanSession=true and subscribes with QoS 0 to re-connect and re-subscribe to another live node if the original Rabbit node crashes.

Reported in https://groups.google.com/g/rabbitmq-users/c/pxgy0QiwilM/m/LkJQ-3DyBgAJ

When a node is shut down cleanly, the rabbit_mqtt_qos0_queue record is
removed from Mnesia.
When a node crashes and subsequently reboots the new node incarnation
removes the old rabbit_mqtt_qos0_queue record from Mnesia (via
rabbit_mqtt_qos0_queue:recover/2)

However, when a node crashes, the rabbit_mqtt_qos0_queue will be removed
from Mnesia table rabbit_queue, but will still be present in table
rabbit_durable_queue on the other live nodes.
Prior to this commit, when the same MQTT client (i.e. same MQTT client
ID) re-connects from the crashed node to another live node and
re-subscribes, the following error occurred:
```
[info] <0.43155.0> Accepted MQTT connection 10.105.0.18:60508 -> 10.105.0.10:1883 for client ID nodered_24e214feb018a232
[debug] <0.43155.0> Received a SUBSCRIBE for topic(s) [{mqtt_topic,
[debug] <0.43155.0>                                        <<"as923/gateway/+/command/#">>,0}]
[error] <0.43155.0> Failed to declare queue 'mqtt-subscription-nodered_24e214feb018a232qos0' in vhost '/': {absent,
[error] <0.43155.0>                                                                                         {amqqueue,
[error] <0.43155.0>                                                                                          {resource,
[error] <0.43155.0>                                                                                           <<"/">>,
[error] <0.43155.0>                                                                                           queue,
[error] <0.43155.0>                                                                                           <<"mqtt-subscription-nodered_24e214feb018a232qos0">>},
[error] <0.43155.0>                                                                                          true,
[error] <0.43155.0>                                                                                          false,
[error] <0.43155.0>                                                                                          <15486.32690.0>,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          <15486.32690.0>,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          [{vhost,
[error] <0.43155.0>                                                                                            <<"/">>},
[error] <0.43155.0>                                                                                           {name,
[error] <0.43155.0>                                                                                            <<"ha-all-mqtt">>},
[error] <0.43155.0>                                                                                           {pattern,
[error] <0.43155.0>                                                                                            <<"^mqtt-">>},
[error] <0.43155.0>                                                                                           {'apply-to',
[error] <0.43155.0>                                                                                            <<"all">>},
[error] <0.43155.0>                                                                                           {definition,
[error] <0.43155.0>                                                                                            [{<<"ha-mode">>,
[error] <0.43155.0>                                                                                              <<"all">>}]},
[error] <0.43155.0>                                                                                           {priority,
[error] <0.43155.0>                                                                                            0}],
[error] <0.43155.0>                                                                                          undefined,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          undefined,
[error] <0.43155.0>                                                                                          live,
[error] <0.43155.0>                                                                                          0,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          <<"/">>,
[error] <0.43155.0>                                                                                          #{user =>
[error] <0.43155.0>                                                                                             <<"iottester">>},
[error] <0.43155.0>                                                                                          rabbit_mqtt_qos0_queue,
[error] <0.43155.0>                                                                                          #{}},
[error] <0.43155.0>                                                                                         nodedown}
[error] <0.43155.0> MQTT protocol error on connection 10.105.0.18:60508 -> 10.105.0.10:1883: subscribe_error
```

This commit fixes this error allowing an MQTT client that connects with CleanSession=true and
subscribes with QoS 0 to re-connect and re-subscribe to another live
node if the original Rabbit node crashes.

Reported in https://groups.google.com/g/rabbitmq-users/c/pxgy0QiwilM/m/LkJQ-3DyBgAJ
@ansd ansd self-assigned this Dec 21, 2023
@ansd ansd marked this pull request as ready for review December 21, 2023 16:56
Copy link
Collaborator

@michaelklishin michaelklishin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent catch.

@michaelklishin michaelklishin merged commit 89e04e9 into main Dec 21, 2023
@michaelklishin michaelklishin deleted the mqtt-qos0-queue-node-down branch December 21, 2023 17:11
@michaelklishin michaelklishin added this to the 3.13.0 milestone Dec 21, 2023
@michaelklishin
Copy link
Collaborator

A backport will make it into 3.12.11.

michaelklishin added a commit that referenced this pull request Dec 21, 2023
Overwrite rabbit_mqtt_qos0_queue record from crashed node (backport #10203)
ansd added a commit that referenced this pull request Dec 27, 2023
The solution in #10203 has the following issues:
1. Bindings can be left ofter in Mnesia table rabbit_durable_queue.
One solution to 1. would be to first delete the old queue via
`rabbit_amqqueue:internal_delete(Q, User, missing_owner)`
and subsequently declare the new queue via
`rabbit_amqqueue:internal_declare(Q, false)`
However, even then, it suffers from:
2. Race conditions between `rabbit_amqqueue:on_node_down/1`
and `rabbit_mqtt_qos0_queue:declare/2`:
`rabbit_amqqueue:on_node_down/1` could first read the queue records that
need to be deleted, thereafter `rabbit_mqtt_qos0_queue:declare/2` could
re-create the queue owned by the new connection PID, and `rabbit_amqqueue:on_node_down/1`
could subsequently delete the re-created queue.

Unfortunately, `rabbit_amqqueue:on_node_down/1` does not delete
transient queues in one isolated transaction. Instead it first reads
queues and subsequenlty deletes queues in batches making it prone to
race conditions.

Ideally, this commit deletes all rabbit_mqtt_qos0_queue queues of the
node that has crashed including their bindings.
However, doing so in one transaction is risky as there may be millions
of such queues and the current code path applies the same logic on all
live nodes resulting in conflicting transactions and therefore a long
database operation.

Hence, this commit uses the simplest approach which should still be
safe:
Do not remove rabbit_mqtt_qos0_queue queues if a node crashes.
Other live nodes will continue to route to these dead queues.
That should be okay, given that the rabbit_mqtt_qos0_queue clients auto
confirm.
Continuing routing however has the effect of counting as routing result
for AMQP 0.9.1 `mandatory` property.
If an MQTT client re-connects to a live node with the same client ID,
the new node will delete and then re-create the queue.
Once the crashed node comes back online, it will clean up its leftover
queues and bindings.
michaelklishin pushed a commit that referenced this pull request Dec 28, 2023
The solution in #10203 has the following issues:
1. Bindings can be left ofter in Mnesia table rabbit_durable_queue.
One solution to 1. would be to first delete the old queue via
`rabbit_amqqueue:internal_delete(Q, User, missing_owner)`
and subsequently declare the new queue via
`rabbit_amqqueue:internal_declare(Q, false)`
However, even then, it suffers from:
2. Race conditions between `rabbit_amqqueue:on_node_down/1`
and `rabbit_mqtt_qos0_queue:declare/2`:
`rabbit_amqqueue:on_node_down/1` could first read the queue records that
need to be deleted, thereafter `rabbit_mqtt_qos0_queue:declare/2` could
re-create the queue owned by the new connection PID, and `rabbit_amqqueue:on_node_down/1`
could subsequently delete the re-created queue.

Unfortunately, `rabbit_amqqueue:on_node_down/1` does not delete
transient queues in one isolated transaction. Instead it first reads
queues and subsequenlty deletes queues in batches making it prone to
race conditions.

Ideally, this commit deletes all rabbit_mqtt_qos0_queue queues of the
node that has crashed including their bindings.
However, doing so in one transaction is risky as there may be millions
of such queues and the current code path applies the same logic on all
live nodes resulting in conflicting transactions and therefore a long
database operation.

Hence, this commit uses the simplest approach which should still be
safe:
Do not remove rabbit_mqtt_qos0_queue queues if a node crashes.
Other live nodes will continue to route to these dead queues.
That should be okay, given that the rabbit_mqtt_qos0_queue clients auto
confirm.
Continuing routing however has the effect of counting as routing result
for AMQP 0.9.1 `mandatory` property.
If an MQTT client re-connects to a live node with the same client ID,
the new node will delete and then re-create the queue.
Once the crashed node comes back online, it will clean up its leftover
queues and bindings.
mergify bot pushed a commit that referenced this pull request Dec 28, 2023
The solution in #10203 has the following issues:
1. Bindings can be left ofter in Mnesia table rabbit_durable_queue.
One solution to 1. would be to first delete the old queue via
`rabbit_amqqueue:internal_delete(Q, User, missing_owner)`
and subsequently declare the new queue via
`rabbit_amqqueue:internal_declare(Q, false)`
However, even then, it suffers from:
2. Race conditions between `rabbit_amqqueue:on_node_down/1`
and `rabbit_mqtt_qos0_queue:declare/2`:
`rabbit_amqqueue:on_node_down/1` could first read the queue records that
need to be deleted, thereafter `rabbit_mqtt_qos0_queue:declare/2` could
re-create the queue owned by the new connection PID, and `rabbit_amqqueue:on_node_down/1`
could subsequently delete the re-created queue.

Unfortunately, `rabbit_amqqueue:on_node_down/1` does not delete
transient queues in one isolated transaction. Instead it first reads
queues and subsequenlty deletes queues in batches making it prone to
race conditions.

Ideally, this commit deletes all rabbit_mqtt_qos0_queue queues of the
node that has crashed including their bindings.
However, doing so in one transaction is risky as there may be millions
of such queues and the current code path applies the same logic on all
live nodes resulting in conflicting transactions and therefore a long
database operation.

Hence, this commit uses the simplest approach which should still be
safe:
Do not remove rabbit_mqtt_qos0_queue queues if a node crashes.
Other live nodes will continue to route to these dead queues.
That should be okay, given that the rabbit_mqtt_qos0_queue clients auto
confirm.
Continuing routing however has the effect of counting as routing result
for AMQP 0.9.1 `mandatory` property.
If an MQTT client re-connects to a live node with the same client ID,
the new node will delete and then re-create the queue.
Once the crashed node comes back online, it will clean up its leftover
queues and bindings.

(cherry picked from commit 78b4fcc)

# Conflicts:
#	deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl
michaelklishin pushed a commit that referenced this pull request Jan 31, 2024
The solution in #10203 has the following issues:
1. Bindings can be left ofter in Mnesia table rabbit_durable_queue.
One solution to 1. would be to first delete the old queue via
`rabbit_amqqueue:internal_delete(Q, User, missing_owner)`
and subsequently declare the new queue via
`rabbit_amqqueue:internal_declare(Q, false)`
However, even then, it suffers from:
2. Race conditions between `rabbit_amqqueue:on_node_down/1`
and `rabbit_mqtt_qos0_queue:declare/2`:
`rabbit_amqqueue:on_node_down/1` could first read the queue records that
need to be deleted, thereafter `rabbit_mqtt_qos0_queue:declare/2` could
re-create the queue owned by the new connection PID, and `rabbit_amqqueue:on_node_down/1`
could subsequently delete the re-created queue.

Unfortunately, `rabbit_amqqueue:on_node_down/1` does not delete
transient queues in one isolated transaction. Instead it first reads
queues and subsequenlty deletes queues in batches making it prone to
race conditions.

Ideally, this commit deletes all rabbit_mqtt_qos0_queue queues of the
node that has crashed including their bindings.
However, doing so in one transaction is risky as there may be millions
of such queues and the current code path applies the same logic on all
live nodes resulting in conflicting transactions and therefore a long
database operation.

Hence, this commit uses the simplest approach which should still be
safe:
Do not remove rabbit_mqtt_qos0_queue queues if a node crashes.
Other live nodes will continue to route to these dead queues.
That should be okay, given that the rabbit_mqtt_qos0_queue clients auto
confirm.
Continuing routing however has the effect of counting as routing result
for AMQP 0.9.1 `mandatory` property.
If an MQTT client re-connects to a live node with the same client ID,
the new node will delete and then re-create the queue.
Once the crashed node comes back online, it will clean up its leftover
queues and bindings.
michaelklishin pushed a commit that referenced this pull request Feb 1, 2024
The solution in #10203 has the following issues:
1. Bindings can be left ofter in Mnesia table rabbit_durable_queue.
One solution to 1. would be to first delete the old queue via
`rabbit_amqqueue:internal_delete(Q, User, missing_owner)`
and subsequently declare the new queue via
`rabbit_amqqueue:internal_declare(Q, false)`
However, even then, it suffers from:
2. Race conditions between `rabbit_amqqueue:on_node_down/1`
and `rabbit_mqtt_qos0_queue:declare/2`:
`rabbit_amqqueue:on_node_down/1` could first read the queue records that
need to be deleted, thereafter `rabbit_mqtt_qos0_queue:declare/2` could
re-create the queue owned by the new connection PID, and `rabbit_amqqueue:on_node_down/1`
could subsequently delete the re-created queue.

Unfortunately, `rabbit_amqqueue:on_node_down/1` does not delete
transient queues in one isolated transaction. Instead it first reads
queues and subsequenlty deletes queues in batches making it prone to
race conditions.

Ideally, this commit deletes all rabbit_mqtt_qos0_queue queues of the
node that has crashed including their bindings.
However, doing so in one transaction is risky as there may be millions
of such queues and the current code path applies the same logic on all
live nodes resulting in conflicting transactions and therefore a long
database operation.

Hence, this commit uses the simplest approach which should still be
safe:
Do not remove rabbit_mqtt_qos0_queue queues if a node crashes.
Other live nodes will continue to route to these dead queues.
That should be okay, given that the rabbit_mqtt_qos0_queue clients auto
confirm.
Continuing routing however has the effect of counting as routing result
for AMQP 0.9.1 `mandatory` property.
If an MQTT client re-connects to a live node with the same client ID,
the new node will delete and then re-create the queue.
Once the crashed node comes back online, it will clean up its leftover
queues and bindings.
michaelklishin pushed a commit that referenced this pull request Feb 29, 2024
The solution in #10203 has the following issues:
1. Bindings can be left ofter in Mnesia table rabbit_durable_queue.
One solution to 1. would be to first delete the old queue via
`rabbit_amqqueue:internal_delete(Q, User, missing_owner)`
and subsequently declare the new queue via
`rabbit_amqqueue:internal_declare(Q, false)`
However, even then, it suffers from:
2. Race conditions between `rabbit_amqqueue:on_node_down/1`
and `rabbit_mqtt_qos0_queue:declare/2`:
`rabbit_amqqueue:on_node_down/1` could first read the queue records that
need to be deleted, thereafter `rabbit_mqtt_qos0_queue:declare/2` could
re-create the queue owned by the new connection PID, and `rabbit_amqqueue:on_node_down/1`
could subsequently delete the re-created queue.

Unfortunately, `rabbit_amqqueue:on_node_down/1` does not delete
transient queues in one isolated transaction. Instead it first reads
queues and subsequenlty deletes queues in batches making it prone to
race conditions.

Ideally, this commit deletes all rabbit_mqtt_qos0_queue queues of the
node that has crashed including their bindings.
However, doing so in one transaction is risky as there may be millions
of such queues and the current code path applies the same logic on all
live nodes resulting in conflicting transactions and therefore a long
database operation.

Hence, this commit uses the simplest approach which should still be
safe:
Do not remove rabbit_mqtt_qos0_queue queues if a node crashes.
Other live nodes will continue to route to these dead queues.
That should be okay, given that the rabbit_mqtt_qos0_queue clients auto
confirm.
Continuing routing however has the effect of counting as routing result
for AMQP 0.9.1 `mandatory` property.
If an MQTT client re-connects to a live node with the same client ID,
the new node will delete and then re-create the queue.
Once the crashed node comes back online, it will clean up its leftover
queues and bindings.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants