Skip to content

Commit bf16ce0

Browse files
authored
Merge pull request #11869 from rabbitmq/mergify/bp/v3.13.x/pr-11868
Disconnect MQTT client when its credential expires (backport #11867) (backport #11868)
2 parents ce37242 + 52924d1 commit bf16ce0

File tree

6 files changed

+125
-16
lines changed

6 files changed

+125
-16
lines changed

deps/rabbitmq_auth_backend_oauth2/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ eunit(
9393

9494
broker_for_integration_suites(
9595
extra_plugins = [
96-
"//deps/rabbitmq_mqtt:erlang_app",
96+
"//deps/rabbitmq_web_mqtt:erlang_app",
9797
],
9898
)
9999

deps/rabbitmq_auth_backend_oauth2/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export BUILD_WITHOUT_QUIC
88
LOCAL_DEPS = inets public_key
99
BUILD_DEPS = rabbit_common oauth2_client
1010
DEPS = rabbit cowlib jose base64url oauth2_client
11-
TEST_DEPS = cowboy rabbitmq_web_dispatch rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client rabbitmq_mqtt emqtt
11+
TEST_DEPS = cowboy rabbitmq_web_dispatch rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client rabbitmq_web_mqtt emqtt
1212

1313
PLT_APPS += rabbitmqctl
1414

deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl

Lines changed: 80 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ groups() ->
4343
test_failed_connection_with_a_non_token,
4444
test_failed_connection_with_a_token_with_insufficient_vhost_permission,
4545
test_failed_connection_with_a_token_with_insufficient_resource_permission,
46-
more_than_one_resource_server_id_not_allowed_in_one_token
46+
more_than_one_resource_server_id_not_allowed_in_one_token,
47+
mqtt_expirable_token,
48+
web_mqtt_expirable_token,
49+
mqtt_expired_token
4750
]},
4851

4952
{token_refresh, [], [
@@ -87,16 +90,17 @@ groups() ->
8790

8891
init_per_suite(Config) ->
8992
rabbit_ct_helpers:log_environment(),
90-
rabbit_ct_helpers:run_setup_steps(Config,
91-
rabbit_ct_broker_helpers:setup_steps() ++ [
92-
fun preconfigure_node/1,
93-
fun preconfigure_token/1
94-
]).
93+
Config1 = rabbit_ct_helpers:run_setup_steps(
94+
Config,
95+
rabbit_ct_broker_helpers:setup_steps() ++
96+
[fun preconfigure_node/1,
97+
fun preconfigure_token/1]),
98+
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config1, mqtt_v5),
99+
Config1.
95100

96101
end_per_suite(Config) ->
97102
rabbit_ct_helpers:run_teardown_steps(Config, rabbit_ct_broker_helpers:teardown_steps()).
98103

99-
100104
init_per_group(_Group, Config) ->
101105
%% The broker is managed by {init,end}_per_testcase().
102106
lists:foreach(fun(Value) ->
@@ -422,15 +426,80 @@ mqtt(Config) ->
422426
{ok, Pub} = emqtt:start_link([{clientid, <<"mqtt-publisher">>} | Opts]),
423427
{ok, _} = emqtt:connect(Pub),
424428
{ok, _} = emqtt:publish(Pub, Topic, Payload, at_least_once),
425-
receive
426-
{publish, #{client_pid := Sub,
427-
topic := Topic,
428-
payload := Payload}} -> ok
429+
receive {publish, #{client_pid := Sub,
430+
topic := Topic,
431+
payload := Payload}} -> ok
429432
after 1000 -> ct:fail("no publish received")
430433
end,
431434
ok = emqtt:disconnect(Sub),
432435
ok = emqtt:disconnect(Pub).
433436

