Skip to content

Commit 3e708bc

Browse files
committed
Avoid persistent_term for credit config
Put credit configuration into session state to make functions pure. Although these credit configurations are not meant to be dynamically changed at runtime, prior to this commit it could happen that persistent_term:get/1 returns different results across invocations leading to bugs in how credit is granted and recorded.
1 parent 9f82e46 commit 3e708bc

File tree

1 file changed

+52
-42
lines changed

1 file changed

+52
-42
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 52 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
%% or by remote-incoming window (i.e. session flow control).
5151
-define(DEFAULT_MAX_QUEUE_CREDIT, 256).
5252
-define(DEFAULT_MAX_INCOMING_WINDOW, 400).
53-
-define(MAX_LINK_CREDIT, persistent_term:get(max_link_credit)).
5453
-define(MAX_MANAGEMENT_LINK_CREDIT, 8).
5554
-define(MANAGEMENT_NODE_ADDRESS, <<"/management">>).
5655
-define(UINT_OUTGOING_WINDOW, {uint, ?UINT_MAX}).
@@ -253,7 +252,9 @@
253252
resource_alarms :: sets:set(rabbit_alarm:resource_alarm_source()),
254253
trace_state :: rabbit_trace:state(),
255254
conn_name :: binary(),
256-
max_incoming_window :: pos_integer()
255+
max_incoming_window :: pos_integer(),
256+
max_link_credit :: pos_integer(),
257+
max_queue_credit :: pos_integer()
257258
}).
258259

259260
-record(state, {
@@ -386,8 +387,6 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
386387
true = is_valid_max(MaxLinkCredit),
387388
true = is_valid_max(MaxQueueCredit),
388389
true = is_valid_max(MaxIncomingWindow),
389-
ok = persistent_term:put(max_link_credit, MaxLinkCredit),
390-
ok = persistent_term:put(max_queue_credit, MaxQueueCredit),
391390
IncomingWindow = case sets:is_empty(Alarms) of
392391
true -> MaxIncomingWindow;
393392
false -> 0
@@ -420,7 +419,9 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
420419
resource_alarms = Alarms,
421420
trace_state = rabbit_trace:init(Vhost),
422421
conn_name = ConnName,
423-
max_incoming_window = MaxIncomingWindow
422+
max_incoming_window = MaxIncomingWindow,
423+
max_link_credit = MaxLinkCredit,
424+
max_queue_credit = MaxQueueCredit
424425
}}}.
425426

426427
terminate(_Reason, #state{incoming_links = IncomingLinks,
@@ -582,7 +583,8 @@ send_delivery_state_changes(#state{stashed_rejected = [],
582583
stashed_eol = []} = State) ->
583584
State;
584585
send_delivery_state_changes(State0 = #state{cfg = #cfg{writer_pid = Writer,
585-
channel_num = ChannelNum}}) ->
586+
channel_num = ChannelNum,
587+
max_link_credit = MaxLinkCredit}}) ->
586588
%% Order is important:
587589
%% 1. Process queue rejections.
588590
{RejectedIds, GrantCredits0, State1} = handle_stashed_rejected(State0),
@@ -603,15 +605,16 @@ send_delivery_state_changes(State0 = #state{cfg = #cfg{writer_pid = Writer,
603605
rabbit_amqp_writer:send_command(Writer, ChannelNum, Frame)
604606
end, DetachFrames),
605607
maps:foreach(fun(HandleInt, DeliveryCount) ->
606-
F0 = flow(?UINT(HandleInt), DeliveryCount),
608+
F0 = flow(?UINT(HandleInt), DeliveryCount, MaxLinkCredit),
607609
F = session_flow_fields(F0, State),
608610
rabbit_amqp_writer:send_command(Writer, ChannelNum, F)
609611
end, GrantCredits),
610612
State.
611613

