Skip to content

Commit 3539462

Browse files
ansdmergify[bot]
authored andcommitted
Auto widen session incoming-window in AMQP 1.0 client
This commit fixes a bug in the Erlang AMQP 1.0 client. Prior to this commit, to repro this bug: 1. Send more than 2^16 messages to a queue. 2. Grant more than a total of 2^16 link credit initially (on a single link or across multiple links) on a single session without any auto or manual link credit renewal. The expectation is that thanks to sufficiently granted initial link-credit, the client will receive all messages. However, consumption stops after exactly 2^16-1 messages. That's because the client lib was never sending a flow frame to the server. So, after the client received all 2^16-1 messages (the initial incoming-window set by the client), the server's remote-incoming-window reached 0 causing the server to stop delivering messages. The expectation is that the client lib automatically handles session flow control without any manual involvement of the client app. This commit implements this fix: * We keep the server's remote-incoming window always large by default as explained in https://www.rabbitmq.com/blog/2024/09/02/amqp-flow-control#incoming-window * Hence, the client lib sets its incoming-window to 100,000 initially. * The client lib tracks its incoming-window decrementing it by 1 for every transfer it received. (This wasn't done prior to this commit.) * Whenever this window shrinks below 50,000, the client sends a flow frame without any link information widening its incoming-window back to 100,000. * For test cases (maybe later for apps as well), there is a new function `amqp10_client_session:flow/3`, which allows for a test case to do manual session flow control. Its API is designed very similar to `amqp10_client_session:flow_link/4` in that the test can optionally request the lib to auto widen the session window whenever it falls below a certain threshold. (cherry picked from commit 32854e8)
1 parent 1030139 commit 3539462

File tree

4 files changed

+223
-57
lines changed

4 files changed

+223
-57
lines changed

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ flow_link_credit(#link_ref{role = receiver, session = Session,
339339
RenewWhenBelow =< Credit) ->
340340
Flow = #'v1_0.flow'{link_credit = {uint, Credit},
341341
drain = Drain},
342-
ok = amqp10_client_session:flow(Session, Handle, Flow, RenewWhenBelow).
342+
ok = amqp10_client_session:flow_link(Session, Handle, Flow, RenewWhenBelow).
343343

344344
%% @doc Stop a receiving link.
345345
%% See AMQP 1.0 spec §2.6.10.
@@ -348,7 +348,7 @@ stop_receiver_link(#link_ref{role = receiver,
348348
link_handle = Handle}) ->
349349
Flow = #'v1_0.flow'{link_credit = {uint, 0},
350350
echo = true},
351-
ok = amqp10_client_session:flow(Session, Handle, Flow, never).
351+
ok = amqp10_client_session:flow_link(Session, Handle, Flow, never).
352352

353353
%%% messages
354354

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 90 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@
2020
attach/2,
2121
detach/2,
2222
transfer/3,
23-
flow/4,
24-
disposition/5
23+
disposition/5,
24+
flow_link/4
2525
]).
2626

27+
%% Manual session flow control is currently only used in tests.
28+
-export([flow/3]).
29+
2730
%% Private API
2831
-export([start_link/4,
2932
socket_ready/2
@@ -51,7 +54,8 @@
5154
[add/2,
5255
diff/2]).
5356

54-
-define(MAX_SESSION_WINDOW_SIZE, 65535).
57+
%% By default, we want to keep the server's remote-incoming-window large at all times.
58+
-define(DEFAULT_MAX_INCOMING_WINDOW, 100_000).
5559
-define(UINT_OUTGOING_WINDOW, {uint, ?UINT_MAX}).
5660
-define(INITIAL_OUTGOING_DELIVERY_ID, ?UINT_MAX).
5761
%% "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
@@ -129,7 +133,8 @@
129133
available = 0 :: non_neg_integer(),
130134
drain = false :: boolean(),
131135
partial_transfers :: undefined | {#'v1_0.transfer'{}, [binary()]},
132-
auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()},
136+
auto_flow :: never | {RenewWhenBelow :: pos_integer(),
137+
Credit :: pos_integer()},
133138
incoming_unsettled = #{} :: #{delivery_number() => ok},
134139
footer_opt :: footer_opt() | undefined
135140
}).
@@ -140,7 +145,10 @@
140145

141146
%% session flow control, see section 2.5.6
142147
next_incoming_id :: transfer_number() | undefined,
143-
incoming_window = ?MAX_SESSION_WINDOW_SIZE :: non_neg_integer(),
148+
%% Can become negative if the peer overshoots our window.
149+
incoming_window :: integer(),
150+
auto_flow :: never | {RenewWhenBelow :: pos_integer(),
151+
NewWindowSize :: pos_integer()},
144152
next_outgoing_id = ?INITIAL_OUTGOING_TRANSFER_ID :: transfer_number(),
145153
remote_incoming_window = 0 :: non_neg_integer(),
146154
remote_outgoing_window = 0 :: non_neg_integer(),
@@ -200,7 +208,17 @@ transfer(Session, Amqp10Msg, Timeout) ->
200208
[Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
201209
gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout).
202210