437+
mqtt_expirable_token(Config) ->
438+
mqtt_expirable_token0(tcp_port_mqtt,
439+
[],
440+
fun emqtt:connect/1,
441+
Config).
442+
443+
web_mqtt_expirable_token(Config) ->
444+
mqtt_expirable_token0(tcp_port_web_mqtt,
445+
[{ws_path, "/ws"}],
446+
fun emqtt:ws_connect/1,
447+
Config).
448+
449+
mqtt_expirable_token0(Port, AdditionalOpts, Connect, Config) ->
450+
Topic = <<"test/topic">>,
451+
Payload = <<"mqtt-test-message">>,
452+
453+
Seconds = 4,
454+
Millis = Seconds * 1000,
455+
{_Algo, Token} = generate_expirable_token(Config,
456+
[<<"rabbitmq.configure:*/*/*">>,
457+
<<"rabbitmq.write:*/*/*">>,
458+
<<"rabbitmq.read:*/*/*">>],
459+
Seconds),
460+
461+
Opts = [{port, rabbit_ct_broker_helpers:get_node_config(Config, 0, Port)},
462+
{proto_ver, v5},
463+
{username, <<"">>},
464+
{password, Token}] ++ AdditionalOpts,
465+
{ok, Sub} = emqtt:start_link([{clientid, <<"my subscriber">>} | Opts]),
466+
{ok, _} = Connect(Sub),
467+
{ok, _, [1]} = emqtt:subscribe(Sub, Topic, at_least_once),
468+
{ok, Pub} = emqtt:start_link([{clientid, <<"my publisher">>} | Opts]),
469+
{ok, _} = Connect(Pub),
470+
{ok, _} = emqtt:publish(Pub, Topic, Payload, at_least_once),
471+
receive {publish, #{client_pid := Sub,
472+
topic := Topic,
473+
payload := Payload}} -> ok
474+
after 1000 -> ct:fail("no publish received")
475+
end,
476+
477+
%% reason code "Maximum connect time" defined in
478+
%% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901208
479+
ReasonCode = 16#A0,
480+
true = unlink(Sub),
481+
true = unlink(Pub),
482+
483+
%% In 4 seconds from now, we expect that RabbitMQ disconnects us because our token expired.
484+
receive {disconnected, ReasonCode, _} -> ok
485+
after Millis * 2 -> ct:fail("missing DISCONNECT packet from server")
486+
end,
487+
receive {disconnected, ReasonCode, _} -> ok
488+
after Millis * 2 -> ct:fail("missing DISCONNECT packet from server")
489+
end.
490+
491+
mqtt_expired_token(Config) ->
492+
{_Algo, Token} = generate_expired_token(Config),
493+
Opts = [{port, rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt)},
494+
{proto_ver, v5},
495+
{username, <<"">>},
496+
{password, Token}],
497+
ClientId = atom_to_binary(?FUNCTION_NAME),
498+
{ok, C} = emqtt:start_link([{clientid, ClientId} | Opts]),
499+
true = unlink(C),
500+
?assertMatch({error, {bad_username_or_password, _}},
501+
emqtt:connect(C)).
502+
434503
test_successful_connection_with_complex_claim_as_a_map(Config) ->
435504
{_Algo, Token} = generate_valid_token_with_extra_fields(
436505
Config,

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ process_connect(
196196
ok ?= check_user_connection_limit(Username),
197197
{ok, AuthzCtx} ?= check_vhost_access(VHost, User, ClientId, PeerIp),
198198
ok ?= check_user_loopback(Username, PeerIp),
199+
ok ?= ensure_credential_expiry_timer(User, PeerIp),
199200
rabbit_core_metrics:auth_attempt_succeeded(PeerIp, Username, mqtt),
200201
{ok, RaRegisterState} ?= register_client_id(VHost, ClientId, CleanStart, WillProps),
201202
{ok, WillMsg} ?= make_will_msg(Packet),
@@ -1126,6 +1127,27 @@ check_user_loopback(Username, PeerIp) ->
11261127
{error, ?RC_NOT_AUTHORIZED}
11271128
end.
11281129

1130+
1131+
ensure_credential_expiry_timer(User = #user{username = Username}, PeerIp) ->
1132+
case rabbit_access_control:expiry_timestamp(User) of
1133+
never ->
1134+
ok;
1135+
Ts when is_integer(Ts) ->
1136+
Time = (Ts - os:system_time(second)) * 1000,
1137+
?LOG_DEBUG("Credential expires in ~b ms frow now "
1138+
"(absolute timestamp = ~b seconds since epoch)",
1139+
[Time, Ts]),
1140+
case Time > 0 of
1141+
true ->
1142+
_TimerRef = erlang:send_after(Time, self(), credential_expired),
1143+
ok;
1144+
false ->
1145+
auth_attempt_failed(PeerIp, Username),
1146+
?LOG_WARNING("Credential expired ~b ms ago", [abs(Time)]),
1147+
{error, ?RC_NOT_AUTHORIZED}
1148+
end
1149+
end.
1150+
11291151
get_vhost(UserBin, none, Port) ->
11301152
get_vhost_no_ssl(UserBin, Port);
11311153
get_vhost(UserBin, SslLogin, Port) ->

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ handle_cast(decommission_node,
129129
{stop, {shutdown, decommission_node}, State};
130130

131131
handle_cast({close_connection, Reason},
132-
State = #state{conn_name = ConnName, proc_state = PState}) ->
132+
State = #state{conn_name = ConnName,
133+
proc_state = PState}) ->
133134
?LOG_WARNING("MQTT disconnecting client ~tp with client ID '~ts', reason: ~ts",
134135
[ConnName, rabbit_mqtt_processor:info(client_id, PState), Reason]),
135136
case Reason of
@@ -219,6 +220,14 @@ handle_info({keepalive, Req}, State = #state{proc_state = PState,
219220
{stop, Reason, State}
220221
end;
221222

223+
handle_info(credential_expired,
224+
State = #state{conn_name = ConnName,
225+
proc_state = PState}) ->
226+
?LOG_WARNING("MQTT disconnecting client ~tp with client ID '~ts' because credential expired",
227+
[ConnName, rabbit_mqtt_processor:info(client_id, PState)]),
228+
rabbit_mqtt_processor:send_disconnect(?RC_MAXIMUM_CONNECT_TIME, PState),
229+
{stop, {shutdown, {disconnect, server_initiated}}, State};
230+
222231
handle_info(login_timeout, State = #state{proc_state = connect_packet_unprocessed,
223232
conn_name = ConnName}) ->
224233
%% The connection is also closed if the CONNECT packet happens to

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,9 @@ websocket_info({'$gen_cast', {duplicate_id, SendWill}},
193193
rabbit_mqtt_processor:send_disconnect(?RC_SESSION_TAKEN_OVER, ProcState),
194194
defer_close(?CLOSE_NORMAL, SendWill),
195195
{[], State};
196-
websocket_info({'$gen_cast', {close_connection, Reason}}, State = #state{proc_state = ProcState,
197-
conn_name = ConnName}) ->
196+
websocket_info({'$gen_cast', {close_connection, Reason}},
197+
State = #state{proc_state = ProcState,
198+
conn_name = ConnName}) ->
198199
?LOG_WARNING("Web MQTT disconnecting client with ID '~s' (~p), reason: ~s",
199200
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName, Reason]),
200201
case Reason of
@@ -232,6 +233,14 @@ websocket_info({keepalive, Req}, State = #state{proc_state = ProcState,
232233
[ConnName, Reason]),
233234
stop(State)
234235
end;
236+
websocket_info(credential_expired,
237+
State = #state{proc_state = ProcState,
238+
conn_name = ConnName}) ->
239+
?LOG_WARNING("Web MQTT disconnecting client with ID '~s' (~p) because credential expired",
240+
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName]),
241+
rabbit_mqtt_processor:send_disconnect(?RC_MAXIMUM_CONNECT_TIME, ProcState),
242+
defer_close(?CLOSE_NORMAL),
243+
{[], State};
235244
websocket_info(emit_stats, State) ->
236245
{[], emit_stats(State), hibernate};
237246
websocket_info({ra_event, _From, Evt},

0 commit comments

Comments
 (0)