Skip to content

Commit d1b9595

Browse files
committed
Store incoming max_message_size in #incoming_link{}
This keeps functions pure and ensures that existing links do not break if an operator were to dynamically change the server's max_message_size. Each link now has a max_message_size: * incoming links as determined by RabbitMQ config * outgoing links as determined by the client (cherry picked from commit 28bd6d4)
1 parent fb1e8be commit d1b9595

File tree

1 file changed

+9
-7
lines changed

1 file changed

+9
-7
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@
143143
routing_key :: rabbit_types:routing_key() | to | subject,
144144
%% queue_name_bin is only set if the link target address refers to a queue.
145145
queue_name_bin :: undefined | rabbit_misc:resource_name(),
146+
max_message_size :: pos_integer(),
146147
delivery_count :: sequence_no(),
147148
credit :: rabbit_queue_type:credit(),
148149
%% TRANSFER delivery IDs published to queues but not yet confirmed by queues
@@ -999,10 +1000,12 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
9991000
ok = validate_attach(Attach),
10001001
case ensure_target(Target, Vhost, User, PermCache0) of
10011002
{ok, Exchange, RoutingKey, QNameBin, PermCache} ->
1003+
MaxMessageSize = persistent_term:get(max_message_size),
10021004
IncomingLink = #incoming_link{
10031005
exchange = Exchange,
10041006
routing_key = RoutingKey,
10051007
queue_name_bin = QNameBin,
1008+
max_message_size = MaxMessageSize,
10061009
delivery_count = DeliveryCountInt,
10071010
credit = MaxLinkCredit},
10081011
_Outcomes = outcomes(Source),
@@ -1015,7 +1018,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
10151018
target = Target,
10161019
%% We are the receiver.
10171020
role = ?AMQP_ROLE_RECEIVER,
1018-
max_message_size = {ulong, persistent_term:get(max_message_size)}},
1021+
max_message_size = {ulong, MaxMessageSize}},
10191022
Flow = #'v1_0.flow'{handle = Handle,
10201023
delivery_count = DeliveryCount,
10211024
link_credit = ?UINT(MaxLinkCredit)},
@@ -2248,6 +2251,7 @@ incoming_link_transfer(
22482251
settled = Settled},
22492252
MsgPart,
22502253
Link0 = #incoming_link{
2254+
max_message_size = MaxMessageSize,
22512255
multi_transfer_msg = Multi = #multi_transfer_msg{
22522256
payload_fragments_rev = PFR0,
22532257
delivery_id = FirstDeliveryId,
@@ -2257,7 +2261,7 @@ incoming_link_transfer(
22572261
validate_multi_transfer_delivery_id(DeliveryId, FirstDeliveryId),
22582262
validate_multi_transfer_settled(Settled, FirstSettled),
22592263
PFR = [MsgPart | PFR0],
2260-
validate_incoming_message_size(PFR),
2264+
validate_message_size(PFR, MaxMessageSize),
22612265
Link = Link0#incoming_link{multi_transfer_msg = Multi#multi_transfer_msg{payload_fragments_rev = PFR}},
22622266
{ok, [], Link, State};
22632267
incoming_link_transfer(
@@ -2277,6 +2281,7 @@ incoming_link_transfer(
22772281
MsgPart,
22782282
#incoming_link{exchange = LinkExchange,
22792283
routing_key = LinkRKey,
2284+
max_message_size = MaxMessageSize,
22802285
delivery_count = DeliveryCount0,
22812286
incoming_unconfirmed_map = U0,
22822287
credit = Credit0,
@@ -2306,7 +2311,7 @@ incoming_link_transfer(
23062311
{MsgBin0, FirstDeliveryId, FirstSettled}
23072312
end,
23082313
validate_transfer_rcv_settle_mode(RcvSettleMode, Settled),
2309-
validate_incoming_message_size(PayloadBin),
2314+
validate_message_size(PayloadBin, MaxMessageSize),
23102315

23112316
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
23122317
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
@@ -3034,9 +3039,6 @@ validate_transfer_rcv_settle_mode(?V_1_0_RECEIVER_SETTLE_MODE_SECOND, _Settled =
30343039
validate_transfer_rcv_settle_mode(_, _) ->
30353040
ok.
30363041

3037-
validate_incoming_message_size(Message) ->
3038-
validate_message_size(Message, persistent_term:get(max_message_size)).
3039-
30403042
validate_message_size(_, unlimited) ->
30413043
ok;
30423044
validate_message_size(Message, MaxMsgSize)
@@ -3050,7 +3052,7 @@ validate_message_size(Message, MaxMsgSize)
30503052
%% We apply that sentence to both incoming messages that are too large for us and outgoing messages that are
30513053
%% too large for the client.
30523054
%% This is an interesting protocol difference to MQTT where we instead discard outgoing messages that are too
3053-
%% large to send then behave as if we had completed sending that message [MQTT 5.0, MQTT-3.1.2-25].
3055+
%% large to send and then behave as if we had completed sending that message [MQTT 5.0, MQTT-3.1.2-25].
30543056
protocol_error(
30553057
?V_1_0_LINK_ERROR_MESSAGE_SIZE_EXCEEDED,
30563058
"message size (~b bytes) > maximum message size (~b bytes)",

0 commit comments

Comments
 (0)