Skip to content

Commit 55bd90c

Browse files
Merge pull request #9187 from rabbitmq/update-secret
Update secret on the stream protocol
2 parents 0be8111 + b715a4d commit 55bd90c

File tree

4 files changed

+209
-29
lines changed

4 files changed

+209
-29
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1461,8 +1461,7 @@ handle_frame_pre_auth(Transport,
14611461
[Username]),
14621462
{C1#stream_connection{connection_step =
14631463
failure},
1464-
{sasl_authenticate,
1465-
?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK,
1464+
{sasl_authenticate, ?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK,
14661465
<<>>}}
14671466
end
14681467
end,
@@ -1643,6 +1642,100 @@ handle_frame_post_auth(Transport,
16431642
rabbit_global_counters:increase_protocol_counter(stream,
16441643
?PRECONDITION_FAILED, 1),
16451644
{Connection0, State};
1645+
1646+
handle_frame_post_auth(Transport,
1647+
#stream_connection{user = #user{username = Username} = _User,
1648+
socket = S,
1649+
host = Host,
1650+
auth_mechanism = Auth_Mechanism,
1651+
authentication_state = AuthState,
1652+
resource_alarm = false} =
1653+
C1,
1654+
State,
1655+
{request, CorrelationId,
1656+
{sasl_authenticate, NewMechanism, NewSaslBin}}) ->
1657+
rabbit_log:debug("Open frame received sasl_authenticate for username '~ts'", [Username]),
1658+
1659+
Connection1 =
1660+
case Auth_Mechanism of
1661+
{NewMechanism, AuthMechanism} -> %% Mechanism is the same used during the pre-auth phase
1662+
{C2, CmdBody} =
1663+
case AuthMechanism:handle_response(NewSaslBin, AuthState) of
1664+
{refused, NewUsername, Msg, Args} ->
1665+
rabbit_core_metrics:auth_attempt_failed(Host,
1666+
NewUsername,
1667+
stream),
1668+
auth_fail(NewUsername, Msg, Args, C1, State),
1669+
rabbit_log_connection:warning(Msg, Args),
1670+
{C1#stream_connection{connection_step = failure},
1671+
{sasl_authenticate,
1672+
?RESPONSE_AUTHENTICATION_FAILURE, <<>>}};
1673+
{protocol_error, Msg, Args} ->
1674+
rabbit_core_metrics:auth_attempt_failed(Host,
1675+
<<>>,
1676+
stream),
1677+
notify_auth_result(none,
1678+
user_authentication_failure,
1679+
[{error,
1680+
rabbit_misc:format(Msg,
1681+
Args)}],
1682+
C1,
1683+
State),
1684+
rabbit_log_connection:warning(Msg, Args),
1685+
{C1#stream_connection{connection_step = failure},
1686+
{sasl_authenticate, ?RESPONSE_SASL_ERROR, <<>>}};
1687+
{challenge, Challenge, AuthState1} ->
1688+
{C1#stream_connection{authentication_state = AuthState1,
1689+
connection_step = authenticating},
1690+
{sasl_authenticate, ?RESPONSE_SASL_CHALLENGE,
1691+
Challenge}};
1692+
{ok, NewUser = #user{username = NewUsername}} ->
1693+
case NewUsername of
1694+
Username ->
1695+
rabbit_core_metrics:auth_attempt_succeeded(Host,
1696+
Username,
1697+
stream),
1698+
notify_auth_result(Username,
1699+
user_authentication_success,
1700+
[],
1701+
C1,
1702+
State),
1703+
rabbit_log:debug("Successfully updated secret for username '~ts'", [Username]),
1704+
{C1#stream_connection{user = NewUser,
1705+
authentication_state = done,
1706+
connection_step = authenticated},
1707+
{sasl_authenticate, ?RESPONSE_CODE_OK,
1708+
<<>>}};
1709+
_ ->
1710+
rabbit_core_metrics:auth_attempt_failed(Host,
1711+
Username,
1712+
stream),
1713+
rabbit_log_connection:warning("Not allowed to change username '~ts'. Only password",
1714+
[Username]),
1715+
{C1#stream_connection{connection_step =
1716+
failure},
1717+
{sasl_authenticate,
1718+
?RESPONSE_SASL_CANNOT_CHANGE_USERNAME,
1719+
<<>>}}
1720+
end
1721+
end,
1722+
Frame =
1723+
rabbit_stream_core:frame({response, CorrelationId,
1724+
CmdBody}),
1725+
send(Transport, S, Frame),
1726+
C2;
1727+
{OtherMechanism, _} ->
1728+
rabbit_log_connection:warning("User '~ts' cannot change initial auth mechanism '~ts' for '~ts'",
1729+
[Username, NewMechanism, OtherMechanism]),
1730+
CmdBody =
1731+
{sasl_authenticate, ?RESPONSE_SASL_CANNOT_CHANGE_MECHANISM, <<>>},
1732+
Frame = rabbit_stream_core:frame({response, CorrelationId, CmdBody}),
1733+
send(Transport, S, Frame),
1734+
C1#stream_connection{connection_step = failure}
1735+
end,
1736+
1737+
{Connection1, State};
1738+
16461739
handle_frame_post_auth(Transport,
16471740
#stream_connection{user = User,
16481741
publishers = Publishers0,
@@ -3866,4 +3959,3 @@ stream_from_consumers(SubId, Consumers) ->
38663959
_ ->
38673960
undefined
38683961
end.
3869-

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 109 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ groups() ->
3939
test_publish_v2,
4040
test_gc_consumers,
4141
test_gc_publishers,
42+
test_update_secret,
43+
cannot_update_username_after_authenticated,
44+
cannot_use_another_authmechanism_when_updating_secret,
4245
unauthenticated_client_rejected_tcp_connected,
4346
timeout_tcp_connected,
4447
unauthenticated_client_rejected_peer_properties_exchanged,
@@ -48,7 +51,8 @@ groups() ->
4851
timeout_close_sent,
4952
max_segment_size_bytes_validation,
5053
close_connection_on_consumer_update_timeout,
51-
set_filter_size]},
54+
set_filter_size
55+
]},
5256
%% Run `test_global_counters` on its own so the global metrics are
5357
%% initialised to 0 for each testcase
5458
{single_node_1, [], [test_global_counters]},
@@ -132,6 +136,13 @@ end_per_group(_, Config) ->
132136
rabbit_ct_helpers:run_steps(Config,
133137
rabbit_ct_broker_helpers:teardown_steps()).
134138

