Skip to content

Commit 3c7c420

Browse files
committed
Auto widen session incoming-window in AMQP 1.0 client
This commit fixes a bug in the Erlang AMQP 1.0 client. Prior to this commit, to repro this bug: 1. Send more than 2^16 messages to a queue. 2. Grant more than a total of 2^16 link credit initially (on a single link or across multiple links) on a single session without any auto or manual link credit renewal. The expectation is that thanks to sufficiently granted initial link-credit, the client will receive all messages. However, consumption stops after exactly 2^16-1 messages. That's because the client lib was never sending a flow frame to the server. So, after the client received all 2^16-1 messages (the initial incoming-window set by the client), the server's remote-incoming-window reached 0 causing the server to stop delivering messages. The expectation is that the client lib automatically handles session flow control without any manual involvement of the client app. This commit implements this fix: * We keep the server's remote-incoming window always large by default as explained in https://www.rabbitmq.com/blog/2024/09/02/amqp-flow-control#incoming-window * Hence, the client lib sets its incoming-window to 100,000 initially. * The client lib tracks its incoming-window decrementing it by 1 for every transfer it received. (This wasn't done prior to this commit.) * Whenever this window shrinks below 50,000, the client sends a flow frame without any link information widening its incoming-window back to 100,000. * For test cases (maybe later for apps as well), there is a new function `amqp10_client_session:flow/3`, which allows for a test case to do manual session flow control. Its API is designed very similar to `amqp10_client_session:flow_link/4` in that the test can optionally request the lib to auto widen the session window whenever it falls below a certain threshold. (cherry picked from commit 32854e8) (cherry picked from commit 3539462) # Conflicts: # deps/amqp10_client/src/amqp10_client_session.erl
1 parent dc060c1 commit 3c7c420

File tree

4 files changed

+227
-48
lines changed

4 files changed

+227
-48
lines changed

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ flow_link_credit(#link_ref{role = receiver, session = Session,
335335
RenewWhenBelow =< Credit) ->
336336
Flow = #'v1_0.flow'{link_credit = {uint, Credit},
337337
drain = Drain},
338-
ok = amqp10_client_session:flow(Session, Handle, Flow, RenewWhenBelow).
338+
ok = amqp10_client_session:flow_link(Session, Handle, Flow, RenewWhenBelow).
339339

340340
%% @doc Stop a receiving link.
341341
%% See AMQP 1.0 spec §2.6.10.
@@ -344,7 +344,7 @@ stop_receiver_link(#link_ref{role = receiver,
344344
link_handle = Handle}) ->
345345
Flow = #'v1_0.flow'{link_credit = {uint, 0},
346346
echo = true},
347-
ok = amqp10_client_session:flow(Session, Handle, Flow, never).
347+
ok = amqp10_client_session:flow_link(Session, Handle, Flow, never).
348348

349349
%%% messages
350350

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 94 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@
2020
attach/2,
2121
detach/2,
2222
transfer/3,
23-
flow/4,
24-
disposition/5
23+
disposition/5,
24+
flow_link/4
2525
]).
2626

27+
%% Manual session flow control is currently only used in tests.
28+
-export([flow/3]).
29+
2730
%% Private API
2831
-export([start_link/4,
2932
socket_ready/2
@@ -51,7 +54,8 @@
5154
[add/2,
5255
diff/2]).
5356

54-
-define(MAX_SESSION_WINDOW_SIZE, 65535).
57+
%% By default, we want to keep the server's remote-incoming-window large at all times.
58+
-define(DEFAULT_MAX_INCOMING_WINDOW, 100_000).
5559
-define(UINT_OUTGOING_WINDOW, {uint, ?UINT_MAX}).
5660
-define(INITIAL_OUTGOING_DELIVERY_ID, ?UINT_MAX).
5761
%% "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
@@ -129,7 +133,8 @@
129133
available = 0 :: non_neg_integer(),
130134
drain = false :: boolean(),
131135
partial_transfers :: undefined | {#'v1_0.transfer'{}, [binary()]},
132-
auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()},
136+
auto_flow :: never | {RenewWhenBelow :: pos_integer(),
137+
Credit :: pos_integer()},
133138
incoming_unsettled = #{} :: #{delivery_number() => ok},
134139
footer_opt :: footer_opt() | undefined
135140
}).
@@ -140,7 +145,10 @@
140145

