Skip to content

Commit 91b2964

Browse files
Merge pull request #10252 from rabbitmq/mergify/bp/v3.12.x/pr-10244
Allow MQTT QoS 0 subscribers to reconnect (backport #10244)
2 parents e7c98a8 + 9ff53e9 commit 91b2964

File tree

3 files changed

+28
-32
lines changed

3 files changed

+28
-32
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1972,7 +1972,8 @@ on_node_down(Node) ->
19721972
{QueueNames, Deletions} ->
19731973
case length(QueueNames) of
19741974
0 -> ok;
1975-
_ -> rabbit_log:info("~tp transient queues from an old incarnation of node ~tp deleted in ~fs", [length(QueueNames), Node, Time/1000000])
1975+
N -> rabbit_log:info("~b transient queues from an old incarnation of node ~tp deleted in ~fs",
1976+
[N, Node, Time / 1_000_000])
19761977
end,
19771978
notify_queue_binding_deletions(Deletions),
19781979
rabbit_core_metrics:queues_deleted(QueueNames),
@@ -1987,6 +1988,7 @@ filter_transient_queues_to_delete(Node) ->
19871988
andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q))
19881989
andalso (not rabbit_amqqueue:is_replicated(Q)
19891990
orelse rabbit_amqqueue:is_dead_exclusive(Q))
1991+
andalso amqqueue:get_type(Q) =/= rabbit_mqtt_qos0_queue
19901992
end.
19911993

19921994
notify_queue_binding_deletions(QueueDeletions) when is_list(QueueDeletions) ->

deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ is_stateful() ->
6767
false.
6868

6969
-spec declare(amqqueue:amqqueue(), node()) ->
70-
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()}.
70+
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
71+
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()}.
7172
declare(Q0, _Node) ->
7273
%% The queue gets persisted such that routing to this
7374
%% queue (via the topic exchange) works as usual.
@@ -84,20 +85,6 @@ declare(Q0, _Node) ->
8485
{arguments, amqqueue:get_arguments(Q0)},
8586
{user_who_performed_action, ActingUser}]),
8687
{new, Q};
87-
{absent, OldQ, nodedown} ->
88-
%% This case body can be deleted once Mnesia is unsupported.
89-
OldPid = amqqueue:get_pid(OldQ),
90-
OldNode = node(OldPid),
91-
rabbit_log_queue:debug(
92-
"Overwriting record of ~s of type ~s on node ~s since "
93-
"formerly hosting node ~s seems to be down (former pid ~p)",
94-
[rabbit_misc:rs(amqqueue:get_name(Q0)), ?MODULE, node(), OldNode, OldPid]),
95-
case rabbit_amqqueue:internal_declare(Q0, true) of
96-
{created, Q} ->
97-
{new, Q};
98-
Other ->
99-
Other
100-
end;
10188
Other ->
10289
Other
10390
end.

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1026,38 +1026,45 @@ rabbit_mqtt_qos0_queue(Config) ->
10261026
ok = emqtt:disconnect(Pub).
10271027

10281028
rabbit_mqtt_qos0_queue_kill_node(Config) ->
1029-
Topic = atom_to_binary(?FUNCTION_NAME),
1029+
Topic1 = <<"t/1">>,
1030+
Topic2 = <<"t/2">>,
10301031
Pub = connect(<<"publisher">>, Config, 2, []),
10311032

10321033
SubscriberId = <<"subscriber">>,
10331034
Sub0 = connect(SubscriberId, Config, 0, []),
1034-
{ok, _, [0]} = emqtt:subscribe(Sub0, Topic, qos0),
1035-
ok = emqtt:publish(Pub, Topic, <<"m0">>, qos0),
1036-
ok = expect_publishes(Sub0, Topic, [<<"m0">>]),
1035+
{ok, _, [0]} = emqtt:subscribe(Sub0, Topic1, qos0),
1036+
ok = emqtt:publish(Pub, Topic1, <<"m0">>, qos0),
1037+
ok = expect_publishes(Sub0, Topic1, [<<"m0">>]),
10371038

10381039
process_flag(trap_exit, true),
10391040
ok = rabbit_ct_broker_helpers:kill_node(Config, 0),
1040-
%% Wait a bit to ensure that Mnesia deletes the queue record on nodes 1 and 2 from Mnesia
1041-
%% table rabbit_queue (but the queue record is still present in rabbit_durable_queue).
1041+
ok = await_exit(Sub0),
1042+
%% Wait to run rabbit_amqqueue:on_node_down/1 on both live nodes.
10421043
timer:sleep(500),
1044+
%% Re-connect to a live node with same MQTT client ID.
10431045
Sub1 = connect(SubscriberId, Config, 1, []),
1044-
{ok, _, [0]} = emqtt:subscribe(Sub1, Topic, qos0),
1045-
ok = emqtt:publish(Pub, Topic, <<"m1">>, qos0),
1046-
ok = expect_publishes(Sub1, Topic, [<<"m1">>]),
1046+
{ok, _, [0]} = emqtt:subscribe(Sub1, Topic2, qos0),
1047+
ok = emqtt:publish(Pub, Topic2, <<"m1">>, qos0),
1048+
ok = expect_publishes(Sub1, Topic2, [<<"m1">>]),
1049+
%% Since we started a new clean session, previous subscription should have been deleted.
1050+
ok = emqtt:publish(Pub, Topic1, <<"m2">>, qos0),
1051+
receive {publish, _} = Publish -> ct:fail({unexpected, Publish})
1052+
after 300 -> ok
1053+
end,
10471054

1048-
%% Start node 0 to have a majority for Khepri.
10491055
ok = rabbit_ct_broker_helpers:start_node(Config, 0),
10501056
ok = rabbit_ct_broker_helpers:kill_node(Config, 1),
1051-
%% This time, do not wait. Mnesia will contain the queue record in rabbit_durable_queue,
1052-
%% but this time Mnesia may or may not contain the queue record in rabbit_queue.
1057+
%% This time, do not wait.
1058+
%% rabbit_amqqueue:on_node_down/1 may or may not have run.
10531059
Sub2 = connect(SubscriberId, Config, 2, []),
1054-
{ok, _, [0]} = emqtt:subscribe(Sub2, Topic, qos0),
1055-
ok = emqtt:publish(Pub, Topic, <<"m2">>, qos0),
1056-
ok = expect_publishes(Sub2, Topic, [<<"m2">>]),
1060+
{ok, _, [0]} = emqtt:subscribe(Sub2, Topic2, qos0),
1061+
ok = emqtt:publish(Pub, Topic2, <<"m3">>, qos0),
1062+
ok = expect_publishes(Sub2, Topic2, [<<"m3">>]),
10571063

10581064
ok = emqtt:disconnect(Sub2),
10591065
ok = emqtt:disconnect(Pub),
1060-
ok = rabbit_ct_broker_helpers:start_node(Config, 1).
1066+
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
1067+
?assertEqual([], rpc(Config, rabbit_db_binding, get_all, [])).
10611068

10621069
%% Test that MQTT connection can be listed and closed via the rabbitmq_management plugin.
10631070
management_plugin_connection(Config) ->

0 commit comments

Comments
 (0)