Skip to content

Commit afa28cb

Browse files
authored
Merge pull request #12118 from rabbitmq/issue-11985
MQTT and Streams: handle connection shutdown via CLI command gracefully
2 parents c8fa044 + 8c905b9 commit afa28cb

File tree

4 files changed

+26
-7
lines changed

4 files changed

+26
-7
lines changed

deps/rabbit/src/rabbit_connection_tracking.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,6 @@ close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
428428
Node = node(Pid),
429429
rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]);
430430
close_connection(#tracked_connection{pid = Pid}, Message) ->
431-
% best effort, this will work for connections to the stream plugin
432-
Node = node(Pid),
433-
rpc:call(Node, gen_server, call, [Pid, {shutdown, Message}, infinity]).
431+
%% Best effort will work for following plugins:
432+
%% rabbitmq_stream, rabbitmq_mqtt, rabbitmq_web_mqtt
433+
Pid ! {shutdown, Message}.

deps/rabbit/src/rabbit_networking.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -531,9 +531,8 @@ close_connections(Pids, Explanation) ->
531531

532532
-spec close_all_user_connections(rabbit_types:username(), string()) -> 'ok'.
533533
close_all_user_connections(Username, Explanation) ->
534-
Pids = [Pid || #tracked_connection{pid = Pid} <- rabbit_connection_tracking:list_of_user(Username)],
535-
[close_connection(Pid, Explanation) || Pid <- Pids],
536-
ok.
534+
Tracked = rabbit_connection_tracking:list_of_user(Username),
535+
rabbit_connection_tracking:close_connections(Tracked, Explanation, 0).
537536

538537
%% Meant to be used by tests only
539538
-spec close_all_connections(string()) -> 'ok'.

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
247247
{noreply, State, ?HIBERNATE_AFTER};
248248

249249
handle_info({shutdown, Explanation} = Reason, State = #state{conn_name = ConnName}) ->
250-
%% rabbitmq_management plugin requests to close connection.
250+
%% rabbitmq_management plugin or CLI command requests to close connection.
251251
?LOG_INFO("MQTT closing connection ~tp: ~p", [ConnName, Explanation]),
252252
{stop, Reason, State};
253253

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ cluster_size_1_tests() ->
9191
,block_only_publisher
9292
,many_qos1_messages
9393
,session_expiry
94+
,cli_close_all_connections
95+
,cli_close_all_user_connections
9496
,management_plugin_connection
9597
,management_plugin_enable
9698
,disconnect
@@ -1165,6 +1167,24 @@ rabbit_mqtt_qos0_queue_kill_node(Config) ->
11651167
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
11661168
?assertEqual([], rpc(Config, rabbit_db_binding, get_all, [])).
11671169

1170+
cli_close_all_connections(Config) ->
1171+
ClientId = atom_to_binary(?FUNCTION_NAME),
1172+
C = connect(ClientId, Config),
1173+
process_flag(trap_exit, true),
1174+
{ok, String} = rabbit_ct_broker_helpers:rabbitmqctl(
1175+
Config, 0, ["close_all_connections", "bye"]),
1176+
?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])),
1177+
ok = await_exit(C).
1178+
1179+
cli_close_all_user_connections(Config) ->
1180+
ClientId = atom_to_binary(?FUNCTION_NAME),
1181+
C = connect(ClientId, Config),
1182+
process_flag(trap_exit, true),
1183+
{ok, String} = rabbit_ct_broker_helpers:rabbitmqctl(
1184+
Config, 0, ["close_all_user_connections","guest", "bye"]),
1185+
?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])),
1186+
ok = await_exit(C).
1187+
11681188
%% Test that MQTT connection can be listed and closed via the rabbitmq_management plugin.
11691189
management_plugin_connection(Config) ->
11701190
KeepaliveSecs = 99,

0 commit comments

Comments
 (0)