203-
flow(Session, Handle, Flow, RenewWhenBelow) ->
211+
-spec flow(pid(), non_neg_integer(), never | pos_integer()) -> ok.
212+
flow(Session, IncomingWindow, RenewWhenBelow) when
213+
%% Check that the RenewWhenBelow value make sense.
214+
RenewWhenBelow =:= never orelse
215+
is_integer(RenewWhenBelow) andalso
216+
RenewWhenBelow > 0 andalso
217+
RenewWhenBelow =< IncomingWindow ->
218+
gen_statem:cast(Session, {flow_session, IncomingWindow, RenewWhenBelow}).
219+
220+
-spec flow_link(pid(), link_handle(), #'v1_0.flow'{}, never | pos_integer()) -> ok.
221+
flow_link(Session, Handle, Flow, RenewWhenBelow) ->
204222
gen_statem:cast(Session, {flow_link, Handle, Flow, RenewWhenBelow}).
205223

206224
%% Sending a disposition on a sender link (with receiver-settle-mode = second)
@@ -239,6 +257,9 @@ init([FromPid, Channel, Reader, ConnConfig]) ->
239257
channel = Channel,
240258
reader = Reader,
241259
connection_config = ConnConfig,
260+
incoming_window = ?DEFAULT_MAX_INCOMING_WINDOW,
261+
auto_flow = {?DEFAULT_MAX_INCOMING_WINDOW div 2,
262+
?DEFAULT_MAX_INCOMING_WINDOW},
242263
early_attach_requests = []},
243264
{ok, unmapped, State}.
244265

@@ -282,15 +303,15 @@ mapped(cast, 'end', State) ->
282303
mapped(cast, {flow_link, OutHandle, Flow0, RenewWhenBelow}, State0) ->
283304
State = send_flow_link(OutHandle, Flow0, RenewWhenBelow, State0),
284305
{keep_state, State};
285-
mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, IncomingWindow}}},
286-
#state{next_incoming_id = NII,
287-
next_outgoing_id = NOI} = State) ->
288-
Flow = Flow0#'v1_0.flow'{
289-
next_incoming_id = maybe_uint(NII),
290-
next_outgoing_id = uint(NOI),
291-
outgoing_window = ?UINT_OUTGOING_WINDOW},
292-
ok = send(Flow, State),
293-
{keep_state, State#state{incoming_window = IncomingWindow}};
306+
mapped(cast, {flow_session, IncomingWindow, RenewWhenBelow}, State0) ->
307+
AutoFlow = case RenewWhenBelow of
308+
never -> never;
309+
_ -> {RenewWhenBelow, IncomingWindow}
310+
end,
311+
State = State0#state{incoming_window = IncomingWindow,
312+
auto_flow = AutoFlow},
313+
send_flow_session(State),
314+
{keep_state, State};
294315
mapped(cast, #'v1_0.end'{} = End, State) ->
295316
%% We receive the first end frame, reply and terminate.
296317
_ = send_end(State),
@@ -656,35 +677,44 @@ is_bare_message_section(_Section) ->
656677

657678
send_flow_link(OutHandle,
658679
#'v1_0.flow'{link_credit = {uint, Credit}} = Flow0, RenewWhenBelow,
659-
#state{links = Links,
660-
next_incoming_id = NII,
661-
next_outgoing_id = NOI,
662-
incoming_window = InWin} = State) ->
680+
#state{links = Links} = State) ->
663681
AutoFlow = case RenewWhenBelow of
664682
never -> never;
665-
Limit -> {auto, Limit, Credit}
683+
_ -> {RenewWhenBelow, Credit}
666684
end,
667685
#{OutHandle := #link{output_handle = H,
668686
role = receiver,
669687
delivery_count = DeliveryCount,
670688
available = Available} = Link} = Links,
671-
Flow = Flow0#'v1_0.flow'{
672-
handle = uint(H),
673-
%% "This value MUST be set if the peer has received the begin
674-
%% frame for the session, and MUST NOT be set if it has not." [2.7.4]
675-
next_incoming_id = maybe_uint(NII),
676-
next_outgoing_id = uint(NOI),
677-
outgoing_window = ?UINT_OUTGOING_WINDOW,
678-
incoming_window = uint(InWin),
679-
%% "In the event that the receiving link endpoint has not yet seen the
680-
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
681-
delivery_count = maybe_uint(DeliveryCount),
682-
available = uint(Available)},
689+
Flow1 = Flow0#'v1_0.flow'{
690+
handle = uint(H),
691+
%% "In the event that the receiving link endpoint has not yet seen the
692+
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
693+
delivery_count = maybe_uint(DeliveryCount),
694+
available = uint(Available)},
695+
Flow = set_flow_session_fields(Flow1, State),
683696
ok = send(Flow, State),
684697
State#state{links = Links#{OutHandle =>
685698
Link#link{link_credit = Credit,
686699
auto_flow = AutoFlow}}}.
687700

