Skip to content

Commit 89e04e9

Browse files
Merge pull request #10203 from rabbitmq/mqtt-qos0-queue-node-down
Overwrite rabbit_mqtt_qos0_queue record from crashed node
2 parents 0816c2f + 9487189 commit 89e04e9

File tree

2 files changed

+50
-2
lines changed

2 files changed

+50
-2
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@ is_stateful() ->
6868
false.
6969

7070
-spec declare(amqqueue:amqqueue(), node()) ->
71-
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
72-
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()}.
71+
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()}.
7372
declare(Q0, _Node) ->
7473
Q1 = case amqqueue:get_pid(Q0) of
7574
none ->
@@ -93,6 +92,20 @@ declare(Q0, _Node) ->
9392
{arguments, amqqueue:get_arguments(Q)},
9493
{user_who_performed_action, ActingUser}]),
9594
{new, Q};
95+
{absent, OldQ, nodedown} ->
96+
%% This case body can be deleted once Mnesia is unsupported.
97+
OldPid = amqqueue:get_pid(OldQ),
98+
OldNode = node(OldPid),
99+
rabbit_log_queue:debug(
100+
"Overwriting record of ~s of type ~s on node ~s since "
101+
"formerly hosting node ~s seems to be down (former pid ~p)",
102+
[rabbit_misc:rs(amqqueue:get_name(Q1)), ?MODULE, node(), OldNode, OldPid]),
103+
case rabbit_amqqueue:internal_declare(Q1, true) of
104+
{created, Q} ->
105+
{new, Q};
106+
Other ->
107+
Other
108+
end;
96109
Other ->
97110
Other
98111
end.

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ cluster_size_3_tests() ->
142142
flow_quorum_queue,
143143
flow_stream,
144144
rabbit_mqtt_qos0_queue,
145+
rabbit_mqtt_qos0_queue_kill_node,
145146
cli_list_queues,
146147
delete_create_queue,
147148
session_reconnect,
@@ -1212,6 +1213,40 @@ rabbit_mqtt_qos0_queue(Config) ->
12121213
ok = emqtt:disconnect(Sub),
12131214
ok = emqtt:disconnect(Pub).
12141215

1216+
%% Test that an MQTT QoS 0 subscriber using the same client ID can reconnect to,
%% subscribe on, and receive messages from another node after the node it was
%% previously connected to has been killed. This is exercised twice: once after
%% waiting for the kill to propagate and once immediately after the kill.
%% NOTE(review): the third argument of connect/4 is presumably the broker node
%% index (matching the indices passed to kill_node/start_node) - confirm.
rabbit_mqtt_qos0_queue_kill_node(Config) ->
1217+
Topic = atom_to_binary(?FUNCTION_NAME),
1218+
Pub = connect(<<"publisher">>, Config, 2, []),
1219+
1220+
SubscriberId = <<"subscriber">>,
1221+
Sub0 = connect(SubscriberId, Config, 0, []),
1222+
{ok, _, [0]} = emqtt:subscribe(Sub0, Topic, qos0),
1223+
ok = emqtt:publish(Pub, Topic, <<"m0">>, qos0),
1224+
ok = expect_publishes(Sub0, Topic, [<<"m0">>]),
1225+
1226+
%% NOTE(review): presumably needed so that the exit of the client process Sub0,
%% caused by killing node 0 below, does not terminate the test process - confirm.
process_flag(trap_exit, true),
1227+
ok = rabbit_ct_broker_helpers:kill_node(Config, 0),
1228+
%% Wait a bit to ensure that the queue record is deleted from Mnesia table rabbit_queue
1229+
%% on nodes 1 and 2 (but the queue record is still present in rabbit_durable_queue).
1230+
timer:sleep(500),
1231+
%% Reconnect with the same client ID, this time via node 1.
Sub1 = connect(SubscriberId, Config, 1, []),
1232+
{ok, _, [0]} = emqtt:subscribe(Sub1, Topic, qos0),
1233+
ok = emqtt:publish(Pub, Topic, <<"m1">>, qos0),
1234+
ok = expect_publishes(Sub1, Topic, [<<"m1">>]),
1235+
1236+
%% Start node 0 to have a majority for Khepri.
1237+
ok = rabbit_ct_broker_helpers:start_node(Config, 0),
1238+
ok = rabbit_ct_broker_helpers:kill_node(Config, 1),
1239+
%% This time, do not wait. Mnesia will contain the queue record in rabbit_durable_queue,
1240+
%% but this time Mnesia may or may not contain the queue record in rabbit_queue.
1241+
%% Reconnect with the same client ID once more, this time via node 2.
Sub2 = connect(SubscriberId, Config, 2, []),
1242+
{ok, _, [0]} = emqtt:subscribe(Sub2, Topic, qos0),
1243+
ok = emqtt:publish(Pub, Topic, <<"m2">>, qos0),
1244+
ok = expect_publishes(Sub2, Topic, [<<"m2">>]),
1245+
1246+
ok = emqtt:disconnect(Sub2),
1247+
ok = emqtt:disconnect(Pub),
1248+
%% Restart node 1, which was killed above.
ok = rabbit_ct_broker_helpers:start_node(Config, 1).
1249+
12151250
%% Test that MQTT connection can be listed and closed via the rabbitmq_management plugin.
12161251
management_plugin_connection(Config) ->
12171252
KeepaliveSecs = 99,

0 commit comments

Comments
 (0)