Skip to content

Commit 7488332

Browse files
ansdmergify[bot]
authored andcommitted
Disconnect MQTT client when its credential expires
Fixes https://github.com/rabbitmq/rabbitmq-server/discussions/11854 Fixes #11862 This commit uses the same approach as implemented for AMQP 1.0 and Streams: When a token expires, RabbitMQ will close the connection. (cherry picked from commit 7fb7833)
1 parent 2e67915 commit 7488332

File tree

7 files changed

+119
-11
lines changed

7 files changed

+119
-11
lines changed

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -910,7 +910,7 @@ ensure_credential_expiry_timer(User) ->
910910
ok;
911911
false ->
912912
protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
913-
"Credential expired ~b ms ago", [Time])
913+
"Credential expired ~b ms ago", [abs(Time)])
914914
end
915915
end.
916916

deps/rabbitmq_auth_backend_oauth2/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ eunit(
9393

9494
broker_for_integration_suites(
9595
extra_plugins = [
96-
"//deps/rabbitmq_mqtt:erlang_app",
96+
"//deps/rabbitmq_web_mqtt:erlang_app",
9797
],
9898
)
9999

deps/rabbitmq_auth_backend_oauth2/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export BUILD_WITHOUT_QUIC
88
LOCAL_DEPS = inets public_key
99
BUILD_DEPS = rabbit_common
1010
DEPS = rabbit cowlib jose base64url oauth2_client
11-
TEST_DEPS = cowboy rabbitmq_web_dispatch rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client rabbitmq_mqtt emqtt
11+
TEST_DEPS = cowboy rabbitmq_web_dispatch rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client rabbitmq_web_mqtt emqtt
1212

1313
PLT_APPS += rabbitmqctl
1414

deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ groups() ->
4343
test_failed_connection_with_a_non_token,
4444
test_failed_connection_with_a_token_with_insufficient_vhost_permission,
4545
test_failed_connection_with_a_token_with_insufficient_resource_permission,
46-
more_than_one_resource_server_id_not_allowed_in_one_token
46+
more_than_one_resource_server_id_not_allowed_in_one_token,
47+
mqtt_expirable_token,
48+
web_mqtt_expirable_token,
49+
mqtt_expired_token
4750
]},
4851

4952
{token_refresh, [], [
@@ -422,15 +425,80 @@ mqtt(Config) ->
422425
{ok, Pub} = emqtt:start_link([{clientid, <<"mqtt-publisher">>} | Opts]),
423426
{ok, _} = emqtt:connect(Pub),
424427
{ok, _} = emqtt:publish(Pub, Topic, Payload, at_least_once),
425-
receive
426-
{publish, #{client_pid := Sub,
427-
topic := Topic,
428-
payload := Payload}} -> ok
428+
receive {publish, #{client_pid := Sub,
429+
topic := Topic,
430+
payload := Payload}} -> ok
429431
after 1000 -> ct:fail("no publish received")
430432
end,
431433
ok = emqtt:disconnect(Sub),
432434
ok = emqtt:disconnect(Pub).
433435

