Skip to content

Commit be0d287

Browse files
authored
Merge pull request #1812 from rabbitmq/max_msg_size
Introduce a configurable limit to message size
2 parents 3e35eda + b1b995c commit be0d287

File tree

4 files changed

+127
-20
lines changed

4 files changed

+127
-20
lines changed

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ define PROJECT_ENV
130130
{vhost_restart_strategy, continue},
131131
%% {global, prefetch count}
132132
{default_consumer_prefetch, {false, 0}},
133-
{channel_queue_cleanup_interval, 60000}
133+
{channel_queue_cleanup_interval, 60000},
134+
%% Default max message size is 128 MB
135+
{max_message_size, 134217728}
134136
]
135137
endef
136138

priv/schema/rabbit.schema

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,9 @@ end}.
554554
}.
555555

556556

557+
{mapping, "msx_message_size", "rabbit.max_message_size",
558+
[{datatype, integer}, {validators, ["less_then_512MB"]}]}.
559+
557560
%% Customising Socket Options.
558561
%%
559562
%% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for
@@ -1361,6 +1364,11 @@ fun(Size) when is_integer(Size) ->
13611364
Size > 0 andalso Size < 2147483648
13621365
end}.
13631366

1367+
{validator, "less_then_512MB", "Max message size should be less than 512MB and gre than 0",
1368+
fun(Size) when is_integer(Size) ->
1369+
Size > 0 andalso Size < 536870912
1370+
end}.
1371+
13641372
{validator, "less_than_1", "Flooat is not beetween 0 and 1",
13651373
fun(Float) when is_float(Float) ->
13661374
Float > 0 andalso Float < 1

src/rabbit_channel.erl

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
-export([get_vhost/1, get_user/1]).
7373
%% For testing
7474
-export([build_topic_variable_map/3]).
75-
-export([list_queue_states/1]).
75+
-export([list_queue_states/1, get_max_message_size/0]).
7676

7777
%% Mgmt HTTP API refactor
7878
-export([handle_method/5]).
@@ -158,7 +158,9 @@
158158
delivery_flow,
159159
interceptor_state,
160160
queue_states,
161-
queue_cleanup_timer
161+
queue_cleanup_timer,
162+
%% Message content size limit
163+
max_message_size
162164
}).
163165

164166
-define(QUEUE, lqueue).
@@ -441,6 +443,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
441443
_ ->
442444
Limiter0
443445
end,
446+
MaxMessageSize = get_max_message_size(),
444447
State = #ch{state = starting,
445448
protocol = Protocol,
446449
channel = Channel,
@@ -473,7 +476,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
473476
reply_consumer = none,
474477
delivery_flow = Flow,
475478
interceptor_state = undefined,
476-
queue_states = #{}},
479+
queue_states = #{},
480+
max_message_size = MaxMessageSize},
477481
State1 = State#ch{
478482
interceptor_state = rabbit_channel_interceptor:init(State)},
479483
State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
@@ -793,6 +797,16 @@ code_change(_OldVsn, State, _Extra) ->
793797

794798
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
795799

800+
-spec get_max_message_size() -> non_neg_integer().
801+
802+
get_max_message_size() ->
803+
case application:get_env(rabbit, max_message_size) of
804+
{ok, MS} when is_integer(MS) ->
805+
erlang:min(MS, ?MAX_MSG_SIZE);
806+
_ ->
807+
?MAX_MSG_SIZE
808+
end.
809+
796810
%%---------------------------------------------------------------------------
797811

798812
reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
@@ -985,12 +999,19 @@ extract_topic_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct,
985999
extract_topic_variable_map_from_amqp_params(_) ->
9861000
#{}.
9871001

988-
check_msg_size(Content) ->
1002+
check_msg_size(Content, MaxMessageSize) ->
9891003
Size = rabbit_basic:maybe_gc_large_msg(Content),
990-
case Size > ?MAX_MSG_SIZE of
991-
true -> precondition_failed("message size ~B larger than max size ~B",
992-
[Size, ?MAX_MSG_SIZE]);
993-
false -> ok
1004+
case Size of
1005+
S when S > MaxMessageSize ->
1006+
ErrorMessage = case MaxMessageSize of
1007+
?MAX_MSG_SIZE ->
1008+
"message size ~B is larger than max size ~B";
1009+
_ ->
1010+
"message size ~B is larger than configured max size ~B"
1011+
end,
1012+
precondition_failed(ErrorMessage,
1013+
[Size, MaxMessageSize]);
1014+
_ -> ok
9941015
end.
9951016

