Skip to content

Commit f54fa7f

Browse files
committed
Maintain correct order of outgoing TRANSFER and FLOW
1 parent 2d205a6 commit f54fa7f

File tree

5 files changed

+205
-148
lines changed

5 files changed

+205
-148
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ transfer(Session, Amqp10Msg, Timeout) ->
198198
gen_statem:call(Session, {transfer, Transfer, Records}, Timeout).
199199

200200
flow(Session, Handle, Flow, RenewAfter) ->
201-
gen_statem:cast(Session, {flow, Handle, Flow, RenewAfter}).
201+
gen_statem:cast(Session, {flow_link, Handle, Flow, RenewAfter}).
202202

203203
-spec disposition(pid(), link_role(), delivery_number(), delivery_number(), boolean(),
204204
amqp10_client_types:delivery_state()) -> ok.
@@ -272,9 +272,18 @@ mapped(cast, 'end', State) ->
272272
%% We send the first end frame and wait for the reply.
273273
send_end(State),
274274
{next_state, end_sent, State};
275-
mapped(cast, {flow, OutHandle, Flow0, RenewAfter}, State0) ->
276-
State = send_flow(fun send/2, OutHandle, Flow0, RenewAfter, State0),
277-
{next_state, mapped, State};
275+
mapped(cast, {flow_link, OutHandle, Flow0, RenewAfter}, State0) ->
276+
State = send_flow_link(fun send/2, OutHandle, Flow0, RenewAfter, State0),
277+
{keep_state, State};
278+
mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, IncomingWindow}}},
279+
#state{next_incoming_id = NII,
280+
next_outgoing_id = NOI} = State) ->
281+
Flow = Flow0#'v1_0.flow'{
282+
next_incoming_id = maybe_uint(NII),
283+
next_outgoing_id = uint(NOI),
284+
outgoing_window = ?UINT_OUTGOING_WINDOW},
285+
ok = send(Flow, State),
286+
{keep_state, State#state{incoming_window = IncomingWindow}};
278287
mapped(cast, #'v1_0.end'{error = Err}, State) ->
279288
%% We receive the first end frame, reply and terminate.
280289
_ = send_end(State),
@@ -313,21 +322,21 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name},
313322
State = State0#state{links = Links#{OutHandle => Link},
314323
link_index = maps:remove(Name, LinkIndex),
315324
link_handle_index = LHI#{InHandle => OutHandle}},
316-
{next_state, mapped, State};
325+
{keep_state, State};
317326
mapped(cast, #'v1_0.detach'{handle = {uint, InHandle},
318327
error = Err},
319328
#state{links = Links, link_handle_index = LHI} = State0) ->
320329
with_link(InHandle, State0,
321330
fun (#link{output_handle = OutHandle} = Link, State) ->
322331
Reason = reason(Err),
323332
ok = notify_link_detached(Link, Reason),
324-
{next_state, mapped,
333+
{keep_state,
325334
State#state{links = maps:remove(OutHandle, Links),
326335
link_handle_index = maps:remove(InHandle, LHI)}}
327336
end);
328337
mapped(cast, #'v1_0.flow'{handle = undefined} = Flow, State0) ->
329338
State = handle_session_flow(Flow, State0),
330-
{next_state, mapped, State};
339+
{keep_state, State};
331340
mapped(cast, #'v1_0.flow'{handle = {uint, InHandle}} = Flow,
332341
#state{links = Links} = State0) ->
333342

@@ -341,7 +350,7 @@ mapped(cast, #'v1_0.flow'{handle = {uint, InHandle}} = Flow,
341350
ok = maybe_notify_link_credit(Link0, Link),
342351
Links1 = Links#{OutHandle => Link},
343352
State1 = State#state{links = Links1},
344-
{next_state, mapped, State1};
353+
{keep_state, State1};
345354
mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle},
346355
more = true} = Transfer, Payload},
347356
#state{links = Links} = State0) ->
@@ -353,7 +362,7 @@ mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle},
353362

354363
State = book_partial_transfer_received(
355364
State0#state{links = Links#{OutHandle => Link1}}),
356-
{next_state, mapped, State};
365+
{keep_state, State};
357366
mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle},
358367
delivery_id = MaybeDeliveryId,
359368
settled = Settled} = Transfer0, Payload0},
@@ -385,16 +394,16 @@ mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle},
385394
% deliver
386395
TargetPid ! {amqp10_msg, LinkRef, Msg},
387396
State1 = auto_flow(Link2, State),
388-
{next_state, mapped, State1};
397+
{keep_state, State1};
389398
{credit_exhausted, Link2, State} ->
390399
TargetPid ! {amqp10_msg, LinkRef, Msg},
391400
notify_credit_exhausted(Link2),
392-
{next_state, mapped, State};
401+
{keep_state, State};
393402
{transfer_limit_exceeded, Link2, State} ->
394403
logger:warning("transfer_limit_exceeded for link ~tp", [Link2]),
395404
Link = detach_with_error_cond(Link2, State,
396405
?V_1_0_LINK_ERROR_TRANSFER_LIMIT_EXCEEDED),
397-
{next_state, mapped, update_link(Link, State)}
406+
{keep_state, update_link(Link, State)}
398407
end;
399408

