Skip to content

Commit 659f0b6

Browse files
SimonUngemichaelklishin
authored andcommitted
Add channel limit per node
1 parent 7f25af1 commit 659f0b6

File tree

4 files changed

+112
-23
lines changed

4 files changed

+112
-23
lines changed

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -889,6 +889,9 @@ end}.
889889

890890
{mapping, "channel_max", "rabbit.channel_max", [{datatype, integer}]}.
891891

892+
{mapping, "channel_max_per_node", "rabbit.channel_max_per_node",
893+
[{datatype, integer}, {validators, ["non_negative_integer"]}]}.
894+
892895
%% Set the max permissible number of client connections per node.
893896
%% `infinity` means "no limit".
894897
%%

deps/rabbit/src/rabbit_reader.erl

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -904,7 +904,7 @@ create_channel(Channel,
904904
capabilities = Capabilities,
905905
user = #user{username = Username} = User}
906906
} = State) ->
907-
case rabbit_auth_backend_internal:is_over_channel_limit(Username) of
907+
case is_over_limits(Username) of
908908
false ->
909909
{ok, _ChSupPid, {ChPid, AState}} =
910910
rabbit_channel_sup_sup:start_channel(
@@ -915,11 +915,45 @@ create_channel(Channel,
915915
put({ch_pid, ChPid}, {Channel, MRef}),
916916
put({channel, Channel}, {ChPid, AState}),
917917
{ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}};
918+
{true, Limit, Fmt} ->
919+
{error, rabbit_misc:amqp_error(
920+
not_allowed,
921+
Fmt,
922+
[node(), Limit], 'none')}
923+
end.
924+
925+
is_over_limits(Username) ->
926+
case rabbit_auth_backend_internal:is_over_channel_limit(Username) of
927+
false ->
928+
case is_over_node_channel_limit() of
929+
false ->
930+
false;
931+
{true, Limit} ->
932+
Fmt =
933+
"number of channels opened for node '~ts' has reached "
934+
"the maximum allowed limit of (~w)",
935+
{true, Limit, Fmt}
936+
end;
918937
{true, Limit} ->
919-
{error, rabbit_misc:amqp_error(not_allowed,
920-
"number of channels opened for user '~ts' has reached "
921-
"the maximum allowed user limit of (~w)",
922-
[Username, Limit], 'none')}
938+
Fmt =
939+
"number of channels opened for user '~ts' has reached "
940+
"the maximum allowed user limit of (~w)",
941+
{true, Limit, Fmt}
942+
end.
943+
944+
is_over_node_channel_limit() ->
945+
case rabbit_misc:get_env(rabbit, channel_max_per_node, 0) of
946+
0 ->
947+
false;
948+
NodeLimit ->
949+
%% Only fetch this if a limit is set
950+
CurrNodeChannels = length(rabbit_channel_tracking:list_on_node(node())),
951+
case CurrNodeChannels < NodeLimit of
952+
true ->
953+
false;
954+
false ->
955+
{true, NodeLimit}
956+
end
923957
end.
924958

925959
channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->

deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,10 @@ tcp_listen_options.exit_on_close = false",
396396
"channel_max = 16",
397397
[{rabbit,[{channel_max, 16}]}],
398398
[]},
399+
{channel_max_per_node,
400+
"channel_max_per_node = 16",
401+
[{rabbit,[{channel_max_per_node, 16}]}],
402+
[]},
399403
{max_message_size,
400404
"max_message_size = 131072",
401405
[{rabbit, [{max_message_size, 131072}]}],

deps/rabbit/test/per_node_limit_SUITE.erl

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,16 @@
1515

1616
all() ->
1717
[
18-
{group, parallel_tests}
18+
{group, limit_tests}
1919
].
2020

2121
groups() ->
2222
[
23-
{parallel_tests, [parallel], [
24-
node_connection_limit,
25-
vhost_limit
26-
]}
23+
{limit_tests, [], [
24+
node_connection_limit,
25+
vhost_limit,
26+
node_channel_limit
27+
]}
2728
].
2829

2930
suite() ->
@@ -60,9 +61,15 @@ init_per_testcase(Testcase, Config) ->
6061
rabbit_ct_helpers:testcase_started(Config, Testcase).
6162

6263
end_per_testcase(vhost_limit = Testcase, Config) ->
64+
set_node_limit(Config, vhost_max, infinity),
65+
set_node_limit(Config, channel_max_per_node, 0),
66+
set_node_limit(Config, connection_max, infinity),
6367
[rabbit_ct_broker_helpers:delete_vhost(Config, integer_to_binary(I)) || I <- lists:seq(1,4)],
6468
rabbit_ct_helpers:testcase_finished(Config, Testcase);
6569
end_per_testcase(Testcase, Config) ->
70+
set_node_limit(Config, vhost_max, infinity),
71+
set_node_limit(Config, channel_max_per_node, 0),
72+
set_node_limit(Config, connection_max, infinity),
6673
rabbit_ct_helpers:testcase_finished(Config, Testcase).
6774

6875
%% -------------------------------------------------------------------
@@ -71,7 +78,7 @@ end_per_testcase(Testcase, Config) ->
7178

7279
node_connection_limit(Config) ->
7380
%% Set limit to 0, don't accept any connections
74-
set_node_limit(Config, 0),
81+
set_node_limit(Config, connection_max, 0),
7582
{error, not_allowed} = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
7683

7784
%% Set limit to 5, accept 5 connections
@@ -80,47 +87,88 @@ node_connection_limit(Config) ->
8087
{error, not_allowed} = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
8188
close_all_connections(Connections),
8289

83-
set_node_limit(Config, infinity),
90+
set_node_limit(Config, connection_max, infinity),
8491
C = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
8592
true = is_pid(C),
8693
close_all_connections([C]),
8794
ok.
8895

8996
vhost_limit(Config) ->
90-
set_vhost_limit(Config, 0),
97+
set_node_limit(Config, vhost_max, 0),
9198
{'EXIT',{vhost_limit_exceeded, _}} = rabbit_ct_broker_helpers:add_vhost(Config, <<"foo">>),
9299

93-
set_vhost_limit(Config, 5),
100+
set_node_limit(Config, vhost_max, 5),
94101
[ok = rabbit_ct_broker_helpers:add_vhost(Config, integer_to_binary(I)) || I <- lists:seq(1,4)],
95102
{'EXIT',{vhost_limit_exceeded, _}} = rabbit_ct_broker_helpers:add_vhost(Config, <<"5">>),
96103
[rabbit_ct_broker_helpers:delete_vhost(Config, integer_to_binary(I)) || I <- lists:seq(1,4)],
97104

98-
set_vhost_limit(Config, infinity),
105+
set_node_limit(Config, vhost_max, infinity),
99106
[ok = rabbit_ct_broker_helpers:add_vhost(Config, integer_to_binary(I)) || I <- lists:seq(1,4)],
100107
ok = rabbit_ct_broker_helpers:add_vhost(Config, <<"5">>),
101108
[rabbit_ct_broker_helpers:delete_vhost(Config, integer_to_binary(I)) || I <- lists:seq(1,5)],
102109
ok.
103110

111+
node_channel_limit(Config) ->
112+
set_node_limit(Config, channel_max_per_node, 5),
113+
114+
VHost = <<"foobar">>,
115+
User = <<"guest">>,
116+
ok = rabbit_ct_broker_helpers:add_vhost(Config, VHost),
117+
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
118+
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
119+
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
120+
121+
lists:foreach(fun(N) when (N band 1) == 1 -> {ok, _} = open_channel(Conn1);
122+
(_) -> {ok,_ } = open_channel(Conn2)
123+
end, lists:seq(1, 5)),
124+
125+
5 = count_channels_per_node(Config),
126+
%% In total 5 channels are open on this node, so a new one, regardless of
127+
%% connection, will not be allowed. It will terminate the connection with
128+
%% its channels too. So
129+
{error, not_allowed_crash} = open_channel(Conn2),
130+
3 = count_channels_per_node(Config),
131+
%% As the connection is dead, so are the 2 channels, so we should be able to
132+
%% create 2 more on Conn1
133+
{ok , _} = open_channel(Conn1),
134+
{ok , _} = open_channel(Conn1),
135+
%% But not a third
136+
{error, not_allowed_crash} = open_channel(Conn1),
137+
138+
%% Now all connections are closed, so there should be 0 open connections
139+
0 = count_channels_per_node(Config),
140+
ok.
104141

105142
%% -------------------------------------------------------------------
106143
%% Implementation
107144
%% -------------------------------------------------------------------
108145

109146
open_connections_to_limit(Config, Limit) ->
110-
set_node_limit(Config, Limit),
147+
set_node_limit(Config, connection_max, Limit),
111148
Connections = [rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0) || _ <- lists:seq(1,Limit)],
112149
true = lists:all(fun(E) -> is_pid(E) end, Connections),
113150
Connections.
114151