139+
init_per_testcase(test_update_secret = TestCase, Config) ->
140+
rabbit_ct_helpers:testcase_started(Config, TestCase);
141+
142+
init_per_testcase(cannot_update_username_after_authenticated = TestCase, Config) ->
143+
ok = rabbit_ct_broker_helpers:add_user(Config, <<"other">>),
144+
rabbit_ct_helpers:testcase_started(Config, TestCase);
145+
135146
init_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) ->
136147
ok = rabbit_ct_broker_helpers:rpc(Config,
137148
0,
@@ -142,6 +153,14 @@ init_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config
142153
init_per_testcase(TestCase, Config) ->
143154
rabbit_ct_helpers:testcase_started(Config, TestCase).
144155

156+
end_per_testcase(test_update_secret = TestCase, Config) ->
157+
ok = rabbit_ct_broker_helpers:change_password(Config, <<"guest">>, <<"guest">>),
158+
rabbit_ct_helpers:testcase_finished(Config, TestCase);
159+
160+
end_per_testcase(cannot_update_username_after_authenticated = TestCase, Config) ->
161+
ok = rabbit_ct_broker_helpers:delete_user(Config, <<"other">>),
162+
rabbit_ct_helpers:testcase_finished(Config, TestCase);
163+
145164
end_per_testcase(filtering_ff = TestCase, Config) ->
146165
_ = rabbit_ct_broker_helpers:rpc(Config,
147166
0,
@@ -221,6 +240,34 @@ test_stream(Config) ->
221240
test_server(gen_tcp, Stream, Config),
222241
ok.
223242

243+
test_update_secret(Config) ->
244+
Transport = gen_tcp,
245+
{S, C0} = connect_and_authenticate(Transport, Config),
246+
rabbit_ct_broker_helpers:change_password(Config, <<"guest">>, <<"password">>),
247+
C1 = expect_successful_authentication(
248+
try_authenticate(Transport, S, C0, <<"PLAIN">>, <<"guest">>, <<"password">>)),
249+
_C2 = test_close(Transport, S, C1),
250+
closed = wait_for_socket_close(Transport, S, 10),
251+
ok.
252+
253+
cannot_update_username_after_authenticated(Config) ->
254+
{S, C0} = connect_and_authenticate(gen_tcp, Config),
255+
C1 = expect_unsuccessful_authentication(
256+
try_authenticate(gen_tcp, S, C0, <<"PLAIN">>, <<"other">>, <<"other">>),
257+
?RESPONSE_SASL_CANNOT_CHANGE_USERNAME),
258+
_C2 = test_close(gen_tcp, S, C1),
259+
closed = wait_for_socket_close(gen_tcp, S, 10),
260+
ok.
261+
262+
cannot_use_another_authmechanism_when_updating_secret(Config) ->
263+
{S, C0} = connect_and_authenticate(gen_tcp, Config),
264+
C1 = expect_unsuccessful_authentication(
265+
try_authenticate(gen_tcp, S, C0, <<"EXTERNAL">>, <<"guest">>, <<"new_password">>),
266+
?RESPONSE_SASL_CANNOT_CHANGE_MECHANISM),
267+
_C2 = test_close(gen_tcp, S, C1),
268+
closed = wait_for_socket_close(gen_tcp, S, 10),
269+
ok.
270+
224271
test_stream_tls(Config) ->
225272
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
226273
test_server(ssl, Stream, Config),
@@ -577,23 +624,43 @@ get_node_name(Config) ->
577624
get_node_name(Config, Node) ->
578625
rabbit_ct_broker_helpers:get_node_config(Config, Node, nodename).
579626

627+
get_port(Transport, Config) ->
628+
case Transport of
629+
gen_tcp ->
630+
get_stream_port(Config);
631+
ssl ->
632+
application:ensure_all_started(ssl),
633+
get_stream_port_tls(Config)
634+
end.
635+
get_opts(Transport) ->
636+
case Transport of
637+
gen_tcp ->
638+
[{active, false}, {mode, binary}];
639+
ssl ->
640+
[{active, false}, {mode, binary}, {verify, verify_none}]
641+
end.
642+
643+
connect_and_authenticate(Transport, Config) ->
644+
Port = get_port(Transport, Config),
645+
Opts = get_opts(Transport),
646+
{ok, S} = Transport:connect("localhost", Port, Opts),
647+
C0 = rabbit_stream_core:init(0),
648+
C1 = test_peer_properties(Transport, S, C0),
649+
{S, test_authenticate(Transport, S, C1)}.
650+
651+
try_authenticate(Transport, S, C, AuthMethod, Username, Password) ->
652+
case AuthMethod of
653+
<<"PLAIN">> ->
654+
plain_sasl_authenticate(Transport, S, C, Username, Password);
655+
_ ->
656+
Null = 0,
657+
sasl_authenticate(Transport, S, C, AuthMethod, <<Null:8, Username/binary, Null:8, Password/binary>>)
658+
end.
659+
580660
test_server(Transport, Stream, Config) ->
581661
QName = rabbit_misc:r(<<"/">>, queue, Stream),
582-
Port =
583-
case Transport of
584-
gen_tcp ->
585-
get_stream_port(Config);
586-
ssl ->
587-
application:ensure_all_started(ssl),
588-
get_stream_port_tls(Config)
589-
end,
590-
Opts =
591-
case Transport of
592-
gen_tcp ->
593-
[{active, false}, {mode, binary}];
594-
ssl ->
595-
[{active, false}, {mode, binary}, {verify, verify_none}]
596-
end,
662+
Port = get_port(Transport, Config),
663+
Opts = get_opts(Transport),
597664
{ok, S} =
598665
Transport:connect("localhost", Port, Opts),
599666
C0 = rabbit_stream_core:init(0),
@@ -652,6 +719,9 @@ test_peer_properties(Transport, S, C0) ->
652719
C.
653720

654721
test_authenticate(Transport, S, C0) ->
722+
tune(Transport, S, test_plain_sasl_authenticate(Transport, S, sasl_handshake(Transport, S, C0))).
723+
724+
sasl_handshake(Transport, S, C0) ->
655725
SaslHandshakeFrame =
656726
rabbit_stream_core:frame({request, 1, sasl_handshake}),
657727
ok = Transport:send(S, SaslHandshakeFrame),
@@ -664,18 +734,33 @@ test_authenticate(Transport, S, C0) ->
664734
_ ->
665735
ct:fail("invalid cmd ~tp", [Cmd])
666736
end,
737+
C1.
738+
739+
test_plain_sasl_authenticate(Transport, S, C1) ->
740+
expect_successful_authentication(plain_sasl_authenticate(Transport, S, C1)).
667741

668-
Username = <<"guest">>,
669-
Password = <<"guest">>,
742+
plain_sasl_authenticate(Transport, S, C1) ->
743+
plain_sasl_authenticate(Transport, S, C1, <<"guest">>, <<"guest">>).
744+
745+
plain_sasl_authenticate(Transport, S, C1, Username, Password) ->
670746
Null = 0,
671-
PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
747+
sasl_authenticate(Transport, S, C1, <<"PLAIN">>, <<Null:8, Username/binary, Null:8, Password/binary>>).
748+
749+
expect_successful_authentication({SaslAuth, C2} = _SaslReponse) ->
750+
{response, 2, {sasl_authenticate, ?RESPONSE_CODE_OK}} = SaslAuth,
751+
C2.
752+
expect_unsuccessful_authentication({SaslAuth, C2} = _SaslReponse, ExpectedError) ->
753+
{response, 2, {sasl_authenticate, ExpectedError}} = SaslAuth,
754+
C2.
672755

756+
sasl_authenticate(Transport, S, C1, AuthMethod, AuthBody) ->
673757
SaslAuthenticateFrame =
674758
rabbit_stream_core:frame({request, 2,
675-
{sasl_authenticate, Plain, PlainSasl}}),
759+
{sasl_authenticate, AuthMethod, AuthBody}}),
676760
ok = Transport:send(S, SaslAuthenticateFrame),
677-
{SaslAuth, C2} = receive_commands(Transport, S, C1),
678-
{response, 2, {sasl_authenticate, ?RESPONSE_CODE_OK}} = SaslAuth,
761+
receive_commands(Transport, S, C1).
762+
763+
tune(Transport, S, C2) ->
679764
{Tune, C3} = receive_commands(Transport, S, C2),
680765
{tune, ?DEFAULT_FRAME_MAX, ?DEFAULT_HEARTBEAT} = Tune,
681766