612614
handle_stashed_rejected(#state{stashed_rejected = []} = State) ->
613615
{[], #{}, State};
614-
handle_stashed_rejected(#state{stashed_rejected = Actions,
616+
handle_stashed_rejected(#state{cfg = #cfg{max_link_credit = MaxLinkCredit},
617+
stashed_rejected = Actions,
615618
incoming_links = Links} = State0) ->
616619
{Ids, GrantCredits, Ls} =
617620
lists:foldl(
@@ -628,7 +631,8 @@ handle_stashed_rejected(#state{stashed_rejected = Actions,
628631
end,
629632
Link1 = Link0#incoming_link{incoming_unconfirmed_map = U},
630633
{Link, GrantCreds} = maybe_grant_link_credit(
631-
HandleInt, Link1, GrantCreds0),
634+
MaxLinkCredit, HandleInt,
635+
Link1, GrantCreds0),
632636
{Ids1, GrantCreds, maps:update(HandleInt, Link, Links0)};
633637
error ->
634638
Acc
@@ -645,7 +649,8 @@ handle_stashed_rejected(#state{stashed_rejected = Actions,
645649

646650
handle_stashed_settled(GrantCredits, #state{stashed_settled = []} = State) ->
647651
{[], GrantCredits, State};
648-
handle_stashed_settled(GrantCredits0, #state{stashed_settled = Actions,
652+
handle_stashed_settled(GrantCredits0, #state{cfg = #cfg{max_link_credit = MaxLinkCredit},
653+
stashed_settled = Actions,
649654
incoming_links = Links} = State0) ->
650655
{Ids, GrantCredits, Ls} =
651656
lists:foldl(
@@ -674,7 +679,8 @@ handle_stashed_settled(GrantCredits0, #state{stashed_settled = Actions,
674679
end,
675680
Link1 = Link0#incoming_link{incoming_unconfirmed_map = U},
676681
{Link, GrantCreds} = maybe_grant_link_credit(
677-
HandleInt, Link1, GrantCreds0),
682+
MaxLinkCredit, HandleInt,
683+
Link1, GrantCreds0),
678684
{Ids2, GrantCreds, maps:update(HandleInt, Link, Links0)};
679685
_ ->
680686
Acc
@@ -714,11 +720,14 @@ handle_stashed_down(#state{stashed_down = QNames,
714720

715721
handle_stashed_eol(DetachFrames, GrantCredits, #state{stashed_eol = []} = State) ->
716722
{[], [], DetachFrames, GrantCredits, State};
717-
handle_stashed_eol(DetachFrames0, GrantCredits0, #state{stashed_eol = Eols} = State0) ->
723+
handle_stashed_eol(DetachFrames0, GrantCredits0, #state{cfg = #cfg{max_link_credit = MaxLinkCredit},
724+
stashed_eol = Eols} = State0) ->
718725
{ReleasedIs, AcceptedIds, DetachFrames, GrantCredits, State1} =
719726
lists:foldl(fun(QName, {RIds0, AIds0, DetachFrames1, GrantCreds0, S0 = #state{incoming_links = Links0,
720727
queue_states = QStates0}}) ->
721-
{RIds, AIds, GrantCreds1, Links} = settle_eol(QName, {RIds0, AIds0, GrantCreds0, Links0}),
728+
{RIds, AIds, GrantCreds1, Links} = settle_eol(
729+
QName, MaxLinkCredit,
730+
{RIds0, AIds0, GrantCreds0, Links0}),
722731
QStates = rabbit_queue_type:remove(QName, QStates0),
723732
S1 = S0#state{incoming_links = Links,
724733
queue_states = QStates},
@@ -729,14 +738,14 @@ handle_stashed_eol(DetachFrames0, GrantCredits0, #state{stashed_eol = Eols} = St
729738
State = State1#state{stashed_eol = []},
730739
{ReleasedIs, AcceptedIds, DetachFrames, GrantCredits, State}.
731740

732-
settle_eol(QName, {_ReleasedIds, _AcceptedIds, _GrantCredits, Links} = Acc) ->
741+
settle_eol(QName, MaxLinkCredit, {_ReleasedIds, _AcceptedIds, _GrantCredits, Links} = Acc) ->
733742
maps:fold(fun(HandleInt,
734743
#incoming_link{incoming_unconfirmed_map = U0} = Link0,
735744
{RelIds0, AcceptIds0, GrantCreds0, Links0}) ->
736745
{RelIds, AcceptIds, U} = settle_eol0(QName, {RelIds0, AcceptIds0, U0}),
737746
Link1 = Link0#incoming_link{incoming_unconfirmed_map = U},
738747
{Link, GrantCreds} = maybe_grant_link_credit(
739-
HandleInt, Link1, GrantCreds0),
748+
MaxLinkCredit, HandleInt, Link1, GrantCreds0),
740749
Links1 = maps:update(HandleInt,
741750
Link,
742751
Links0),
@@ -984,7 +993,8 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
984993
} = Attach,
985994
State0 = #state{incoming_links = IncomingLinks0,
986995
permission_cache = PermCache0,
987-
cfg = #cfg{vhost = Vhost,
996+
cfg = #cfg{max_link_credit = MaxLinkCredit,
997+
vhost = Vhost,
988998
user = User}}) ->
989999
ok = validate_attach(Attach),
9901000
case ensure_target(Target, Vhost, User, PermCache0) of
@@ -994,7 +1004,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
9941004
routing_key = RoutingKey,
9951005
queue_name_bin = QNameBin,
9961006
delivery_count = DeliveryCountInt,
997-
credit = ?MAX_LINK_CREDIT},
1007+
credit = MaxLinkCredit},
9981008
_Outcomes = outcomes(Source),
9991009
Reply = #'v1_0.attach'{
10001010
name = LinkName,
@@ -1008,7 +1018,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
10081018
max_message_size = {ulong, persistent_term:get(max_message_size)}},
10091019
Flow = #'v1_0.flow'{handle = Handle,
10101020
delivery_count = DeliveryCount,
1011-
link_credit = ?UINT(?MAX_LINK_CREDIT)},
1021+
link_credit = ?UINT(MaxLinkCredit)},
10121022
%%TODO check that handle is not in use for any other open links.
10131023
%%"The handle MUST NOT be used for other open links. An attempt to attach
10141024
%% using a handle which is already associated with a link MUST be responded to
@@ -1458,7 +1468,7 @@ handle_credit_reply0(
14581468
CCredit > 0 ->
14591469
QName = Link0#outgoing_link.queue_name,
14601470
%% Provide queue next batch of credits.
1461-
CappedCredit = cap_credit(CCredit),
1471+
CappedCredit = cap_credit(CCredit, S0#state.cfg#cfg.max_queue_credit),
14621472
{ok, QStates, Actions} =
14631473
rabbit_queue_type:credit(
14641474
QName, Ctag, DeliveryCount, CappedCredit, false, QStates0),
@@ -1496,7 +1506,8 @@ handle_credit_reply0(
14961506
} = QFC,
14971507
stashed_credit_req = StashedCreditReq},
14981508
S0 = #state{cfg = #cfg{writer_pid = Writer,
1499-
channel_num = ChanNum},
1509+
channel_num = ChanNum,
1510+
max_queue_credit = MaxQueueCredit},
15001511
outgoing_links = OutgoingLinks,
15011512
queue_states = QStates0}) ->
15021513
%% If the queue sent us a drain credit_reply,
@@ -1512,7 +1523,7 @@ handle_credit_reply0(
15121523
%% the current drain credit top-up rounds over a stashed credit request because
15131524
%% this is easier to reason about and the queue will reply promptly meaning
15141525
%% the stashed request will be processed soon enough.
1515-
CappedCredit = cap_credit(CCredit),
1526+
CappedCredit = cap_credit(CCredit, MaxQueueCredit),
15161527
{ok, QStates, Actions} = rabbit_queue_type:credit(
15171528
QName, Ctag, DeliveryCount,
15181529
CappedCredit, true, QStates0),
@@ -1578,11 +1589,12 @@ pop_credit_req(
15781589
drain = Drain,
15791590
echo = Echo
15801591
}},
1581-
S0 = #state{outgoing_links = OutgoingLinks,
1592+
S0 = #state{cfg = #cfg{max_queue_credit = MaxQueueCredit},
1593+
outgoing_links = OutgoingLinks,
15821594
queue_states = QStates0}) ->
15831595
LinkCreditSnd = amqp10_util:link_credit_snd(
15841596
DeliveryCountRcv, LinkCreditRcv, CDeliveryCount),
1585-
CappedCredit = cap_credit(LinkCreditSnd),
1597+
CappedCredit = cap_credit(LinkCreditSnd, MaxQueueCredit),
15861598
{ok, QStates, Actions} = rabbit_queue_type:credit(
15871599
QName, Ctag, QDeliveryCount,
15881600
CappedCredit, Drain, QStates0),
@@ -1753,7 +1765,8 @@ sent_pending_delivery(
17531765
%% assertion
17541766
none = Link0#outgoing_link.stashed_credit_req,
17551767
%% Provide queue next batch of credits.
1756-
CappedCredit = cap_credit(CCredit),
1768+
CappedCredit = cap_credit(CCredit,
1769+
S0#state.cfg#cfg.max_queue_credit),
17571770
{ok, QStates1, Actions0} =
17581771
rabbit_queue_type:credit(
17591772
QName, Ctag, QDeliveryCount, CappedCredit,
@@ -1891,11 +1904,6 @@ settle_op_from_outcome(Outcome) ->
18911904
"Unrecognised state: ~tp in DISPOSITION",
18921905
[Outcome]).
18931906

1894-
-spec flow({uint, link_handle()}, sequence_no()) ->
1895-
#'v1_0.flow'{}.
1896-
flow(Handle, DeliveryCount) ->
1897-
flow(Handle, DeliveryCount, ?MAX_LINK_CREDIT).
1898-
18991907
-spec flow({uint, link_handle()}, sequence_no(), rabbit_queue_type:credit()) ->
19001908
#'v1_0.flow'{}.
19011909
flow(Handle, DeliveryCount, LinkCredit) ->
@@ -2281,7 +2289,8 @@ incoming_link_transfer(
22812289
vhost = Vhost,
22822290
trace_state = Trace,
22832291
conn_name = ConnName,
2284-
channel_num = ChannelNum}}) ->
2292+
channel_num = ChannelNum,
2293+
max_link_credit = MaxLinkCredit}}) ->
22852294