141146
%% session flow control, see section 2.5.6
142147
next_incoming_id :: transfer_number() | undefined,
143-
incoming_window = ?MAX_SESSION_WINDOW_SIZE :: non_neg_integer(),
148+
%% Can become negative if the peer overshoots our window.
149+
incoming_window :: integer(),
150+
auto_flow :: never | {RenewWhenBelow :: pos_integer(),
151+
NewWindowSize :: pos_integer()},
144152
next_outgoing_id = ?INITIAL_OUTGOING_TRANSFER_ID :: transfer_number(),
145153
remote_incoming_window = 0 :: non_neg_integer(),
146154
remote_outgoing_window = 0 :: non_neg_integer(),
@@ -200,7 +208,17 @@ transfer(Session, Amqp10Msg, Timeout) ->
200208
[Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
201209
gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout).
202210

203-
flow(Session, Handle, Flow, RenewWhenBelow) ->
211+
-spec flow(pid(), non_neg_integer(), never | pos_integer()) -> ok.
212+
flow(Session, IncomingWindow, RenewWhenBelow) when
213+
%% Check that the RenewWhenBelow value make sense.
214+
RenewWhenBelow =:= never orelse
215+
is_integer(RenewWhenBelow) andalso
216+
RenewWhenBelow > 0 andalso
217+
RenewWhenBelow =< IncomingWindow ->
218+
gen_statem:cast(Session, {flow_session, IncomingWindow, RenewWhenBelow}).
219+
220+
-spec flow_link(pid(), link_handle(), #'v1_0.flow'{}, never | pos_integer()) -> ok.
221+
flow_link(Session, Handle, Flow, RenewWhenBelow) ->
204222
gen_statem:cast(Session, {flow_link, Handle, Flow, RenewWhenBelow}).
205223

206224
%% Sending a disposition on a sender link (with receiver-settle-mode = second)
@@ -239,6 +257,9 @@ init([FromPid, Channel, Reader, ConnConfig]) ->
239257
channel = Channel,
240258
reader = Reader,
241259
connection_config = ConnConfig,
260+
incoming_window = ?DEFAULT_MAX_INCOMING_WINDOW,
261+
auto_flow = {?DEFAULT_MAX_INCOMING_WINDOW div 2,
262+
?DEFAULT_MAX_INCOMING_WINDOW},
242263
early_attach_requests = []},
243264
{ok, unmapped, State}.
244265