400409

@@ -427,11 +436,11 @@ mapped(cast, #'v1_0.disposition'{role = true,
427436
end
428437
end, Unsettled0, First, Last),
429438

430-
{next_state, mapped, State#state{outgoing_unsettled = Unsettled}};
439+
{keep_state, State#state{outgoing_unsettled = Unsettled}};
431440
mapped(cast, Frame, State) ->
432441
logger:warning("Unhandled session frame ~tp in state ~tp",
433442
[Frame, State]),
434-
{next_state, mapped, State};
443+
{keep_state, State};
435444
mapped({call, From},
436445
{transfer, _Transfer, _Parts},
437446
#state{remote_incoming_window = Window})
@@ -518,9 +527,9 @@ end_sent(_EvtType, #'v1_0.end'{error = Err}, State) ->
518527
Reason = reason(Err),
519528
ok = notify_session_ended(State, Reason),
520529
{stop, normal, State};
521-
end_sent(_EvtType, _Frame, State) ->
530+
end_sent(_EvtType, _Frame, _State) ->
522531
% just drop frames here
523-
{next_state, end_sent, State}.
532+
keep_state_and_data.
524533

525534
terminate(Reason, _StateName, #state{channel = Channel,
526535
remote_channel = RemoteChannel,
@@ -586,12 +595,12 @@ send_transfer(Transfer0, Parts0, MaxMessageSize, #state{socket = Socket,
586595
{ok, length(Frames)}
587596
end.
588597

589-
send_flow(Send, OutHandle,
590-
#'v1_0.flow'{link_credit = {uint, Credit}} = Flow0, RenewAfter,
591-
#state{links = Links,
592-
next_incoming_id = NII,
593-
next_outgoing_id = NOI,
594-
incoming_window = InWin} = State) ->
598+
send_flow_link(Send, OutHandle,
599+
#'v1_0.flow'{link_credit = {uint, Credit}} = Flow0, RenewAfter,
600+
#state{links = Links,
601+
next_incoming_id = NII,
602+
next_outgoing_id = NOI,
603+
incoming_window = InWin} = State) ->
595604
AutoFlow = case RenewAfter of
596605
never -> never;
597606
Limit -> {auto, Limit, Credit}
@@ -602,7 +611,6 @@ send_flow(Send, OutHandle,
602611
available = Available} = Link} = Links,
603612
Flow = Flow0#'v1_0.flow'{
604613
handle = uint(H),
605-
link_credit = uint(Credit),
606614
%% "This value MUST be set if the peer has received the begin
607615
%% frame for the session, and MUST NOT be set if it has not." [2.7.4]
608616
next_incoming_id = maybe_uint(NII),
@@ -978,9 +986,9 @@ auto_flow(#link{link_credit = LC,
978986
auto_flow = {auto, Limit, Credit},
979987
output_handle = OutHandle}, State)
980988
when LC < Limit ->
981-
send_flow(fun send/2, OutHandle,
982-
#'v1_0.flow'{link_credit = {uint, Credit}},
983-
Limit, State);
989+
send_flow_link(fun send/2, OutHandle,
990+
#'v1_0.flow'{link_credit = {uint, Credit}},
991+
Limit, State);
984992
auto_flow(_, State) ->
985993
State.
986994

deps/rabbitmq_amqp1_0/include/rabbit_amqp1_0.hrl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
%%-define(debug, true).
22

33
-ifdef(debug).
4-
-define(DEBUG0(F), ?SAFE(io:format(F, []))).
5-
-define(DEBUG(F, A), ?SAFE(io:format(F, A))).
4+
-define(DEBUG0(F), ?SAFE(rabbit_log:debug(F, []))).
5+
-define(DEBUG(F, A), ?SAFE(rabbit_log:debug(F, A))).
66
-else.
77
-define(DEBUG0(F), ok).
88
-define(DEBUG(F, A), ok).
99
-endif.
1010

11-
-define(pprint(F), io:format("~p~n", [amqp10_framing:pprint(F)])).
11+
-define(pprint(F), rabbit_log:debug("~p~n",
12+
[amqp10_framing:pprint(F)])).
1213

1314
-define(SAFE(F),
1415
((fun() ->
1516
try F
1617
catch __T:__E:__ST ->
17-
io:format("~p:~p thrown debugging~n~p~n",
18-
[__T, __E, __ST])
18+
rabbit_log:debug("~p:~p thrown debugging~n~p~n",
19+
[__T, __E, __ST])
1920
end
2021
end)())).
2122

0 commit comments

Comments
 (0)