Skip to content

Commit 3d67842

Browse files
committed
Protect receiving app from being overloaded
What? Protect receiving application from being overloaded with new messages while still processing existing messages if the auto credit renewal feature of the Erlang AMQP 1.0 client library is used. This feature can therefore be thought of as a prefetch window equivalent in AMQP 0.9.1 or MQTT 5.0 property Receive Maximum. How? The credit auto renewal feature in RabbitMQ 3.x was wrongly implemented. This commit takes the same approach as done in the server: The incoming_unsettled map is hold in the link instead of in the session to accurately and quickly determine the number of unsettled messages for a receiving link. The amqp10_client lib will grant more credits to the sender when the sum of remaining link credits and number of unsettled deliveries falls below the threshold RenewWhenBelow. This avoids maintaning additional state like the `link_credit_unsettled` or an alternative delivery_count_settled sequence number which is more complex to implement correctly. This commit breaks the amqp10_client_session:disposition/6 API: This commit forces the client application to only range settle for a given link, i.e. not across multiple links on a given session at once. The latter is allowed according to the AMQP spec.
1 parent d31fdb9 commit 3d67842

File tree

10 files changed

+315
-193
lines changed

10 files changed

+315
-193
lines changed

deps/amqp10_client/Makefile

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,6 @@ include erlang.mk
5151
HEX_TARBALL_FILES += rabbitmq-components.mk \
5252
git-revisions.txt
5353

54-
# Dialyze the tests.
55-
DIALYZER_OPTS += --src -r test
56-
5754
# --------------------------------------------------------------------
5855
# ActiveMQ for the testsuite.
5956
# --------------------------------------------------------------------

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -301,16 +301,19 @@ attach_link(Session, AttachArgs) ->
301301
%% This is asynchronous and will notify completion of the attach request to the
302302
%% caller using an amqp10_event of the following format:
303303
%% {amqp10_event, {link, LinkRef, {detached, Why}}}
304-
-spec detach_link(link_ref()) -> _.
304+
-spec detach_link(link_ref()) -> ok | {error, term()}.
305305
detach_link(#link_ref{link_handle = Handle, session = Session}) ->
306306
amqp10_client_session:detach(Session, Handle).
307307

308-
%% @doc Grant credit to a sender.
309-
%% The amqp10_client will automatically grant Credit to the sender when
310-
%% the remaining link credit falls below the value of RenewWhenBelow.
311-
%% If RenewWhenBelow is 'never' the client will never grant more credit. Instead
312-
%% the caller will be notified when the link_credit reaches 0 with an
313-
%% amqp10_event of the following format:
308+
%% @doc Grant Credit to a sender.
309+
%%
310+
%% In addition, if RenewWhenBelow is an integer, the amqp10_client will automatically grant more
311+
%% Credit to the sender when the sum of the remaining link credit and the number of unsettled
312+
%% messages falls below the value of RenewWhenBelow.
313+
%% `Credit + RenewWhenBelow - 1` is the maximum number of in-flight unsettled messages.
314+
%%
315+
%% If RenewWhenBelow is `never` the amqp10_client will never grant more credit. Instead the caller
316+
%% will be notified when the link_credit reaches 0 with an amqp10_event of the following format:
314317
%% {amqp10_event, {link, LinkRef, credit_exhausted}}
315318
-spec flow_link_credit(link_ref(), Credit :: non_neg_integer(),
316319
RenewWhenBelow :: never | pos_integer()) -> ok.
@@ -323,10 +326,16 @@ flow_link_credit(Ref, Credit, RenewWhenBelow) ->
323326
flow_link_credit(#link_ref{role = receiver, session = Session,
324327
link_handle = Handle},
325328
Credit, RenewWhenBelow, Drain)
326-
when RenewWhenBelow =:= never orelse
329+
when
330+
%% Drain together with auto renewal doesn't make sense, so disallow it in the API.
331+
((Drain) andalso RenewWhenBelow =:= never
332+
orelse not(Drain))
333+
andalso
334+
%% Check that the RenewWhenBelow value make sense.
335+
(RenewWhenBelow =:= never orelse
327336
is_integer(RenewWhenBelow) andalso
328337
RenewWhenBelow > 0 andalso
329-
RenewWhenBelow =< Credit ->
338+
RenewWhenBelow =< Credit) ->
330339
Flow = #'v1_0.flow'{link_credit = {uint, Credit},
331340
drain = Drain},
332341
ok = amqp10_client_session:flow(Session, Handle, Flow, RenewWhenBelow).
@@ -359,11 +368,10 @@ accept_msg(LinkRef, Msg) ->
359368
%% the chosen delivery state.
360369
-spec settle_msg(link_ref(), amqp10_msg:amqp10_msg(),
361370
amqp10_client_types:delivery_state()) -> ok.
362-
settle_msg(#link_ref{role = receiver,
363-
session = Session}, Msg, Settlement) ->
371+
settle_msg(LinkRef, Msg, Settlement) ->
364372
DeliveryId = amqp10_msg:delivery_id(Msg),
365-
amqp10_client_session:disposition(Session, receiver, DeliveryId,
366-
DeliveryId, true, Settlement).
373+
amqp10_client_session:disposition(LinkRef, DeliveryId, DeliveryId, true, Settlement).
374+
367375
%% @doc Get a single message from a link.
368376
%% Flows a single link credit then awaits delivery or timeout.
369377
-spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}.

