Skip to content

Commit eb1da36

Browse files
committed
Allow MQTT QoS 0 subscribers to reconnect
The solution in #10203 has the following issues: 1. Bindings can be left ofter in Mnesia table rabbit_durable_queue. One solution to 1. would be to first delete the old queue via `rabbit_amqqueue:internal_delete(Q, User, missing_owner)` and subsequently declare the new queue via `rabbit_amqqueue:internal_declare(Q, false)` However, even then, it suffers from: 2. Race conditions between `rabbit_amqqueue:on_node_down/1` and `rabbit_mqtt_qos0_queue:declare/2`: `rabbit_amqqueue:on_node_down/1` could first read the queue records that need to be deleted, thereafter `rabbit_mqtt_qos0_queue:declare/2` could re-create the queue owned by the new connection PID, and `rabbit_amqqueue:on_node_down/1` could subsequently delete the re-created queue. Unfortunately, `rabbit_amqqueue:on_node_down/1` does not delete transient queues in one isolated transaction. Instead it first reads queues and subsequenlty deletes queues in batches making it prone to race conditions. Ideally, this commit deletes all rabbit_mqtt_qos0_queue queues of the node that has crashed including their bindings. However, doing so in one transaction is risky as there may be millions of such queues and the current code path applies the same logic on all live nodes resulting in conflicting transactions and therefore a long database operation. Hence, this commit uses the simplest approach which should still be safe: Do not remove rabbit_mqtt_qos0_queue queues if a node crashes. Other live nodes will continue to route to these dead queues. That should be okay, given that the rabbit_mqtt_qos0_queue clients auto confirm. Continuing routing however has the effect of counting as routing result for AMQP 0.9.1 `mandatory` property. If an MQTT client re-connects to a live node with the same client ID, the new node will delete and then re-create the queue. Once the crashed node comes back online, it will clean up its leftover queues and bindings.
1 parent 82e25af commit eb1da36

File tree

3 files changed

+28
-32
lines changed

3 files changed

+28
-32
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1995,7 +1995,8 @@ on_node_down(Node) ->
19951995
{QueueNames, Deletions} ->
19961996
case length(QueueNames) of
19971997
0 -> ok;
1998-
_ -> rabbit_log:info("~tp transient queues from an old incarnation of node ~tp deleted in ~fs", [length(QueueNames), Node, Time/1000000])
1998+
N -> rabbit_log:info("~b transient queues from an old incarnation of node ~tp deleted in ~fs",
1999+
[N, Node, Time / 1_000_000])
19992000
end,
20002001
notify_queue_binding_deletions(Deletions),
20012002
rabbit_core_metrics:queues_deleted(QueueNames),
@@ -2010,6 +2011,7 @@ filter_transient_queues_to_delete(Node) ->
20102011
andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q))
20112012
andalso (not rabbit_amqqueue:is_replicated(Q)
20122013
orelse rabbit_amqqueue:is_dead_exclusive(Q))
2014+
andalso amqqueue:get_type(Q) =/= rabbit_mqtt_qos0_queue
20132015
end.
20142016

20152017
notify_queue_binding_deletions(QueueDeletions) when is_list(QueueDeletions) ->

deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ is_stateful() ->
6868
false.
6969

7070
-spec declare(amqqueue:amqqueue(), node()) ->
71-
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()}.
71+
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
72+
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()}.
7273
declare(Q0, _Node) ->
7374
Q1 = case amqqueue:get_pid(Q0) of
7475
none ->
@@ -92,20 +93,6 @@ declare(Q0, _Node) ->
9293
{arguments, amqqueue:get_arguments(Q)},
9394
{user_who_performed_action, ActingUser}]),
9495
{new, Q};
95-
{absent, OldQ, nodedown} ->
96-
%% This case body can be deleted once Mnesia is unsupported.
97-
OldPid = amqqueue:get_pid(OldQ),
98-
OldNode = node(OldPid),
99-
rabbit_log_queue:debug(
100-
"Overwriting record of ~s of type ~s on node ~s since "
101-
"formerly hosting node ~s seems to be down (former pid ~p)",
102-
[rabbit_misc:rs(amqqueue:get_name(Q1)), ?MODULE, node(), OldNode, OldPid]),
103-
case rabbit_amqqueue:internal_declare(Q1, true) of
104-
{created, Q} ->
105-
{new, Q};
106-
Other ->
107-
Other
108-
end;
10996
Other ->
11097
Other
11198
end.

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,38 +1214,45 @@ rabbit_mqtt_qos0_queue(Config) ->
12141214
ok = emqtt:disconnect(Pub).
12151215

