Skip to content

Commit 798dcf9

Browse files
ansdmergify[bot]
authored andcommitted
Overwrite rabbit_mqtt_qos0_queue record from crashed node
When a node is shut down cleanly, the rabbit_mqtt_qos0_queue record is removed from Mnesia. When a node crashes and subsequently reboots the new node incarnation removes the old rabbit_mqtt_qos0_queue record from Mnesia (via rabbit_mqtt_qos0_queue:recover/2) However, when a node crashes, the rabbit_mqtt_qos0_queue will be removed from Mnesia table rabbit_queue, but will still be present in table rabbit_durable_queue on the other live nodes. Prior to this commit, when the same MQTT client (i.e. same MQTT client ID) re-connects from the crashed node to another live node and re-subscribes, the following error occurred: ``` [info] <0.43155.0> Accepted MQTT connection 10.105.0.18:60508 -> 10.105.0.10:1883 for client ID nodered_24e214feb018a232 [debug] <0.43155.0> Received a SUBSCRIBE for topic(s) [{mqtt_topic, [debug] <0.43155.0> <<"as923/gateway/+/command/#">>,0}] [error] <0.43155.0> Failed to declare queue 'mqtt-subscription-nodered_24e214feb018a232qos0' in vhost '/': {absent, [error] <0.43155.0> {amqqueue, [error] <0.43155.0> {resource, [error] <0.43155.0> <<"/">>, [error] <0.43155.0> queue, [error] <0.43155.0> <<"mqtt-subscription-nodered_24e214feb018a232qos0">>}, [error] <0.43155.0> true, [error] <0.43155.0> false, [error] <0.43155.0> <15486.32690.0>, [error] <0.43155.0> [], [error] <0.43155.0> <15486.32690.0>, [error] <0.43155.0> [], [error] <0.43155.0> [], [error] <0.43155.0> [], [error] <0.43155.0> [{vhost, [error] <0.43155.0> <<"/">>}, [error] <0.43155.0> {name, [error] <0.43155.0> <<"ha-all-mqtt">>}, [error] <0.43155.0> {pattern, [error] <0.43155.0> <<"^mqtt-">>}, [error] <0.43155.0> {'apply-to', [error] <0.43155.0> <<"all">>}, [error] <0.43155.0> {definition, [error] <0.43155.0> [{<<"ha-mode">>, [error] <0.43155.0> <<"all">>}]}, [error] <0.43155.0> {priority, [error] <0.43155.0> 0}], [error] <0.43155.0> undefined, [error] <0.43155.0> [], [error] <0.43155.0> undefined, [error] <0.43155.0> live, [error] <0.43155.0> 0, [error] <0.43155.0> [], [error] <0.43155.0> <<"/">>, [error] <0.43155.0> #{user => [error] <0.43155.0> <<"iottester">>}, [error] <0.43155.0> rabbit_mqtt_qos0_queue, [error] <0.43155.0> #{}}, [error] <0.43155.0> nodedown} [error] <0.43155.0> MQTT protocol error on connection 10.105.0.18:60508 -> 10.105.0.10:1883: subscribe_error ``` This commit fixes this error allowing an MQTT client that connects with CleanSession=true and subscribes with QoS 0 to re-connect and re-subscribe to another live node if the original Rabbit node crashes. Reported in https://groups.google.com/g/rabbitmq-users/c/pxgy0QiwilM/m/LkJQ-3DyBgAJ (cherry picked from commit 9487189) # Conflicts: # deps/rabbitmq_mqtt/test/shared_SUITE.erl
1 parent e953289 commit 798dcf9

File tree

2 files changed

+123
-2
lines changed

2 files changed

+123
-2
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,7 @@ is_stateful() ->
6767
false.
6868

6969
-spec declare(amqqueue:amqqueue(), node()) ->
70-
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
71-
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()}.
70+
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()}.
7271
declare(Q0, _Node) ->
7372
%% The queue gets persisted such that routing to this
7473
%% queue (via the topic exchange) works as usual.
@@ -85,6 +84,20 @@ declare(Q0, _Node) ->
8584
{arguments, amqqueue:get_arguments(Q0)},
8685
{user_who_performed_action, ActingUser}]),
8786
{new, Q};
87+
{absent, OldQ, nodedown} ->
88+
%% This case body can be deleted once Mnesia is unsupported.
89+
OldPid = amqqueue:get_pid(OldQ),
90+
OldNode = node(OldPid),
91+
rabbit_log_queue:debug(
92+
"Overwriting record of ~s of type ~s on node ~s since "
93+
"formerly hosting node ~s seems to be down (former pid ~p)",
94+
[rabbit_misc:rs(amqqueue:get_name(Q1)), ?MODULE, node(), OldNode, OldPid]),
95+
case rabbit_amqqueue:internal_declare(Q1, true) of
96+
{created, Q} ->
97+
{new, Q};
98+
Other ->
99+
Other
100+
end;
88101
Other ->
89102
Other
90103
end.

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,80 @@ subgroups() ->
118118
]}
119119
].
120120

