Skip to content

Disconnect MQTT client when its credential expires (backport #11867) (backport #11868) #11869

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbitmq_auth_backend_oauth2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ eunit(

broker_for_integration_suites(
extra_plugins = [
"//deps/rabbitmq_mqtt:erlang_app",
"//deps/rabbitmq_web_mqtt:erlang_app",
],
)

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_auth_backend_oauth2/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export BUILD_WITHOUT_QUIC
LOCAL_DEPS = inets public_key
BUILD_DEPS = rabbit_common oauth2_client
DEPS = rabbit cowlib jose base64url oauth2_client
TEST_DEPS = cowboy rabbitmq_web_dispatch rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client rabbitmq_mqtt emqtt
TEST_DEPS = cowboy rabbitmq_web_dispatch rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client rabbitmq_web_mqtt emqtt

PLT_APPS += rabbitmqctl

Expand Down
91 changes: 80 additions & 11 deletions deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ groups() ->
test_failed_connection_with_a_non_token,
test_failed_connection_with_a_token_with_insufficient_vhost_permission,
test_failed_connection_with_a_token_with_insufficient_resource_permission,
more_than_one_resource_server_id_not_allowed_in_one_token
more_than_one_resource_server_id_not_allowed_in_one_token,
mqtt_expirable_token,
web_mqtt_expirable_token,
mqtt_expired_token
]},

{token_refresh, [], [
Expand Down Expand Up @@ -87,16 +90,17 @@ groups() ->

init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config,
rabbit_ct_broker_helpers:setup_steps() ++ [
fun preconfigure_node/1,
fun preconfigure_token/1
]).
Config1 = rabbit_ct_helpers:run_setup_steps(
Config,
rabbit_ct_broker_helpers:setup_steps() ++
[fun preconfigure_node/1,
fun preconfigure_token/1]),
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config1, mqtt_v5),
Config1.

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config, rabbit_ct_broker_helpers:teardown_steps()).


init_per_group(_Group, Config) ->
%% The broker is managed by {init,end}_per_testcase().
lists:foreach(fun(Value) ->
Expand Down Expand Up @@ -422,15 +426,80 @@ mqtt(Config) ->
{ok, Pub} = emqtt:start_link([{clientid, <<"mqtt-publisher">>} | Opts]),
{ok, _} = emqtt:connect(Pub),
{ok, _} = emqtt:publish(Pub, Topic, Payload, at_least_once),
receive
{publish, #{client_pid := Sub,
topic := Topic,
payload := Payload}} -> ok
receive {publish, #{client_pid := Sub,
topic := Topic,
payload := Payload}} -> ok
after 1000 -> ct:fail("no publish received")
end,
ok = emqtt:disconnect(Sub),
ok = emqtt:disconnect(Pub).

mqtt_expirable_token(Config) ->
mqtt_expirable_token0(tcp_port_mqtt,
[],
fun emqtt:connect/1,
Config).

web_mqtt_expirable_token(Config) ->
mqtt_expirable_token0(tcp_port_web_mqtt,
[{ws_path, "/ws"}],
fun emqtt:ws_connect/1,
Config).

mqtt_expirable_token0(Port, AdditionalOpts, Connect, Config) ->
Topic = <<"test/topic">>,
Payload = <<"mqtt-test-message">>,

Seconds = 4,
Millis = Seconds * 1000,
{_Algo, Token} = generate_expirable_token(Config,
[<<"rabbitmq.configure:*/*/*">>,
<<"rabbitmq.write:*/*/*">>,
<<"rabbitmq.read:*/*/*">>],
Seconds),

Opts = [{port, rabbit_ct_broker_helpers:get_node_config(Config, 0, Port)},
{proto_ver, v5},
{username, <<"">>},
{password, Token}] ++ AdditionalOpts,
{ok, Sub} = emqtt:start_link([{clientid, <<"my subscriber">>} | Opts]),
{ok, _} = Connect(Sub),
{ok, _, [1]} = emqtt:subscribe(Sub, Topic, at_least_once),
{ok, Pub} = emqtt:start_link([{clientid, <<"my publisher">>} | Opts]),
{ok, _} = Connect(Pub),
{ok, _} = emqtt:publish(Pub, Topic, Payload, at_least_once),
receive {publish, #{client_pid := Sub,
topic := Topic,
payload := Payload}} -> ok
after 1000 -> ct:fail("no publish received")
end,

%% reason code "Maximum connect time" defined in
%% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901208
ReasonCode = 16#A0,
true = unlink(Sub),
true = unlink(Pub),

%% In 4 seconds from now, we expect that RabbitMQ disconnects us because our token expired.
receive {disconnected, ReasonCode, _} -> ok
after Millis * 2 -> ct:fail("missing DISCONNECT packet from server")
end,
receive {disconnected, ReasonCode, _} -> ok
after Millis * 2 -> ct:fail("missing DISCONNECT packet from server")
end.