@@ -282,6 +303,7 @@ mapped(cast, 'end', State) ->
282303
mapped(cast, {flow_link, OutHandle, Flow0, RenewWhenBelow}, State0) ->
283304
State = send_flow_link(OutHandle, Flow0, RenewWhenBelow, State0),
284305
{keep_state, State};
306+
<<<<<<< HEAD
285307
mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, IncomingWindow}}},
286308
#state{next_incoming_id = NII,
287309
next_outgoing_id = NOI} = State) ->
@@ -292,6 +314,18 @@ mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, Incomi
292314
ok = send(Flow, State),
293315
{keep_state, State#state{incoming_window = IncomingWindow}};
294316
mapped(cast, #'v1_0.end'{error = Err}, State) ->
317+
=======
318+
mapped(cast, {flow_session, IncomingWindow, RenewWhenBelow}, State0) ->
319+
AutoFlow = case RenewWhenBelow of
320+
never -> never;
321+
_ -> {RenewWhenBelow, IncomingWindow}
322+
end,
323+
State = State0#state{incoming_window = IncomingWindow,
324+
auto_flow = AutoFlow},
325+
send_flow_session(State),
326+
{keep_state, State};
327+
mapped(cast, #'v1_0.end'{} = End, State) ->
328+
>>>>>>> 35394625a (Auto widen session incoming-window in AMQP 1.0 client)
295329
%% We receive the first end frame, reply and terminate.
296330
_ = send_end(State),
297331
% TODO: send notifications for links?
@@ -660,35 +694,44 @@ is_bare_message_section(_Section) ->
660694

661695
send_flow_link(OutHandle,
662696
#'v1_0.flow'{link_credit = {uint, Credit}} = Flow0, RenewWhenBelow,
663-
#state{links = Links,
664-
next_incoming_id = NII,
665-
next_outgoing_id = NOI,
666-
incoming_window = InWin} = State) ->
697+
#state{links = Links} = State) ->
667698
AutoFlow = case RenewWhenBelow of
668699
never -> never;
669-
Limit -> {auto, Limit, Credit}
700+
_ -> {RenewWhenBelow, Credit}
670701
end,
671702
#{OutHandle := #link{output_handle = H,
672703
role = receiver,
673704
delivery_count = DeliveryCount,
674705
available = Available} = Link} = Links,
675-
Flow = Flow0#'v1_0.flow'{
676-
handle = uint(H),
677-
%% "This value MUST be set if the peer has received the begin
678-
%% frame for the session, and MUST NOT be set if it has not." [2.7.4]
679-
next_incoming_id = maybe_uint(NII),
680-
next_outgoing_id = uint(NOI),
681-
outgoing_window = ?UINT_OUTGOING_WINDOW,
682-
incoming_window = uint(InWin),
683-
%% "In the event that the receiving link endpoint has not yet seen the
684-
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
685-
delivery_count = maybe_uint(DeliveryCount),
686-
available = uint(Available)},
706+
Flow1 = Flow0#'v1_0.flow'{
707+
handle = uint(H),
708+
%% "In the event that the receiving link endpoint has not yet seen the
709+
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
710+
delivery_count = maybe_uint(DeliveryCount),
711+
available = uint(Available)},
712+
Flow = set_flow_session_fields(Flow1, State),
687713
ok = send(Flow, State),
688714
State#state{links = Links#{OutHandle =>
689715
Link#link{link_credit = Credit,
690716
auto_flow = AutoFlow}}}.
691717

718+
send_flow_session(State) ->
719+
Flow = set_flow_session_fields(#'v1_0.flow'{}, State),
720+
ok = send(Flow, State).
721+
722+
set_flow_session_fields(Flow, #state{next_incoming_id = NID,
723+
incoming_window = IW,
724+
next_outgoing_id = NOI}) ->
725+
Flow#'v1_0.flow'{
726+
%% "This value MUST be set if the peer has received the begin
727+
%% frame for the session, and MUST NOT be set if it has not." [2.7.4]
728+
next_incoming_id = maybe_uint(NID),
729+
%% IncomingWindow0 can be negative when the sending server overshoots our window.
730+
%% We must set a floor of 0 in the FLOW frame because field incoming-window is an uint.
731+
incoming_window = uint(max(0, IW)),
732+
next_outgoing_id = uint(NOI),
733+
outgoing_window = ?UINT_OUTGOING_WINDOW}.
734+
692735
build_frames(Channel, Trf, Bin, MaxPayloadSize, Acc)
693736
when byte_size(Bin) =< MaxPayloadSize ->
694737
T = amqp10_framing:encode_bin(Trf#'v1_0.transfer'{more = false}),
@@ -1020,17 +1063,21 @@ book_transfer_send(Num, #link{output_handle = Handle} = Link,
10201063
links = Links#{Handle => book_link_transfer_send(Link)}}.
10211064

10221065
book_partial_transfer_received(#state{next_incoming_id = NID,
1023-
remote_outgoing_window = ROW} = State) ->
1024-
State#state{next_incoming_id = add(NID, 1),
1025-
remote_outgoing_window = ROW - 1}.
1066+
incoming_window = IW,
1067+
remote_outgoing_window = ROW} = State0) ->
1068+
State = State0#state{next_incoming_id = add(NID, 1),
1069+
incoming_window = IW - 1,
1070+
remote_outgoing_window = ROW - 1},
1071+
maybe_widen_incoming_window(State).
10261072