@@ -816,9 +901,9 @@ test_unsubscribe(Transport, Socket, SubscriptionId, C0) ->
816901
C.
817902

818903
test_deliver(Transport, S, SubscriptionId, COffset, Body, C0) ->
819-
ct:pal("test_deliver ", []),
820904
{{deliver, SubscriptionId, Chunk}, C} =
821905
receive_commands(Transport, S, C0),
906+
ct:pal("test_deliver ~p", [Chunk]),
822907
<<5:4/unsigned,
823908
0:4/unsigned,
824909
0:8,
@@ -838,9 +923,9 @@ test_deliver(Transport, S, SubscriptionId, COffset, Body, C0) ->
838923
C.
839924

840925
test_deliver_v2(Transport, S, SubscriptionId, COffset, Body, C0) ->
841-
ct:pal("test_deliver ", []),
842926
{{deliver_v2, SubscriptionId, _CommittedOffset, Chunk}, C} =
843927
receive_commands(Transport, S, C0),
928+
ct:pal("test_deliver_v2 ~p", [Chunk]),
844929
<<5:4/unsigned,
845930
0:4/unsigned,
846931
0:8,

deps/rabbitmq_stream_common/include/rabbit_stream.hrl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@
5252
-define(RESPONSE_CODE_PRECONDITION_FAILED, 17).
5353
-define(RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, 18).
5454
-define(RESPONSE_CODE_NO_OFFSET, 19).
55-
55+
-define(RESPONSE_SASL_CANNOT_CHANGE_MECHANISM, 20).
56+
-define(RESPONSE_SASL_CANNOT_CHANGE_USERNAME, 21).
5657

5758
-define(OFFSET_TYPE_NONE, 0).
5859
-define(OFFSET_TYPE_FIRST, 1).

deps/rabbitmq_stream_common/src/rabbit_stream_core.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@
5858
?RESPONSE_CODE_ACCESS_REFUSED |
5959
?RESPONSE_CODE_PRECONDITION_FAILED |
6060
?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST |
61-
?RESPONSE_CODE_NO_OFFSET.
61+
?RESPONSE_CODE_NO_OFFSET |
62+
?RESPONSE_SASL_CANNOT_CHANGE_MECHANISM |
63+
?RESPONSE_SASL_CANNOT_CHANGE_USERNAME .
6264
-type error_code() :: response_code().
6365
-type sequence() :: non_neg_integer().
6466
-type credit() :: non_neg_integer().

0 commit comments

Comments
 (0)