Skip to content

Commit 824b2d8

Browse files
Merge pull request #10244 from rabbitmq/qos0-queue
Allow MQTT QoS 0 subscribers to reconnect
2 parents 690ca1d + 78b4fcc commit 824b2d8

File tree

3 files changed

+28
-32
lines changed

3 files changed

+28
-32
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1995,7 +1995,8 @@ on_node_down(Node) ->
19951995
{QueueNames, Deletions} ->
19961996
case length(QueueNames) of
19971997
0 -> ok;
1998-
_ -> rabbit_log:info("~tp transient queues from an old incarnation of node ~tp deleted in ~fs", [length(QueueNames), Node, Time/1000000])
1998+
N -> rabbit_log:info("~b transient queues from an old incarnation of node ~tp deleted in ~fs",
1999+
[N, Node, Time / 1_000_000])
19992000
end,
20002001
notify_queue_binding_deletions(Deletions),
20012002
rabbit_core_metrics:queues_deleted(QueueNames),
@@ -2010,6 +2011,7 @@ filter_transient_queues_to_delete(Node) ->
20102011
andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q))
20112012
andalso (not rabbit_amqqueue:is_replicated(Q)
20122013
orelse rabbit_amqqueue:is_dead_exclusive(Q))
2014+
andalso amqqueue:get_type(Q) =/= rabbit_mqtt_qos0_queue
20132015
end.
20142016

20152017
notify_queue_binding_deletions(QueueDeletions) when is_list(QueueDeletions) ->

deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ is_stateful() ->
6868
false.
6969

7070
-spec declare(amqqueue:amqqueue(), node()) ->
71-
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()}.
71+
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
72+
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()}.
7273
declare(Q0, _Node) ->
7374
Q1 = case amqqueue:get_pid(Q0) of
7475
none ->
@@ -92,20 +93,6 @@ declare(Q0, _Node) ->
9293
{arguments, amqqueue:get_arguments(Q)},
9394
{user_who_performed_action, ActingUser}]),
9495
{new, Q};
95-
{absent, OldQ, nodedown} ->
96-
%% This case body can be deleted once Mnesia is unsupported.
97-
OldPid = amqqueue:get_pid(OldQ),
98-
OldNode = node(OldPid),
99-
rabbit_log_queue:debug(
100-
"Overwriting record of ~s of type ~s on node ~s since "
101-
"formerly hosting node ~s seems to be down (former pid ~p)",
102-
[rabbit_misc:rs(amqqueue:get_name(Q1)), ?MODULE, node(), OldNode, OldPid]),
103-
case rabbit_amqqueue:internal_declare(Q1, true) of
104-
{created, Q} ->
105-
{new, Q};
106-
Other ->
107-
Other
108-
end;
10996
Other ->
11097
Other
11198
end.

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,38 +1214,45 @@ rabbit_mqtt_qos0_queue(Config) ->
12141214
ok = emqtt:disconnect(Pub).
12151215

12161216
rabbit_mqtt_qos0_queue_kill_node(Config) ->
1217-
Topic = atom_to_binary(?FUNCTION_NAME),
1217+
Topic1 = <<"t/1">>,
1218+
Topic2 = <<"t/2">>,
12181219
Pub = connect(<<"publisher">>, Config, 2, []),
12191220

12201221
SubscriberId = <<"subscriber">>,
12211222
Sub0 = connect(SubscriberId, Config, 0, []),
1222-
{ok, _, [0]} = emqtt:subscribe(Sub0, Topic, qos0),
1223-
ok = emqtt:publish(Pub, Topic, <<"m0">>, qos0),
1224-
ok = expect_publishes(Sub0, Topic, [<<"m0">>]),
1223+
{ok, _, [0]} = emqtt:subscribe(Sub0, Topic1, qos0),
1224+
ok = emqtt:publish(Pub, Topic1, <<"m0">>, qos0),
1225+
ok = expect_publishes(Sub0, Topic1, [<<"m0">>]),
12251226

12261227
process_flag(trap_exit, true),
12271228
ok = rabbit_ct_broker_helpers:kill_node(Config, 0),
1228-
%% Wait a bit to ensure that Mnesia deletes the queue record on nodes 1 and 2 from Mnesia
1229-
%% table rabbit_queue (but the queue record is still present in rabbit_durable_queue).
1229+
ok = await_exit(Sub0),
1230+
%% Wait to run rabbit_amqqueue:on_node_down/1 on both live nodes.
12301231
timer:sleep(500),
1232+
%% Re-connect to a live node with same MQTT client ID.
12311233
Sub1 = connect(SubscriberId, Config, 1, []),
1232-
{ok, _, [0]} = emqtt:subscribe(Sub1, Topic, qos0),
1233-
ok = emqtt:publish(Pub, Topic, <<"m1">>, qos0),
1234-
ok = expect_publishes(Sub1, Topic, [<<"m1">>]),
1234+
{ok, _, [0]} = emqtt:subscribe(Sub1, Topic2, qos0),
1235+
ok = emqtt:publish(Pub, Topic2, <<"m1">>, qos0),
1236+
ok = expect_publishes(Sub1, Topic2, [<<"m1">>]),
1237+
%% Since we started a new clean session, previous subscription should have been deleted.
1238+
ok = emqtt:publish(Pub, Topic1, <<"m2">>, qos0),
1239+
receive {publish, _} = Publish -> ct:fail({unexpected, Publish})
1240+
after 300 -> ok
1241+
end,
12351242

1236-
%% Start node 0 to have a majority for Khepri.
12371243
ok = rabbit_ct_broker_helpers:start_node(Config, 0),
12381244
ok = rabbit_ct_broker_helpers:kill_node(Config, 1),
1239-
%% This time, do not wait. Mnesia will contain the queue record in rabbit_durable_queue,
1240-
%% but this time Mnesia may or may not contain the queue record in rabbit_queue.
1245+
%% This time, do not wait.
1246+
%% rabbit_amqqueue:on_node_down/1 may or may not have run.
12411247
Sub2 = connect(SubscriberId, Config, 2, []),
1242-
{ok, _, [0]} = emqtt:subscribe(Sub2, Topic, qos0),
1243-
ok = emqtt:publish(Pub, Topic, <<"m2">>, qos0),
1244-
ok = expect_publishes(Sub2, Topic, [<<"m2">>]),
1248+
{ok, _, [0]} = emqtt:subscribe(Sub2, Topic2, qos0),
1249+
ok = emqtt:publish(Pub, Topic2, <<"m3">>, qos0),
1250+
ok = expect_publishes(Sub2, Topic2, [<<"m3">>]),
12451251

12461252
ok = emqtt:disconnect(Sub2),
12471253
ok = emqtt:disconnect(Pub),
1248-
ok = rabbit_ct_broker_helpers:start_node(Config, 1).
1254+
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
1255+
?assertEqual([], rpc(Config, rabbit_db_binding, get_all, [])).
12491256

12501257
%% Test that MQTT connection can be listed and closed via the rabbitmq_management plugin.
12511258
management_plugin_connection(Config) ->

0 commit comments

Comments
 (0)