22862295
{PayloadBin, DeliveryId, Settled} =
22872296
case MultiTransfer of
@@ -2326,7 +2335,8 @@ incoming_link_transfer(
23262335
DeliveryCount = add(DeliveryCount0, 1),
23272336
Credit1 = Credit0 - 1,
23282337
{Credit, Reply1} = maybe_grant_link_credit(
2329-
Credit1, DeliveryCount, map_size(U), Handle),
2338+
Credit1, MaxLinkCredit,
2339+
DeliveryCount, map_size(U), Handle),
23302340
Reply = Reply0 ++ Reply1,
23312341
Link = Link0#incoming_link{
23322342
delivery_count = DeliveryCount,
@@ -2420,30 +2430,30 @@ released(DeliveryId) ->
24202430
settled = true,
24212431
state = #'v1_0.released'{}}.
24222432

2423-
maybe_grant_link_credit(Credit, DeliveryCount, NumUnconfirmed, Handle) ->
2424-
case grant_link_credit(Credit, NumUnconfirmed) of
2433+
maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed, Handle) ->
2434+
case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) of
24252435
true ->
2426-
{?MAX_LINK_CREDIT, [flow(Handle, DeliveryCount)]};
2436+
{MaxLinkCredit, [flow(Handle, DeliveryCount, MaxLinkCredit)]};
24272437
false ->
24282438
{Credit, []}
24292439
end.
24302440

