Skip to content

Commit c9951ec

Browse files
committed
Close stream connection with delay in case of authentication failure
For consistency with other protocols (to protect from potential DoS attacks). Wrong credentials and virtual host access errors trigger the delay.
1 parent 8a17043 commit c9951ec

File tree

2 files changed

+69
-10
lines changed

2 files changed

+69
-10
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
peer_cert_subject,
8080
peer_cert_validity]).
8181
-define(UNKNOWN_FIELD, unknown_field).
82+
-define(SILENT_CLOSE_DELAY, 3_000).
8283

8384
%% client API
8485
-export([start_link/4,
@@ -1325,6 +1326,7 @@ handle_frame_pre_auth(Transport,
13251326
stream),
13261327
auth_fail(Username, Msg, Args, C1, State),
13271328
rabbit_log_connection:warning(Msg, Args),
1329+
silent_close_delay(),
13281330
{C1#stream_connection{connection_step = failure},
13291331
{sasl_authenticate,
13301332
?RESPONSE_AUTHENTICATION_FAILURE, <<>>}};
@@ -1490,6 +1492,7 @@ handle_frame_pre_auth(Transport,
14901492
Conn
14911493
catch exit:#amqp_error{explanation = Explanation} ->
14921494
rabbit_log:warning("Opening connection failed: ~ts", [Explanation]),
1495+
silent_close_delay(),
14931496
F = rabbit_stream_core:frame({response, CorrelationId,
14941497
{open,
14951498
?RESPONSE_VHOST_ACCESS_FAILURE,
@@ -4041,3 +4044,8 @@ stream_from_consumers(SubId, Consumers) ->
40414044
_ ->
40424045
undefined
40434046
end.
4047+
4048+
%% We don't trust the client at this point - force them to wait
4049+
%% for a bit so they can't DOS us with repeated failed logins etc.
4050+
silent_close_delay() ->
4051+
timer:sleep(?SILENT_CLOSE_DELAY).

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ groups() ->
6161
should_receive_metadata_update_after_update_secret,
6262
store_offset_requires_read_access,
6363
offset_lag_calculation,
64-
test_super_stream_duplicate_partitions
64+
test_super_stream_duplicate_partitions,
65+
authentication_error_should_close_with_delay,
66+
unauthorized_vhost_access_should_close_with_delay
6567
]},
6668
%% Run `test_global_counters` on its own so the global metrics are
6769
%% initialised to 0 for each testcase
@@ -173,6 +175,10 @@ init_per_testcase(store_offset_requires_read_access = TestCase, Config) ->
173175
ok = rabbit_ct_broker_helpers:add_user(Config, <<"test">>),
174176
rabbit_ct_helpers:testcase_started(Config, TestCase);
175177

178+
init_per_testcase(unauthorized_vhost_access_should_close_with_delay = TestCase, Config) ->
179+
ok = rabbit_ct_broker_helpers:add_user(Config, <<"other">>),
180+
rabbit_ct_helpers:testcase_started(Config, TestCase);
181+
176182
init_per_testcase(TestCase, Config) ->
177183
rabbit_ct_helpers:testcase_started(Config, TestCase).
178184

@@ -201,6 +207,9 @@ end_per_testcase(vhost_queue_limit = TestCase, Config) ->
201207
end_per_testcase(store_offset_requires_read_access = TestCase, Config) ->
202208
ok = rabbit_ct_broker_helpers:delete_user(Config, <<"test">>),
203209
rabbit_ct_helpers:testcase_finished(Config, TestCase);
210+
end_per_testcase(unauthorized_vhost_access_should_close_with_delay = TestCase, Config) ->
211+
ok = rabbit_ct_broker_helpers:delete_user(Config, <<"other">>),
212+
rabbit_ct_helpers:testcase_finished(Config, TestCase);
204213
end_per_testcase(TestCase, Config) ->
205214
rabbit_ct_helpers:testcase_finished(Config, TestCase).
206215

@@ -890,6 +899,41 @@ offset_lag_calculation(Config) ->
890899

891900
ok.
892901

902+
authentication_error_should_close_with_delay(Config) ->
903+
T = gen_tcp,
904+
Port = get_port(T, Config),
905+
Opts = get_opts(T),
906+
{ok, S} = T:connect("localhost", Port, Opts),
907+
C0 = rabbit_stream_core:init(0),
908+
C1 = test_peer_properties(T, S, C0),
909+
Start = erlang:monotonic_time(millisecond),
910+
_ = expect_unsuccessful_authentication(
911+
try_authenticate(T, S, C1, <<"PLAIN">>, <<"guest">>, <<"wrong password">>),
912+
?RESPONSE_AUTHENTICATION_FAILURE),
913+
End = erlang:monotonic_time(millisecond),
914+
%% the stream reader module defines the delay (3 seconds)
915+
?assert(End - Start > 2_000),
916+
closed = wait_for_socket_close(T, S, 10),
917+
ok.
918+
919+
unauthorized_vhost_access_should_close_with_delay(Config) ->
920+
T = gen_tcp,
921+
Port = get_port(T, Config),
922+
Opts = get_opts(T),
923+
{ok, S} = T:connect("localhost", Port, Opts),
924+
C0 = rabbit_stream_core:init(0),
925+
C1 = test_peer_properties(T, S, C0),
926+
User = <<"other">>,
927+
C2 = test_plain_sasl_authenticate(T, S, sasl_handshake(T, S, C1), User),
928+
Start = erlang:monotonic_time(millisecond),
929+
R = do_tune(T, S, C2),
930+
?assertMatch({{response,_,{open,12}}, _}, R),
931+
End = erlang:monotonic_time(millisecond),
932+
%% the stream reader module defines the delay (3 seconds)
933+
?assert(End - Start > 2_000),
934+
closed = wait_for_socket_close(T, S, 10),
935+
ok.
936+
893937
consumer_offset_info(Config, ConnectionName) ->
894938
[[{offset, Offset},
895939
{offset_lag, Lag}]] = rpc(Config, 0, ?MODULE,
@@ -1093,12 +1137,15 @@ test_peer_properties(Transport, S, Properties, C0) ->
10931137
C.
10941138

10951139
test_authenticate(Transport, S, C0) ->
1096-
tune(Transport, S,
1097-
test_plain_sasl_authenticate(Transport, S, sasl_handshake(Transport, S, C0), <<"guest">>)).
1140+
tune(Transport, S,
1141+
test_plain_sasl_authenticate(Transport, S, sasl_handshake(Transport, S, C0), <<"guest">>)).
10981142

10991143
test_authenticate(Transport, S, C0, Username) ->
1100-
tune(Transport, S,
1101-
test_plain_sasl_authenticate(Transport, S, sasl_handshake(Transport, S, C0), Username)).
1144+
test_authenticate(Transport, S, C0, Username, Username).
1145+
1146+
test_authenticate(Transport, S, C0, Username, Password) ->
1147+
tune(Transport, S,
1148+
test_plain_sasl_authenticate(Transport, S, sasl_handshake(Transport, S, C0), Username, Password)).
11021149

11031150
sasl_handshake(Transport, S, C0) ->
11041151
SaslHandshakeFrame = request(sasl_handshake),
@@ -1115,7 +1162,10 @@ sasl_handshake(Transport, S, C0) ->
11151162
C1.
11161163

11171164
test_plain_sasl_authenticate(Transport, S, C1, Username) ->
1118-
expect_successful_authentication(plain_sasl_authenticate(Transport, S, C1, Username, Username)).
1165+
test_plain_sasl_authenticate(Transport, S, C1, Username, Username).
1166+
1167+
test_plain_sasl_authenticate(Transport, S, C1, Username, Password) ->
1168+
expect_successful_authentication(plain_sasl_authenticate(Transport, S, C1, Username, Password)).
11191169

11201170
plain_sasl_authenticate(Transport, S, C1, Username, Password) ->
11211171
Null = 0,
@@ -1136,6 +1186,10 @@ sasl_authenticate(Transport, S, C1, AuthMethod, AuthBody) ->
11361186
receive_commands(Transport, S, C1).
11371187

11381188
tune(Transport, S, C2) ->
1189+
{{response, _, {open, ?RESPONSE_CODE_OK, _}}, C3} = do_tune(Transport, S, C2),
1190+
C3.
1191+
1192+
do_tune(Transport, S, C2) ->
11391193
{Tune, C3} = receive_commands(Transport, S, C2),
11401194
{tune, ?DEFAULT_FRAME_MAX, ?DEFAULT_HEARTBEAT} = Tune,
11411195

@@ -1147,10 +1201,7 @@ tune(Transport, S, C2) ->
11471201
VirtualHost = <<"/">>,
11481202
OpenFrame = request(3, {open, VirtualHost}),
11491203
ok = Transport:send(S, OpenFrame),
1150-
{{response, 3, {open, ?RESPONSE_CODE_OK, _ConnectionProperties}},
1151-
C4} =
1152-
receive_commands(Transport, S, C3),
1153-
C4.
1204+
receive_commands(Transport, S, C3).
11541205

11551206
test_create_stream(Transport, S, Stream, C0) ->
11561207
CreateStreamFrame = request({create_stream, Stream, #{}}),

0 commit comments

Comments
 (0)