mqtt_expired_token(Config) ->
{_Algo, Token} = generate_expired_token(Config),
Opts = [{port, rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt)},
{proto_ver, v5},
{username, <<"">>},
{password, Token}],
ClientId = atom_to_binary(?FUNCTION_NAME),
{ok, C} = emqtt:start_link([{clientid, ClientId} | Opts]),
true = unlink(C),
?assertMatch({error, {bad_username_or_password, _}},
emqtt:connect(C)).

test_successful_connection_with_complex_claim_as_a_map(Config) ->
{_Algo, Token} = generate_valid_token_with_extra_fields(
Config,
Expand Down
22 changes: 22 additions & 0 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ process_connect(
ok ?= check_user_connection_limit(Username),
{ok, AuthzCtx} ?= check_vhost_access(VHost, User, ClientId, PeerIp),
ok ?= check_user_loopback(Username, PeerIp),
ok ?= ensure_credential_expiry_timer(User, PeerIp),
rabbit_core_metrics:auth_attempt_succeeded(PeerIp, Username, mqtt),
{ok, RaRegisterState} ?= register_client_id(VHost, ClientId, CleanStart, WillProps),
{ok, WillMsg} ?= make_will_msg(Packet),
Expand Down Expand Up @@ -1126,6 +1127,27 @@ check_user_loopback(Username, PeerIp) ->
{error, ?RC_NOT_AUTHORIZED}
end.


ensure_credential_expiry_timer(User = #user{username = Username}, PeerIp) ->
case rabbit_access_control:expiry_timestamp(User) of
never ->
ok;
Ts when is_integer(Ts) ->
Time = (Ts - os:system_time(second)) * 1000,
?LOG_DEBUG("Credential expires in ~b ms frow now "
"(absolute timestamp = ~b seconds since epoch)",
[Time, Ts]),
case Time > 0 of
true ->
_TimerRef = erlang:send_after(Time, self(), credential_expired),
ok;
false ->
auth_attempt_failed(PeerIp, Username),
?LOG_WARNING("Credential expired ~b ms ago", [abs(Time)]),
{error, ?RC_NOT_AUTHORIZED}
end
end.

get_vhost(UserBin, none, Port) ->
get_vhost_no_ssl(UserBin, Port);
get_vhost(UserBin, SslLogin, Port) ->
Expand Down
11 changes: 10 additions & 1 deletion deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ handle_cast(decommission_node,
{stop, {shutdown, decommission_node}, State};

handle_cast({close_connection, Reason},
State = #state{conn_name = ConnName, proc_state = PState}) ->
State = #state{conn_name = ConnName,
proc_state = PState}) ->
?LOG_WARNING("MQTT disconnecting client ~tp with client ID '~ts', reason: ~ts",
[ConnName, rabbit_mqtt_processor:info(client_id, PState), Reason]),
case Reason of
Expand Down Expand Up @@ -219,6 +220,14 @@ handle_info({keepalive, Req}, State = #state{proc_state = PState,
{stop, Reason, State}
end;

handle_info(credential_expired,
State = #state{conn_name = ConnName,
proc_state = PState}) ->
?LOG_WARNING("MQTT disconnecting client ~tp with client ID '~ts' because credential expired",
[ConnName, rabbit_mqtt_processor:info(client_id, PState)]),
rabbit_mqtt_processor:send_disconnect(?RC_MAXIMUM_CONNECT_TIME, PState),
{stop, {shutdown, {disconnect, server_initiated}}, State};

handle_info(login_timeout, State = #state{proc_state = connect_packet_unprocessed,
conn_name = ConnName}) ->
%% The connection is also closed if the CONNECT packet happens to
Expand Down
13 changes: 11 additions & 2 deletions deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,9 @@ websocket_info({'$gen_cast', {duplicate_id, SendWill}},
rabbit_mqtt_processor:send_disconnect(?RC_SESSION_TAKEN_OVER, ProcState),
defer_close(?CLOSE_NORMAL, SendWill),
{[], State};
websocket_info({'$gen_cast', {close_connection, Reason}}, State = #state{proc_state = ProcState,
conn_name = ConnName}) ->
websocket_info({'$gen_cast', {close_connection, Reason}},
State = #state{proc_state = ProcState,
conn_name = ConnName}) ->
?LOG_WARNING("Web MQTT disconnecting client with ID '~s' (~p), reason: ~s",
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName, Reason]),
case Reason of
Expand Down Expand Up @@ -232,6 +233,14 @@ websocket_info({keepalive, Req}, State = #state{proc_state = ProcState,
[ConnName, Reason]),
stop(State)
end;
websocket_info(credential_expired,
State = #state{proc_state = ProcState,
conn_name = ConnName}) ->
?LOG_WARNING("Web MQTT disconnecting client with ID '~s' (~p) because credential expired",
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName]),
rabbit_mqtt_processor:send_disconnect(?RC_MAXIMUM_CONNECT_TIME, ProcState),
defer_close(?CLOSE_NORMAL),
{[], State};
websocket_info(emit_stats, State) ->
{[], emit_stats(State), hibernate};
websocket_info({ra_event, _From, Evt},
Expand Down
Loading