24312441
maybe_grant_link_credit(
2442+
MaxLinkCredit,
24322443
HandleInt,
24332444
Link = #incoming_link{credit = Credit,
24342445
incoming_unconfirmed_map = U,
24352446
delivery_count = DeliveryCount},
24362447
AccMap) ->
2437-
case grant_link_credit(Credit, map_size(U)) of
2448+
case grant_link_credit(Credit, MaxLinkCredit, map_size(U)) of
24382449
true ->
2439-
{Link#incoming_link{credit = ?MAX_LINK_CREDIT},
2450+
{Link#incoming_link{credit = MaxLinkCredit},
24402451
AccMap#{HandleInt => DeliveryCount}};
24412452
false ->
24422453
{Link, AccMap}
24432454
end.
24442455

2445-
grant_link_credit(Credit, NumUnconfirmed) ->
2446-
MaxLinkCredit = ?MAX_LINK_CREDIT,
2456+
grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) ->
24472457
Credit =< MaxLinkCredit div 2 andalso
24482458
NumUnconfirmed < MaxLinkCredit.
24492459

@@ -2739,7 +2749,8 @@ handle_outgoing_link_flow_control(
27392749
DeliveryCountRcv,
27402750
LinkCreditRcv,
27412751
CFC#client_flow_ctl.delivery_count),
2742-
CappedCredit = cap_credit(LinkCreditSnd),
2752+
CappedCredit = cap_credit(LinkCreditSnd,
2753+
State0#state.cfg#cfg.max_queue_credit),
27432754
Link = Link0#outgoing_link{
27442755
client_flow_ctl = CFC#client_flow_ctl{
27452756
credit = LinkCreditSnd,
@@ -3444,10 +3455,9 @@ is_valid_max(Val) ->
34443455
pg_scope() ->
34453456
rabbit:pg_local_scope(amqp_session).
34463457

3447-
-spec cap_credit(rabbit_queue_type:credit()) ->
3458+
-spec cap_credit(rabbit_queue_type:credit(), pos_integer()) ->
34483459
rabbit_queue_type:credit().
3449-
cap_credit(DesiredCredit) ->
3450-
MaxCredit = persistent_term:get(max_queue_credit),
3460+
cap_credit(DesiredCredit, MaxCredit) ->
34513461
min(DesiredCredit, MaxCredit).
34523462

34533463
ensure_mc_cluster_compat(Mc) ->

0 commit comments

Comments
 (0)