121+
<<<<<<< HEAD
122+
=======
123+
cluster_size_1_tests() ->
124+
[
125+
global_counters %% must be the 1st test case
126+
,block_only_publisher
127+
,many_qos1_messages
128+
,session_expiry
129+
,management_plugin_connection
130+
,management_plugin_enable
131+
,disconnect
132+
,pubsub_shared_connection
133+
,pubsub_separate_connections
134+
,will_with_disconnect
135+
,will_without_disconnect
136+
,decode_basic_properties
137+
,quorum_queue_rejects
138+
,events
139+
,internal_event_handler
140+
,non_clean_sess_reconnect_qos1
141+
,non_clean_sess_reconnect_qos0
142+
,non_clean_sess_reconnect_qos0_and_qos1
143+
,non_clean_sess_empty_client_id
144+
,subscribe_same_topic_same_qos
145+
,subscribe_same_topic_different_qos
146+
,subscribe_multiple
147+
,large_message_mqtt_to_mqtt
148+
,large_message_amqp_to_mqtt
149+
,keepalive
150+
,keepalive_turned_off
151+
,block
152+
,amqp_to_mqtt_qos0
153+
,clean_session_disconnect_client
154+
,clean_session_node_restart
155+
,clean_session_node_kill
156+
,rabbit_status_connection_count
157+
,trace
158+
,trace_large_message
159+
,max_packet_size_unauthenticated
160+
,max_packet_size_authenticated
161+
,default_queue_type
162+
,incoming_message_interceptors
163+
,utf8
164+
,retained_message_conversion
165+
,bind_exchange_to_exchange
166+
,bind_exchange_to_exchange_single_message
167+
].
168+
169+
cluster_size_3_tests() ->
170+
[
171+
pubsub,
172+
queue_down_qos1,
173+
consuming_classic_queue_down,
174+
flow_quorum_queue,
175+
flow_stream,
176+
rabbit_mqtt_qos0_queue,
177+
rabbit_mqtt_qos0_queue_kill_node,
178+
cli_list_queues,
179+
delete_create_queue,
180+
session_reconnect,
181+
session_takeover,
182+
duplicate_client_id,
183+
maintenance
184+
].
185+
186+
mnesia_store_tests() ->
187+
[
188+
consuming_classic_mirrored_queue_down,
189+
flow_classic_mirrored_queue,
190+
publish_to_all_queue_types_qos0,
191+
publish_to_all_queue_types_qos1
192+
].
193+
194+
>>>>>>> 9487189dc6 (Overwrite rabbit_mqtt_qos0_queue record from crashed node)
121195
suite() ->
122196
[{timetrap, {minutes, 10}}].
123197

@@ -1019,6 +1093,40 @@ rabbit_mqtt_qos0_queue(Config) ->
10191093
ok = emqtt:disconnect(Sub),
10201094
ok = emqtt:disconnect(Pub).
10211095

1096+
rabbit_mqtt_qos0_queue_kill_node(Config) ->
1097+
Topic = atom_to_binary(?FUNCTION_NAME),
1098+
Pub = connect(<<"publisher">>, Config, 2, []),
1099+
1100+
SubscriberId = <<"subscriber">>,
1101+
Sub0 = connect(SubscriberId, Config, 0, []),
1102+
{ok, _, [0]} = emqtt:subscribe(Sub0, Topic, qos0),
1103+
ok = emqtt:publish(Pub, Topic, <<"m0">>, qos0),
1104+
ok = expect_publishes(Sub0, Topic, [<<"m0">>]),
1105+
1106+
process_flag(trap_exit, true),
1107+
ok = rabbit_ct_broker_helpers:kill_node(Config, 0),
1108+
%% Wait a bit to ensure that Mnesia deletes the queue record on nodes 1 and 2 from Mnesia
1109+
%% table rabbit_queue (but the queue record is still present in rabbit_durable_queue).
1110+
timer:sleep(500),
1111+
Sub1 = connect(SubscriberId, Config, 1, []),
1112+
{ok, _, [0]} = emqtt:subscribe(Sub1, Topic, qos0),
1113+
ok = emqtt:publish(Pub, Topic, <<"m1">>, qos0),
1114+
ok = expect_publishes(Sub1, Topic, [<<"m1">>]),
1115+
1116+
%% Start node 0 to have a majority for Khepri.
1117+
ok = rabbit_ct_broker_helpers:start_node(Config, 0),
1118+
ok = rabbit_ct_broker_helpers:kill_node(Config, 1),
1119+
%% This time, do not wait. Mnesia will contain the queue record in rabbit_durable_queue,
1120+
%% but this time Mnesia may or may not contain the queue record in rabbit_queue.
1121+
Sub2 = connect(SubscriberId, Config, 2, []),
1122+
{ok, _, [0]} = emqtt:subscribe(Sub2, Topic, qos0),
1123+
ok = emqtt:publish(Pub, Topic, <<"m2">>, qos0),
1124+
ok = expect_publishes(Sub2, Topic, [<<"m2">>]),
1125+
1126+
ok = emqtt:disconnect(Sub2),
1127+
ok = emqtt:disconnect(Pub),
1128+
ok = rabbit_ct_broker_helpers:start_node(Config, 1).
1129+
10221130
%% Test that MQTT connection can be listed and closed via the rabbitmq_management plugin.
10231131
management_plugin_connection(Config) ->
10241132
KeepaliveSecs = 99,

0 commit comments

Comments
 (0)