|
158 | 158 | delivery_flow,
|
159 | 159 | interceptor_state,
|
160 | 160 | queue_states,
|
161 |
| - queue_cleanup_timer |
| 161 | + queue_cleanup_timer, |
| 162 | + %% Message content size limit |
| 163 | + max_message_size |
162 | 164 | }).
|
163 | 165 |
|
164 | 166 | -define(QUEUE, lqueue).
|
@@ -441,6 +443,12 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
|
441 | 443 | _ ->
|
442 | 444 | Limiter0
|
443 | 445 | end,
|
| 446 | + MaxMessageSize = case application:get_env(rabbit, max_message_size) of |
| 447 | + {ok, MS} when is_integer(MS) -> |
| 448 | + erlang:min(MS, ?MAX_MSG_SIZE); |
| 449 | + _ -> |
| 450 | + ?MAX_MSG_SIZE |
| 451 | + end, |
444 | 452 | State = #ch{state = starting,
|
445 | 453 | protocol = Protocol,
|
446 | 454 | channel = Channel,
|
@@ -473,7 +481,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
|
473 | 481 | reply_consumer = none,
|
474 | 482 | delivery_flow = Flow,
|
475 | 483 | interceptor_state = undefined,
|
476 |
| - queue_states = #{}}, |
| 484 | + queue_states = #{}, |
| 485 | + max_message_size = MaxMessageSize}, |
477 | 486 | State1 = State#ch{
|
478 | 487 | interceptor_state = rabbit_channel_interceptor:init(State)},
|
479 | 488 | State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
|
@@ -985,20 +994,19 @@ extract_topic_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct,
|
985 | 994 | extract_topic_variable_map_from_amqp_params(_) ->
|
986 | 995 | #{}.
|
987 | 996 |
|
988 |
| -check_msg_size(Content) -> |
| 997 | +check_msg_size(Content, MaxMessageSize) -> |
989 | 998 | Size = rabbit_basic:maybe_gc_large_msg(Content),
|
990 |
| - case Size > ?MAX_MSG_SIZE of |
991 |
| - true -> precondition_failed("message size ~B larger than max size ~B", |
992 |
| - [Size, ?MAX_MSG_SIZE]); |
993 |
| - false -> |
994 |
| - case application:get_env(rabbit, max_message_size) of |
995 |
| - {ok, MaxSize} when is_integer(MaxSize) andalso Size > MaxSize -> |
996 |
| - precondition_failed("message size ~B larger than" |
997 |
| - " configured max size ~B", |
998 |
| - [Size, MaxSize]); |
999 |
| - |
1000 |
| - _ -> ok |
1001 |
| - end |
| 999 | + case Size of |
| 1000 | + S when S > MaxMessageSize -> |
| 1001 | + ErrorMessage = case MaxMessageSize of |
| 1002 | + ?MAX_MSG_SIZE -> |
| 1003 | + "message size ~B larger than max size ~B"; |
| 1004 | + _ -> |
| 1005 | + "message size ~B larger than configured max size ~B" |
| 1006 | + end, |
| 1007 | + precondition_failed(ErrorMessage, |
| 1008 | + [Size, MaxMessageSize]); |
| 1009 | + _ -> ok |
1002 | 1010 | end.
|
1003 | 1011 |
|
1004 | 1012 | check_vhost_queue_limit(#resource{name = QueueName}, VHost) ->
|
@@ -1172,16 +1180,17 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) ->
|
1172 | 1180 | handle_method(#'basic.publish'{exchange = ExchangeNameBin,
|
1173 | 1181 | routing_key = RoutingKey,
|
1174 | 1182 | mandatory = Mandatory},
|
1175 |
| - Content, State = #ch{virtual_host = VHostPath, |
1176 |
| - tx = Tx, |
1177 |
| - channel = ChannelNum, |
1178 |
| - confirm_enabled = ConfirmEnabled, |
1179 |
| - trace_state = TraceState, |
1180 |
| - user = #user{username = Username} = User, |
1181 |
| - conn_name = ConnName, |
1182 |
| - delivery_flow = Flow, |
1183 |
| - conn_pid = ConnPid}) -> |
1184 |
| - check_msg_size(Content), |
| 1183 | + Content, State = #ch{virtual_host = VHostPath, |
| 1184 | + tx = Tx, |
| 1185 | + channel = ChannelNum, |
| 1186 | + confirm_enabled = ConfirmEnabled, |
| 1187 | + trace_state = TraceState, |
| 1188 | + user = #user{username = Username} = User, |
| 1189 | + conn_name = ConnName, |
| 1190 | + delivery_flow = Flow, |
| 1191 | + conn_pid = ConnPid, |
| 1192 | + max_message_size = MaxMessageSize}) -> |
| 1193 | + check_msg_size(Content, MaxMessageSize), |
1185 | 1194 | ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
|
1186 | 1195 | check_write_permitted(ExchangeName, User),
|
1187 | 1196 | Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
|
|
0 commit comments