deps/amqp10_client/src/amqp10_client.hrl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@
2020

2121
-record(link_ref, {role :: sender | receiver,
2222
session :: pid(),
23+
%% locally chosen output handle
2324
link_handle :: non_neg_integer()}).

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 62 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
detach/2,
2222
transfer/3,
2323
flow/4,
24-
disposition/6
24+
disposition/5
2525
]).
2626

2727
%% Private API
@@ -131,8 +131,9 @@
131131
available = 0 :: non_neg_integer(),
132132
drain = false :: boolean(),
133133
partial_transfers :: undefined | {#'v1_0.transfer'{}, [binary()]},
134-
auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()}
135-
}).
134+
auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()},
135+
incoming_unsettled = #{} :: #{delivery_number() => ok}
136+
}).
136137

137138
-record(state,
138139
{channel :: pos_integer(),
@@ -155,7 +156,6 @@
155156
connection_config :: amqp10_client_connection:connection_config(),
156157
outgoing_delivery_id = ?INITIAL_OUTGOING_DELIVERY_ID :: delivery_number(),
157158
outgoing_unsettled = #{} :: #{delivery_number() => {amqp10_msg:delivery_tag(), Notify :: pid()}},
158-
incoming_unsettled = #{} :: #{delivery_number() => output_handle()},
159159
notify :: pid()
160160
}).
161161

@@ -204,14 +204,18 @@ transfer(Session, Amqp10Msg, Timeout) ->
204204
flow(Session, Handle, Flow, RenewWhenBelow) ->
205205
gen_statem:cast(Session, {flow_link, Handle, Flow, RenewWhenBelow}).
206206

207-
-spec disposition(pid(), link_role(), delivery_number(), delivery_number(), boolean(),
207+
%% Sending a disposition on a sender link (with receiver-settle-mode = second)
208+
%% is currently unsupported.
209+
-spec disposition(link_ref(), delivery_number(), delivery_number(), boolean(),
208210
amqp10_client_types:delivery_state()) -> ok.
209-
disposition(Session, Role, First, Last, Settled, DeliveryState) ->
210-
gen_statem:call(Session, {disposition, Role, First, Last, Settled,
211+
disposition(#link_ref{role = receiver,
212+
session = Session,
213+
link_handle = Handle},
214+
First, Last, Settled, DeliveryState) ->
215+
gen_statem:call(Session, {disposition, Handle, First, Last, Settled,
211216
DeliveryState}, ?TIMEOUT).
212217

213218

214-
215219
%% -------------------------------------------------------------------
216220
%% Private API.
217221
%% -------------------------------------------------------------------
@@ -277,7 +281,7 @@ mapped(cast, 'end', State) ->
277281
send_end(State),
278282
{next_state, end_sent, State};
279283
mapped(cast, {flow_link, OutHandle, Flow0, RenewWhenBelow}, State0) ->
280-
State = send_flow_link(fun send/2, OutHandle, Flow0, RenewWhenBelow, State0),
284+
State = send_flow_link(OutHandle, Flow0, RenewWhenBelow, State0),
281285
{keep_state, State};
282286
mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, IncomingWindow}}},
283287
#state{next_incoming_id = NII,
@@ -367,45 +371,43 @@ mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle},
367371
State = book_partial_transfer_received(
368372
State0#state{links = Links#{OutHandle => Link1}}),
369373
{keep_state, State};
370-
mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle},
371-
delivery_id = MaybeDeliveryId,
372-
settled = Settled} = Transfer0, Payload0},
373-
#state{incoming_unsettled = Unsettled0} = State0) ->
374-
374+
mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
375+
Payload0}, State0) ->
375376
{ok, #link{target = {pid, TargetPid},
376-
output_handle = OutHandle,
377-
ref = LinkRef} = Link0} =
378-
find_link_by_input_handle(InHandle, State0),
377+
ref = LinkRef,
378+
incoming_unsettled = Unsettled
379+
} = Link0} = find_link_by_input_handle(InHandle, State0),
379380