701+
send_flow_session(State) ->
702+
Flow = set_flow_session_fields(#'v1_0.flow'{}, State),
703+
ok = send(Flow, State).
704+
705+
set_flow_session_fields(Flow, #state{next_incoming_id = NID,
706+
incoming_window = IW,
707+
next_outgoing_id = NOI}) ->
708+
Flow#'v1_0.flow'{
709+
%% "This value MUST be set if the peer has received the begin
710+
%% frame for the session, and MUST NOT be set if it has not." [2.7.4]
711+
next_incoming_id = maybe_uint(NID),
712+
%% IncomingWindow0 can be negative when the sending server overshoots our window.
713+
%% We must set a floor of 0 in the FLOW frame because field incoming-window is an uint.
714+
incoming_window = uint(max(0, IW)),
715+
next_outgoing_id = uint(NOI),
716+
outgoing_window = ?UINT_OUTGOING_WINDOW}.
717+
688718
build_frames(Channel, Trf, Bin, MaxPayloadSize, Acc)
689719
when byte_size(Bin) =< MaxPayloadSize ->
690720
T = amqp10_framing:encode_bin(Trf#'v1_0.transfer'{more = false}),
@@ -1059,17 +1089,21 @@ book_transfer_send(Num, #link{output_handle = Handle} = Link,
10591089
links = Links#{Handle => book_link_transfer_send(Link)}}.
10601090

10611091
book_partial_transfer_received(#state{next_incoming_id = NID,
1062-
remote_outgoing_window = ROW} = State) ->
1063-
State#state{next_incoming_id = add(NID, 1),
1064-
remote_outgoing_window = ROW - 1}.
1092+
incoming_window = IW,
1093+
remote_outgoing_window = ROW} = State0) ->
1094+
State = State0#state{next_incoming_id = add(NID, 1),
1095+
incoming_window = IW - 1,
1096+
remote_outgoing_window = ROW - 1},
1097+
maybe_widen_incoming_window(State).
10651098

10661099
book_transfer_received(State = #state{connection_config =
10671100
#{transfer_limit_margin := Margin}},
10681101
#link{link_credit = Margin} = Link) ->
10691102
{transfer_limit_exceeded, Link, State};
10701103
book_transfer_received(#state{next_incoming_id = NID,
1104+
incoming_window = IW,
10711105
remote_outgoing_window = ROW,
1072-
links = Links} = State,
1106+
links = Links} = State0,
10731107
#link{output_handle = OutHandle,
10741108
delivery_count = DC,
10751109
link_credit = LC,
@@ -1079,19 +1113,31 @@ book_transfer_received(#state{next_incoming_id = NID,
10791113
%% "the receiver MUST maintain a floor of zero in its
10801114
%% calculation of the value of available" [2.6.7]
10811115
available = max(0, Avail - 1)},
1082-
State1 = State#state{links = Links#{OutHandle => Link1},
1083-
next_incoming_id = add(NID, 1),
1084-
remote_outgoing_window = ROW - 1},
1116+
State1 = State0#state{links = Links#{OutHandle => Link1},
1117+
next_incoming_id = add(NID, 1),
1118+
incoming_window = IW - 1,
1119+
remote_outgoing_window = ROW - 1},
1120+
State = maybe_widen_incoming_window(State1),
10851121
case Link1 of
10861122
#link{link_credit = 0,
10871123
auto_flow = never} ->
1088-
{credit_exhausted, Link1, State1};
1124+
{credit_exhausted, Link1, State};
10891125
_ ->
1090-
{ok, Link1, State1}
1126+
{ok, Link1, State}
10911127
end.
10921128

1129+
maybe_widen_incoming_window(
1130+
State0 = #state{incoming_window = IncomingWindow,
1131+
auto_flow = {RenewWhenBelow, NewWindowSize}})
1132+
when IncomingWindow < RenewWhenBelow ->
1133+
State = State0#state{incoming_window = NewWindowSize},
1134+
send_flow_session(State),
1135+
State;
1136+
maybe_widen_incoming_window(State) ->
1137+
State.
1138+
10931139
auto_flow(#link{link_credit = LC,
1094-
auto_flow = {auto, RenewWhenBelow, Credit},
1140+
auto_flow = {RenewWhenBelow, Credit},
10951141
output_handle = OutHandle,
10961142
incoming_unsettled = Unsettled},
10971143
State)
@@ -1230,6 +1276,7 @@ format_status(Status = #{data := Data0}) ->
12301276
remote_channel = RemoteChannel,
12311277
next_incoming_id = NextIncomingId,
12321278
incoming_window = IncomingWindow,
1279+
auto_flow = SessionAutoFlow,
12331280
next_outgoing_id = NextOutgoingId,
12341281
remote_incoming_window = RemoteIncomingWindow,
12351282
remote_outgoing_window = RemoteOutgoingWindow,
@@ -1294,6 +1341,7 @@ format_status(Status = #{data := Data0}) ->
12941341
remote_channel => RemoteChannel,
12951342
next_incoming_id => NextIncomingId,
12961343
incoming_window => IncomingWindow,
1344+
auto_flow => SessionAutoFlow,
12971345
next_outgoing_id => NextOutgoingId,
12981346
remote_incoming_window => RemoteIncomingWindow,
12991347
remote_outgoing_window => RemoteOutgoingWindow,

0 commit comments

Comments
 (0)