12161216
rabbit_mqtt_qos0_queue_kill_node(Config) ->
1217-
Topic = atom_to_binary(?FUNCTION_NAME),
1217+
Topic1 = <<"t/1">>,
1218+
Topic2 = <<"t/2">>,
12181219
Pub = connect(<<"publisher">>, Config, 2, []),
12191220

12201221
SubscriberId = <<"subscriber">>,
12211222
Sub0 = connect(SubscriberId, Config, 0, []),
1222-
{ok, _, [0]} = emqtt:subscribe(Sub0, Topic, qos0),
1223-
ok = emqtt:publish(Pub, Topic, <<"m0">>, qos0),
1224-
ok = expect_publishes(Sub0, Topic, [<<"m0">>]),
1223+
{ok, _, [0]} = emqtt:subscribe(Sub0, Topic1, qos0),
1224+
ok = emqtt:publish(Pub, Topic1, <<"m0">>, qos0),
1225+
ok = expect_publishes(Sub0, Topic1, [<<"m0">>]),
12251226

12261227
process_flag(trap_exit, true),
12271228
ok = rabbit_ct_broker_helpers:kill_node(Config, 0),
1228-
%% Wait a bit to ensure that Mnesia deletes the queue record on nodes 1 and 2 from Mnesia
1229-
%% table rabbit_queue (but the queue record is still present in rabbit_durable_queue).
1229+
ok = await_exit(Sub0),
1230+
%% Wait to run rabbit_amqqueue:on_node_down/1 on both live nodes.
12301231
timer:sleep(500),
1232+
%% Re-connect to a live node with same MQTT client ID.
12311233
Sub1 = connect(SubscriberId, Config, 1, []),
1232-
{ok, _, [0]} = emqtt:subscribe(Sub1, Topic, qos0),
1233-
ok = emqtt:publish(Pub, Topic, <<"m1">>, qos0),
1234-
ok = expect_publishes(Sub1, Topic, [<<"m1">>]),
1234+
{ok, _, [0]} = emqtt:subscribe(Sub1, Topic2, qos0),
1235+
ok = emqtt:publish(Pub, Topic2, <<"m1">>, qos0),
1236+
ok = expect_publishes(Sub1, Topic2, [<<"m1">>]),
1237+
%% Since we started a new clean session, previous subscription should have been deleted.
1238+
ok = emqtt:publish(Pub, Topic1, <<"m2">>, qos0),
1239+
receive {publish, _} = Publish -> ct:fail({unexpected, Publish})
1240+
after 300 -> ok
1241+
end,
12351242

1236-
%% Start node 0 to have a majority for Khepri.
12371243
ok = rabbit_ct_broker_helpers:start_node(Config, 0),
12381244
ok = rabbit_ct_broker_helpers:kill_node(Config, 1),
1239-
%% This time, do not wait. Mnesia will contain the queue record in rabbit_durable_queue,
1240-
%% but this time Mnesia may or may not contain the queue record in rabbit_queue.
1245+
%% This time, do not wait.
1246+
%% rabbit_amqqueue:on_node_down/1 may or may not have run.
12411247
Sub2 = connect(SubscriberId, Config, 2, []),
1242-
{ok, _, [0]} = emqtt:subscribe(Sub2, Topic, qos0),
1243-
ok = emqtt:publish(Pub, Topic, <<"m2">>, qos0),
1244-
ok = expect_publishes(Sub2, Topic, [<<"m2">>]),
1248+
{ok, _, [0]} = emqtt:subscribe(Sub2, Topic2, qos0),
1249+
ok = emqtt:publish(Pub, Topic2, <<"m3">>, qos0),
1250+
ok = expect_publishes(Sub2, Topic2, [<<"m3">>]),
12451251

12461252
ok = emqtt:disconnect(Sub2),
12471253
ok = emqtt:disconnect(Pub),
1248-
ok = rabbit_ct_broker_helpers:start_node(Config, 1).
1254+
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
1255+
?assertEqual([], rpc(Config, rabbit_db_binding, get_all, [])).
12491256

12501257
%% Test that MQTT connection can be listed and closed via the rabbitmq_management plugin.
12511258
management_plugin_connection(Config) ->

0 commit comments

Comments
 (0)