115152
close_all_connections(Connections) ->
116153
[rabbit_ct_client_helpers:close_connection(C) || C <- Connections].
117154

118-
set_node_limit(Config, Limit) ->
119-
rabbit_ct_broker_helpers:rpc(Config, 0,
120-
application,
121-
set_env, [rabbit, connection_max, Limit]).
122-
123-
set_vhost_limit(Config, Limit) ->
155+
set_node_limit(Config, Type, Limit) ->
124156
rabbit_ct_broker_helpers:rpc(Config, 0,
125157
application,
126-
set_env, [rabbit, vhost_max, Limit]).
158+
set_env, [rabbit, Type, Limit]).
159+
160+
open_channel(Conn) when is_pid(Conn) ->
161+
try amqp_connection:open_channel(Conn) of
162+
{ok, Ch} -> {ok, Ch};
163+
{error, _} ->
164+
{error, not_allowed}
165+
catch
166+
_:_Error -> {error, not_allowed_crash}
167+
end.
168+
169+
count_channels_per_node(Config) ->
170+
NodeConfig = rabbit_ct_broker_helpers:get_node_config(Config, 0),
171+
length(rabbit_ct_broker_helpers:rpc(Config, 0,
172+
rabbit_channel_tracking,
173+
list_on_node,
174+
[?config(nodename, NodeConfig)])).

0 commit comments

Comments
 (0)