9961017
check_vhost_queue_limit(#resource{name = QueueName}, VHost) ->
@@ -1164,16 +1185,17 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) ->
11641185
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
11651186
routing_key = RoutingKey,
11661187
mandatory = Mandatory},
1167-
Content, State = #ch{virtual_host = VHostPath,
1168-
tx = Tx,
1169-
channel = ChannelNum,
1170-
confirm_enabled = ConfirmEnabled,
1171-
trace_state = TraceState,
1172-
user = #user{username = Username} = User,
1173-
conn_name = ConnName,
1174-
delivery_flow = Flow,
1175-
conn_pid = ConnPid}) ->
1176-
check_msg_size(Content),
1188+
Content, State = #ch{virtual_host = VHostPath,
1189+
tx = Tx,
1190+
channel = ChannelNum,
1191+
confirm_enabled = ConfirmEnabled,
1192+
trace_state = TraceState,
1193+
user = #user{username = Username} = User,
1194+
conn_name = ConnName,
1195+
delivery_flow = Flow,
1196+
conn_pid = ConnPid,
1197+
max_message_size = MaxMessageSize}) ->
1198+
check_msg_size(Content, MaxMessageSize),
11771199
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
11781200
check_write_permitted(ExchangeName, User),
11791201
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),

test/unit_inbroker_parallel_SUITE.erl

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
-define(TIMEOUT_LIST_OPS_PASS, 5000).
2727
-define(TIMEOUT, 30000).
28+
-define(TIMEOUT_CHANNEL_EXCEPTION, 5000).
2829

2930
-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
3031

@@ -60,10 +61,16 @@ groups() ->
6061
topic_matching,
6162
{queue_max_length, [], [
6263
{max_length_simple, [], MaxLengthTests},
63-
{max_length_mirrored, [], MaxLengthTests}]}
64+
{max_length_mirrored, [], MaxLengthTests}]},
65+
max_message_size
6466
]}
6567
].
6668

69+
suite() ->
70+
[
71+
{timetrap, {seconds, 30}}
72+
].
73+
6774
%% -------------------------------------------------------------------
6875
%% Testsuite setup/teardown.
6976
%% -------------------------------------------------------------------
@@ -1299,6 +1306,74 @@ sync_mirrors(QName, Config) ->
12991306
_ -> ok
13001307
end.
13011308

1309+
gen_binary_mb(N) ->
1310+
B1M = << <<"_">> || _ <- lists:seq(1, 1024 * 1024) >>,
1311+
<< B1M || _ <- lists:seq(1, N) >>.
1312+
1313+
assert_channel_alive(Ch) ->
1314+
amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>},
1315+
#amqp_msg{payload = <<"HI">>}).
1316+
1317+
assert_channel_fail_max_size(Ch, Monitor) ->
1318+
receive
1319+
{'DOWN', Monitor, process, Ch,
1320+
{shutdown,
1321+
{server_initiated_close, 406, _Error}}} ->
1322+
ok
1323+
after ?TIMEOUT_CHANNEL_EXCEPTION ->
1324+
error({channel_exception_expected, max_message_size})
1325+
end.
1326+
1327+
max_message_size(Config) ->
1328+
Binary2M = gen_binary_mb(2),
1329+
Binary4M = gen_binary_mb(4),
1330+
Binary6M = gen_binary_mb(6),
1331+
Binary10M = gen_binary_mb(10),
1332+
1333+
Size2Mb = 1024 * 1024 * 2,
1334+
Size2Mb = byte_size(Binary2M),
1335+
1336+
rabbit_ct_broker_helpers:rpc(Config, 0,
1337+
application, set_env, [rabbit, max_message_size, 1024 * 1024 * 3]),
1338+
1339+
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
1340+
1341+
%% Binary is whithin the max size limit
1342+
amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}),
1343+
%% The channel process is alive
1344+
assert_channel_alive(Ch),
1345+
1346+
Monitor = monitor(process, Ch),
1347+
amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}),
1348+
assert_channel_fail_max_size(Ch, Monitor),
1349+
1350+
%% increase the limit
1351+
rabbit_ct_broker_helpers:rpc(Config, 0,
1352+
application, set_env, [rabbit, max_message_size, 1024 * 1024 * 8]),
1353+
1354+
{_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
1355+
1356+
amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary2M}),
1357+
assert_channel_alive(Ch1),
1358+
1359+
amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary4M}),
1360+
assert_channel_alive(Ch1),
1361+
1362+
amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary6M}),
1363+
assert_channel_alive(Ch1),
1364+
1365+
Monitor1 = monitor(process, Ch1),
1366+
amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary10M}),
1367+
assert_channel_fail_max_size(Ch1, Monitor1),
1368+
1369+
%% increase beyond the hard limit
1370+
rabbit_ct_broker_helpers:rpc(Config, 0,
1371+
application, set_env, [rabbit, max_message_size, 1024 * 1024 * 600]),
1372+
Val = rabbit_ct_broker_helpers:rpc(Config, 0,
1373+
rabbit_channel, get_max_message_size, []),
1374+
1375+
?assertEqual(?MAX_MSG_SIZE, Val).
1376+
13021377
%% ---------------------------------------------------------------------------
13031378
%% rabbitmqctl helpers.
13041379
%% ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)