436+
mqtt_expirable_token(Config) ->
437+
mqtt_expirable_token0(tcp_port_mqtt,
438+
[],
439+
fun emqtt:connect/1,
440+
Config).
441+
442+
web_mqtt_expirable_token(Config) ->
443+
mqtt_expirable_token0(tcp_port_web_mqtt,
444+
[{ws_path, "/ws"}],
445+
fun emqtt:ws_connect/1,
446+
Config).
447+
448+
mqtt_expirable_token0(Port, AdditionalOpts, Connect, Config) ->
449+
Topic = <<"test/topic">>,
450+
Payload = <<"mqtt-test-message">>,
451+
452+
Seconds = 4,
453+
Millis = Seconds * 1000,
454+
{_Algo, Token} = generate_expirable_token(Config,
455+
[<<"rabbitmq.configure:*/*/*">>,
456+
<<"rabbitmq.write:*/*/*">>,
457+
<<"rabbitmq.read:*/*/*">>],
458+
Seconds),
459+
460+
Opts = [{port, rabbit_ct_broker_helpers:get_node_config(Config, 0, Port)},
461+
{proto_ver, v5},
462+
{username, <<"">>},
463+
{password, Token}] ++ AdditionalOpts,
464+
{ok, Sub} = emqtt:start_link([{clientid, <<"my subscriber">>} | Opts]),
465+
{ok, _} = Connect(Sub),
466+
{ok, _, [1]} = emqtt:subscribe(Sub, Topic, at_least_once),
467+
{ok, Pub} = emqtt:start_link([{clientid, <<"my publisher">>} | Opts]),
468+
{ok, _} = Connect(Pub),
469+
{ok, _} = emqtt:publish(Pub, Topic, Payload, at_least_once),
470+
receive {publish, #{client_pid := Sub,
471+
topic := Topic,
472+
payload := Payload}} -> ok
473+
after 1000 -> ct:fail("no publish received")
474+
end,
475+
476+
%% reason code "Maximum connect time" defined in
477+
%% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901208
478+
ReasonCode = 16#A0,
479+
true = unlink(Sub),
480+
true = unlink(Pub),
481+
482+
%% In 4 seconds from now, we expect that RabbitMQ disconnects us because our token expired.
483+
receive {disconnected, ReasonCode, _} -> ok
484+
after Millis * 2 -> ct:fail("missing DISCONNECT packet from server")
485+
end,
486+
receive {disconnected, ReasonCode, _} -> ok
487+
after Millis * 2 -> ct:fail("missing DISCONNECT packet from server")
488+
end.
489+
490+
mqtt_expired_token(Config) ->
491+
{_Algo, Token} = generate_expired_token(Config),
492+
Opts = [{port, rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt)},
493+
{proto_ver, v5},
494+
{username, <<"">>},
495+
{password, Token}],
496+
ClientId = atom_to_binary(?FUNCTION_NAME),
497+
{ok, C} = emqtt:start_link([{clientid, ClientId} | Opts]),
498+
true = unlink(C),
499+
?assertMatch({error, {bad_username_or_password, _}},
500+
emqtt:connect(C)).
501+
434502
test_successful_connection_with_complex_claim_as_a_map(Config) ->
435503
{_Algo, Token} = generate_valid_token_with_extra_fields(
436504
Config,

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ process_connect(
189189
ok ?= check_user_connection_limit(Username),
190190
{ok, AuthzCtx} ?= check_vhost_access(VHost, User, ClientId, PeerIp),
191191
ok ?= check_user_loopback(Username, PeerIp),
192+
ok ?= ensure_credential_expiry_timer(User, PeerIp),
192193
rabbit_core_metrics:auth_attempt_succeeded(PeerIp, Username, mqtt),
193194
ok = register_client_id(VHost, ClientId, CleanStart, WillProps),
194195
{ok, WillMsg} ?= make_will_msg(Packet),
@@ -1086,6 +1087,27 @@ check_user_loopback(Username, PeerIp) ->
10861087
{error, ?RC_NOT_AUTHORIZED}
10871088
end.
10881089

1090+
1091+
ensure_credential_expiry_timer(User = #user{username = Username}, PeerIp) ->
1092+
case rabbit_access_control:expiry_timestamp(User) of
1093+
never ->
1094+
ok;
1095+
Ts when is_integer(Ts) ->
1096+
Time = (Ts - os:system_time(second)) * 1000,
1097+
?LOG_DEBUG("Credential expires in ~b ms frow now "
1098+
"(absolute timestamp = ~b seconds since epoch)",
1099+
[Time, Ts]),
1100+
case Time > 0 of
1101+
true ->
1102+
_TimerRef = erlang:send_after(Time, self(), credential_expired),
1103+
ok;
1104+
false ->
1105+
auth_attempt_failed(PeerIp, Username),
1106+
?LOG_WARNING("Credential expired ~b ms ago", [abs(Time)]),
1107+
{error, ?RC_NOT_AUTHORIZED}
1108+
end
1109+
end.
1110+
10891111
get_vhost(UserBin, none, Port) ->
10901112
get_vhost_no_ssl(UserBin, Port);
10911113
get_vhost(UserBin, SslLogin, Port) ->

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ handle_cast({duplicate_id, SendWill},
121121
{stop, {shutdown, duplicate_id}, {SendWill, State}};
122122

123123
handle_cast({close_connection, Reason},
124-
State = #state{conn_name = ConnName, proc_state = PState}) ->
124+
State = #state{conn_name = ConnName,
125+
proc_state = PState}) ->
125126
?LOG_WARNING("MQTT disconnecting client ~tp with client ID '~ts', reason: ~ts",
126127
[ConnName, rabbit_mqtt_processor:info(client_id, PState), Reason]),
127128
case Reason of
@@ -211,6 +212,14 @@ handle_info({keepalive, Req}, State = #state{proc_state = PState,
211212
{stop, Reason, State}
212213
end;
213214

215+
handle_info(credential_expired,
216+
State = #state{conn_name = ConnName,
217+
proc_state = PState}) ->
218+
?LOG_WARNING("MQTT disconnecting client ~tp with client ID '~ts' because credential expired",
219+
[ConnName, rabbit_mqtt_processor:info(client_id, PState)]),
220+
rabbit_mqtt_processor:send_disconnect(?RC_MAXIMUM_CONNECT_TIME, PState),
221+
{stop, {shutdown, {disconnect, server_initiated}}, State};
222+
214223
handle_info(login_timeout, State = #state{proc_state = connect_packet_unprocessed,
215224
conn_name = ConnName}) ->
216225
%% The connection is also closed if the CONNECT packet happens to

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,9 @@ websocket_info({'$gen_cast', {duplicate_id, SendWill}},
176176
rabbit_mqtt_processor:send_disconnect(?RC_SESSION_TAKEN_OVER, ProcState),
177177
defer_close(?CLOSE_NORMAL, SendWill),
178178
{[], State};
179-
websocket_info({'$gen_cast', {close_connection, Reason}}, State = #state{proc_state = ProcState,
180-
conn_name = ConnName}) ->
179+
websocket_info({'$gen_cast', {close_connection, Reason}},
180+
State = #state{proc_state = ProcState,
181+
conn_name = ConnName}) ->
181182
?LOG_WARNING("Web MQTT disconnecting client with ID '~s' (~p), reason: ~s",
182183
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName, Reason]),
183184
case Reason of
@@ -215,6 +216,14 @@ websocket_info({keepalive, Req}, State = #state{proc_state = ProcState,
215216
[ConnName, Reason]),
216217
stop(State)
217218
end;
219+
websocket_info(credential_expired,
220+
State = #state{proc_state = ProcState,
221+
conn_name = ConnName}) ->
222+
?LOG_WARNING("Web MQTT disconnecting client with ID '~s' (~p) because credential expired",
223+
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName]),
224+
rabbit_mqtt_processor:send_disconnect(?RC_MAXIMUM_CONNECT_TIME, ProcState),
225+
defer_close(?CLOSE_NORMAL),
226+
{[], State};
218227
websocket_info(emit_stats, State) ->
219228
{[], emit_stats(State), hibernate};
220229
websocket_info({{'DOWN', _QName}, _MRef, process, _Pid, _Reason} = Evt,

0 commit comments

Comments
 (0)