380-
{Transfer, Payload, Link1} = complete_partial_transfer(Transfer0, Payload0, Link0),
381-
Msg = decode_as_msg(Transfer, Payload),
382-
383-
% stash the DeliveryId - not sure for what yet
384-
Unsettled = case MaybeDeliveryId of
385-
{uint, DeliveryId} when Settled =/= true ->
386-
Unsettled0#{DeliveryId => OutHandle};
387-
_ ->
388-
Unsettled0
389-
end,
381+
{Transfer = #'v1_0.transfer'{settled = Settled,
382+
delivery_id = {uint, DeliveryId}},
383+
Payload, Link1} = complete_partial_transfer(Transfer0, Payload0, Link0),
390384

385+
Msg = decode_as_msg(Transfer, Payload),
386+
Link2 = case Settled of
387+
true ->
388+
Link1;
389+
_ ->
390+
%% "If not set on the first (or only) transfer for a (multi-transfer) delivery,
391+
%% then the settled flag MUST be interpreted as being false." [2.7.5]
392+
Link1#link{incoming_unsettled = Unsettled#{DeliveryId => ok}}
393+
end,
391394
% link bookkeeping
392395
% notify when credit is exhausted (link_credit = 0)
393396
% detach the Link with a transfer-limit-exceeded error code if further
394397
% transfers are received
395-
State1 = State0#state{incoming_unsettled = Unsettled},
396-
case book_transfer_received(State1, Link1) of
397-
{ok, Link2, State2} ->
398+
case book_transfer_received(State0, Link2) of
399+
{ok, Link3, State1} ->
398400
% deliver
399401
TargetPid ! {amqp10_msg, LinkRef, Msg},
400-
State = auto_flow(Link2, State2),
402+
State = auto_flow(Link3, State1),
401403
{keep_state, State};
402-
{credit_exhausted, Link2, State} ->
404+
{credit_exhausted, Link3, State} ->
403405
TargetPid ! {amqp10_msg, LinkRef, Msg},
404-
notify_credit_exhausted(Link2),
406+
notify_credit_exhausted(Link3),
405407
{keep_state, State};
406-
{transfer_limit_exceeded, Link2, State} ->
407-
logger:warning("transfer_limit_exceeded for link ~tp", [Link2]),
408-
Link = detach_with_error_cond(Link2, State,
408+
{transfer_limit_exceeded, Link3, State} ->
409+
logger:warning("transfer_limit_exceeded for link ~tp", [Link3]),
410+
Link = detach_with_error_cond(Link3, State,
409411
?V_1_0_LINK_ERROR_TRANSFER_LIMIT_EXCEEDED),
410412
{keep_state, update_link(Link, State)}
411413
end;
@@ -501,12 +503,15 @@ mapped({call, From},
501503
end;
502504

503505
mapped({call, From},
504-
{disposition, Role, First, Last, Settled0, DeliveryState},
505-
#state{incoming_unsettled = Unsettled0} = State0) ->
506+
{disposition, OutputHandle, First, Last, Settled0, DeliveryState},
507+
#state{links = Links} = State0) ->
508+
#{OutputHandle := Link0 = #link{incoming_unsettled = Unsettled0}} = Links,
506509
Unsettled = serial_number:foldl(fun maps:remove/2, Unsettled0, First, Last),
507-
State = State0#state{incoming_unsettled = Unsettled},
510+
Link = Link0#link{incoming_unsettled = Unsettled},
511+
State1 = State0#state{links = Links#{OutputHandle := Link}},
512+
State = auto_flow(Link, State1),
508513
Disposition = #'v1_0.disposition'{
509-
role = translate_role(Role),
514+
role = translate_role(receiver),
510515
first = {uint, First},
511516
last = {uint, Last},
512517
settled = Settled0,
@@ -599,7 +604,7 @@ send_transfer(Transfer0, Parts0, MaxMessageSize, #state{socket = Socket,
599604
{ok, length(Frames)}
600605
end.
601606

602-
send_flow_link(Send, OutHandle,
607+
send_flow_link(OutHandle,
603608
#'v1_0.flow'{link_credit = {uint, Credit}} = Flow0, RenewWhenBelow,
604609
#state{links = Links,
605610
next_incoming_id = NII,
@@ -625,7 +630,7 @@ send_flow_link(Send, OutHandle,
625630
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
626631
delivery_count = maybe_uint(DeliveryCount),
627632
available = uint(Available)},
628-
ok = Send(Flow, State),
633+
ok = send(Flow, State),
629634
State#state{links = Links#{OutHandle =>
630635
Link#link{link_credit = Credit,
631636
auto_flow = AutoFlow}}}.
@@ -777,8 +782,9 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
777782
max_message_size = MaxMessageSize},
778783
ok = Send(Attach, State),
779784

785+
LinkRef = make_link_ref(element(1, Role), self(), OutHandle),
780786
Link = #link{name = Name,
781-
ref = make_link_ref(element(1, Role), self(), OutHandle),
787+
ref = LinkRef,
782788
output_handle = OutHandle,
783789
state = attach_sent,
784790
role = element(1, Role),
@@ -790,7 +796,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
790796

791797
{State#state{links = Links#{OutHandle => Link},
792798
next_link_handle = NextLinkHandle,
793-
link_index = LinkIndex#{Name => OutHandle}}, Link#link.ref}.
799+
link_index = LinkIndex#{Name => OutHandle}}, LinkRef}.
794800

795801
-spec handle_session_flow(#'v1_0.flow'{}, #state{}) -> #state{}.
796802
handle_session_flow(#'v1_0.flow'{next_incoming_id = MaybeNII,
@@ -908,7 +914,6 @@ translate_delivery_state({modified,
908914
translate_delivery_state(released) -> #'v1_0.released'{};
909915
translate_delivery_state(received) -> #'v1_0.received'{}.
910916

911-
translate_role(sender) -> false;
912917
translate_role(receiver) -> true.
913918

914919
maybe_notify_link_credit(#link{role = sender,
@@ -987,9 +992,11 @@ book_transfer_received(#state{next_incoming_id = NID,
987992

988993
auto_flow(#link{link_credit = LC,
989994
auto_flow = {auto, RenewWhenBelow, Credit},
990-
output_handle = OutHandle}, State)
991-
when LC < RenewWhenBelow ->
992-
send_flow_link(fun send/2, OutHandle,
995+
output_handle = OutHandle,
996+
incoming_unsettled = Unsettled},
997+
State)
998+
when LC + map_size(Unsettled) < RenewWhenBelow ->
999+
send_flow_link(OutHandle,
9931000
#'v1_0.flow'{link_credit = {uint, Credit}},
9941001
RenewWhenBelow, State);
9951002
auto_flow(_, State) ->
@@ -1045,7 +1052,8 @@ socket_send0({tcp, Socket}, Data) ->
10451052
socket_send0({ssl, Socket}, Data) ->
10461053
ssl:send(Socket, Data).
10471054

1048-
-spec make_link_ref(_, _, _) -> link_ref().
1055+
-spec make_link_ref(link_role(), pid(), output_handle()) ->
1056+
link_ref().
10491057
make_link_ref(Role, Session, Handle) ->
10501058
#link_ref{role = Role, session = Session, link_handle = Handle}.
10511059

@@ -1100,7 +1108,6 @@ format_status(Status = #{data := Data0}) ->
11001108
connection_config = ConnectionConfig,
11011109
outgoing_delivery_id = OutgoingDeliveryId,
11021110
outgoing_unsettled = OutgoingUnsettled,
1103-
incoming_unsettled = IncomingUnsettled,
11041111
notify = Notify
11051112
} = Data0,
11061113
Links = maps:map(
@@ -1119,7 +1126,8 @@ format_status(Status = #{data := Data0}) ->
11191126
available = Available,
11201127
drain = Drain,
11211128
partial_transfers = PartialTransfers0,
1122-
auto_flow = AutoFlow
1129+
auto_flow = AutoFlow,
1130+
incoming_unsettled = IncomingUnsettled
11231131
}) ->
11241132
PartialTransfers = case PartialTransfers0 of
11251133
undefined ->
@@ -1141,7 +1149,9 @@ format_status(Status = #{data := Data0}) ->
11411149
available => Available,
11421150
drain => Drain,
11431151
partial_transfers => PartialTransfers,
1144-
auto_flow => AutoFlow}
1152+
auto_flow => AutoFlow,
1153+
incoming_unsettled => maps:size(IncomingUnsettled)
1154+
}
11451155
end, Links0),
11461156
Data = #{channel => Channel,
11471157
remote_channel => RemoteChannel,
@@ -1160,7 +1170,6 @@ format_status(Status = #{data := Data0}) ->
11601170
connection_config => maps:remove(sasl, ConnectionConfig),
11611171
outgoing_delivery_id => OutgoingDeliveryId,
11621172
outgoing_unsettled => maps:size(OutgoingUnsettled),
1163-
incoming_unsettled => maps:size(IncomingUnsettled),
11641173
notify => Notify},
11651174
Status#{data := Data}.
11661175

0 commit comments

Comments
 (0)