Skip to content

Commit be406d8

Browse files
Merge pull request #11832 from rabbitmq/mergify/bp/v4.0.x/pr-11831
Close stream connection with delay in case of authentication failure (backport #11831)
2 parents 19c4ac5 + a0f7589 commit be406d8

File tree

2 files changed

+69
-10
lines changed

2 files changed

+69
-10
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
peer_cert_subject,
8080
peer_cert_validity]).
8181
-define(UNKNOWN_FIELD, unknown_field).
82+
-define(SILENT_CLOSE_DELAY, 3_000).
8283

8384
%% client API
8485
-export([start_link/4,
@@ -1325,6 +1326,7 @@ handle_frame_pre_auth(Transport,
13251326
stream),
13261327
auth_fail(Username, Msg, Args, C1, State),
13271328
rabbit_log_connection:warning(Msg, Args),
1329+
silent_close_delay(),
13281330
{C1#stream_connection{connection_step = failure},
13291331
{sasl_authenticate,
13301332
?RESPONSE_AUTHENTICATION_FAILURE, <<>>}};
@@ -1490,6 +1492,7 @@ handle_frame_pre_auth(Transport,
14901492
Conn
14911493
catch exit:#amqp_error{explanation = Explanation} ->
14921494
rabbit_log:warning("Opening connection failed: ~ts", [Explanation]),
1495+
silent_close_delay(),
14931496
F = rabbit_stream_core:frame({response, CorrelationId,
14941497
{open,
14951498
?RESPONSE_VHOST_ACCESS_FAILURE,
@@ -4041,3 +4044,8 @@ stream_from_consumers(SubId, Consumers) ->
40414044
_ ->
40424045
undefined
40434046
end.
4047+
4048+
%% We don't trust the client at this point - force them to wait
4049+
%% for a bit so they can't DOS us with repeated failed logins etc.
4050+
silent_close_delay() ->
4051+
timer:sleep(?SILENT_CLOSE_DELAY).

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ groups() ->
6161
should_receive_metadata_update_after_update_secret,
6262
store_offset_requires_read_access,
6363
offset_lag_calculation,
64-
test_super_stream_duplicate_partitions
64+
test_super_stream_duplicate_partitions,
65+
authentication_error_should_close_with_delay,
66+
unauthorized_vhost_access_should_close_with_delay
6567
]},
6668
%% Run `test_global_counters` on its own so the global metrics are
6769
%% initialised to 0 for each testcase
@@ -173,6 +175,10 @@ init_per_testcase(store_offset_requires_read_access = TestCase, Config) ->
173175
ok = rabbit_ct_broker_helpers:add_user(Config, <<"test">>),
174176
rabbit_ct_helpers:testcase_started(Config, TestCase);
175177

178+
init_per_testcase(unauthorized_vhost_access_should_close_with_delay = TestCase, Config) ->
179+
ok = rabbit_ct_broker_helpers:add_user(Config, <<"other">>),
180+
rabbit_ct_helpers:testcase_started(Config, TestCase);
181+
176182
init_per_testcase(TestCase, Config) ->
177183
rabbit_ct_helpers:testcase_started(Config, TestCase).
178184

@@ -201,6 +207,9 @@ end_per_testcase(vhost_queue_limit = TestCase, Config) ->
201207
end_per_testcase(store_offset_requires_read_access = TestCase, Config) ->
202208
ok = rabbit_ct_broker_helpers:delete_user(Config, <<"test">>),
203209
rabbit_ct_helpers:testcase_finished(Config, TestCase);
210+
end_per_testcase(unauthorized_vhost_access_should_close_with_delay = TestCase, Config) ->
211+
ok = rabbit_ct_broker_helpers:delete_user(Config, <<"other">>),
212+
rabbit_ct_helpers:testcase_finished(Config, TestCase);
204213
end_per_testcase(TestCase, Config) ->
205214
rabbit_ct_helpers:testcase_finished(Config, TestCase).
206215

@@ -890,6 +899,41 @@ offset_lag_calculation(Config) ->
890899

891900
ok.
892901

902+
authentication_error_should_close_with_delay(Config) ->
903+
T = gen_tcp,
904+
Port = get_port(T, Config),
905+
Opts = get_opts(T),
906+
{ok, S} = T:connect("localhost", Port, Opts),
907+
C0 = rabbit_stream_core:init(0),
908+
C1 = test_peer_properties(T, S, C0),
909+
Start = erlang:monotonic_time(millisecond),
910+
_ = expect_unsuccessful_authentication(
911+
try_authenticate(T, S, C1, <<"PLAIN">>, <<"guest">>, <<"wrong password">>),
912+
?RESPONSE_AUTHENTICATION_FAILURE),
913+
End = erlang:monotonic_time(millisecond),
914+
%% the stream reader module defines the delay (3 seconds)
915+
?assert(End - Start > 2_000),
916+
closed = wait_for_socket_close(T, S, 10),
917+
ok.
918+
919+
unauthorized_vhost_access_should_close_with_delay(Config) ->
920+
T = gen_tcp,
921+
Port = get_port(T, Config),
922+
Opts = get_opts(T),
923+
{ok, S} = T:connect("localhost", Port, Opts),
924+
C0 = rabbit_stream_core:init(0),
925+
C1 = test_peer_properties(T, S, C0),
926+
User = <<"other">>,
927+
C2 = test_plain_sasl_authenticate(T, S, sasl_handshake(T, S, C1), User),
928+
Start = erlang:monotonic_time(millisecond),
929+
R = do_tune(T, S, C2),
930+
?assertMatch({{response,_,{open,12}}, _}, R),
931+
End = erlang:monotonic_time(millisecond),
932+
%% the stream reader module defines the delay (3 seconds)
933+
?assert(End - Start > 2_000),
934+
closed = wait_for_socket_close(T, S, 10),
935+
ok.
936+
893937
consumer_offset_info(Config, ConnectionName) ->
894938
[[{offset, Offset},
895939
{offset_lag, Lag}]] = rpc(Config, 0, ?MODULE,
@@ -1093,12 +1137,15 @@ test_peer_properties(Transport, S, Properties, C0) ->
10931137
C.
10941138

10951139
test_authenticate(Transport, S, C0) ->
1096-
tune(Transport, S,
1097-
test_plain_sasl_authenticate(Transport, S, sasl_handshake(Transport, S, C0), <<"guest">>)).
1140+
tune(Transport, S,
1141+
test_plain_sasl_authenticate(Transport, S, sasl_handshake(Transport, S, C0), <<"guest">>)).
10981142

10991143
test_authenticate(Transport, S, C0, Username) ->
1100-
tune(Transport, S,
1101-
test_plain_sasl_authenticate(Transport, S, sasl_handshake(Transport, S, C0), Username)).
1144+
test_authenticate(Transport, S, C0, Username, Username).
1145+
1146+
test_authenticate(Transport, S, C0, Username, Password) ->
1147+
tune(Transport, S,
1148+
test_plain_sasl_authenticate(Transport, S, sasl_handshake(Transport, S, C0), Username, Password)).
11021149

11031150
sasl_handshake(Transport, S, C0) ->
11041151
SaslHandshakeFrame = request(sasl_handshake),
@@ -1115,7 +1162,10 @@ sasl_handshake(Transport, S, C0) ->
11151162
C1.
11161163

11171164
test_plain_sasl_authenticate(Transport, S, C1, Username) ->
1118-
expect_successful_authentication(plain_sasl_authenticate(Transport, S, C1, Username, Username)).
1165+
test_plain_sasl_authenticate(Transport, S, C1, Username, Username).
1166+
1167+
test_plain_sasl_authenticate(Transport, S, C1, Username, Password) ->
1168+
expect_successful_authentication(plain_sasl_authenticate(Transport, S, C1, Username, Password)).
11191169

11201170
plain_sasl_authenticate(Transport, S, C1, Username, Password) ->
11211171
Null = 0,
@@ -1136,6 +1186,10 @@ sasl_authenticate(Transport, S, C1, AuthMethod, AuthBody) ->
11361186
receive_commands(Transport, S, C1).
11371187

11381188
tune(Transport, S, C2) ->
1189+
{{response, _, {open, ?RESPONSE_CODE_OK, _}}, C3} = do_tune(Transport, S, C2),
1190+
C3.
1191+
1192+
do_tune(Transport, S, C2) ->
11391193
{Tune, C3} = receive_commands(Transport, S, C2),
11401194
{tune, ?DEFAULT_FRAME_MAX, ?DEFAULT_HEARTBEAT} = Tune,
11411195

@@ -1147,10 +1201,7 @@ tune(Transport, S, C2) ->
11471201
VirtualHost = <<"/">>,
11481202
OpenFrame = request(3, {open, VirtualHost}),
11491203
ok = Transport:send(S, OpenFrame),
1150-
{{response, 3, {open, ?RESPONSE_CODE_OK, _ConnectionProperties}},
1151-
C4} =
1152-
receive_commands(Transport, S, C3),
1153-
C4.
1204+
receive_commands(Transport, S, C3).
11541205

11551206
test_create_stream(Transport, S, Stream, C0) ->
11561207
CreateStreamFrame = request({create_stream, Stream, #{}}),

0 commit comments

Comments
 (0)