10271073
book_transfer_received(State = #state{connection_config =
10281074
#{transfer_limit_margin := Margin}},
10291075
#link{link_credit = Margin} = Link) ->
10301076
{transfer_limit_exceeded, Link, State};
10311077
book_transfer_received(#state{next_incoming_id = NID,
1078+
incoming_window = IW,
10321079
remote_outgoing_window = ROW,
1033-
links = Links} = State,
1080+
links = Links} = State0,
10341081
#link{output_handle = OutHandle,
10351082
delivery_count = DC,
10361083
link_credit = LC,
@@ -1040,19 +1087,31 @@ book_transfer_received(#state{next_incoming_id = NID,
10401087
%% "the receiver MUST maintain a floor of zero in its
10411088
%% calculation of the value of available" [2.6.7]
10421089
available = max(0, Avail - 1)},
1043-
State1 = State#state{links = Links#{OutHandle => Link1},
1044-
next_incoming_id = add(NID, 1),
1045-
remote_outgoing_window = ROW - 1},
1090+
State1 = State0#state{links = Links#{OutHandle => Link1},
1091+
next_incoming_id = add(NID, 1),
1092+
incoming_window = IW - 1,
1093+
remote_outgoing_window = ROW - 1},
1094+
State = maybe_widen_incoming_window(State1),
10461095
case Link1 of
10471096
#link{link_credit = 0,
10481097
auto_flow = never} ->
1049-
{credit_exhausted, Link1, State1};
1098+
{credit_exhausted, Link1, State};
10501099
_ ->
1051-
{ok, Link1, State1}
1100+
{ok, Link1, State}
10521101
end.
10531102

1103+
maybe_widen_incoming_window(
1104+
State0 = #state{incoming_window = IncomingWindow,
1105+
auto_flow = {RenewWhenBelow, NewWindowSize}})
1106+
when IncomingWindow < RenewWhenBelow ->
1107+
State = State0#state{incoming_window = NewWindowSize},
1108+
send_flow_session(State),
1109+
State;
1110+
maybe_widen_incoming_window(State) ->
1111+
State.
1112+
10541113
auto_flow(#link{link_credit = LC,
1055-
auto_flow = {auto, RenewWhenBelow, Credit},
1114+
auto_flow = {RenewWhenBelow, Credit},
10561115
output_handle = OutHandle,
10571116
incoming_unsettled = Unsettled},
10581117
State)
@@ -1218,6 +1277,7 @@ format_status(Status = #{data := Data0}) ->
12181277
remote_channel = RemoteChannel,
12191278
next_incoming_id = NextIncomingId,
12201279
incoming_window = IncomingWindow,
1280+
auto_flow = SessionAutoFlow,
12211281
next_outgoing_id = NextOutgoingId,
12221282
remote_incoming_window = RemoteIncomingWindow,
12231283
remote_outgoing_window = RemoteOutgoingWindow,
@@ -1282,6 +1342,7 @@ format_status(Status = #{data := Data0}) ->
12821342
remote_channel => RemoteChannel,
12831343
next_incoming_id => NextIncomingId,
12841344
incoming_window => IncomingWindow,
1345+
auto_flow => SessionAutoFlow,
12851346
next_outgoing_id => NextOutgoingId,
12861347
remote_incoming_window => RemoteIncomingWindow,
12871348
remote_outgoing_window => RemoteOutgoingWindow,

0 commit comments

Comments
 (0)