Skip to content

Make AMQP flow control configurable #11861

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 82 additions & 55 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,47 +30,49 @@
}}
}).

-define(PROTOCOL, amqp10).
-define(HIBERNATE_AFTER, 6_000).
-define(CREDIT_REPLY_TIMEOUT, 30_000).
%% This is the link credit that we grant to sending clients.
%% We are free to choose whatever we want, sending clients must obey.
%% Default soft limits / credits in deps/rabbit/Makefile are:
%% 32 for quorum queues
%% 256 for streams
%% 400 for classic queues
%% If link target is a queue (rather than an exchange), we could use one of these depending
%% on target queue type. For the time being just use a static value that's something in between.
%% In future, we could dynamically grow (or shrink) the link credit we grant depending on how fast
%% target queue(s) actually confirm messages: see paper "Credit-Based Flow Control for ATM Networks"
%% from 1995, section 4.2 "Static vs. adaptive credit control" for pros and cons.
-define(DEFAULT_MAX_LINK_CREDIT, 128).
%% Initial and maximum link credit that we grant to a sending queue.
%% Only when we sent sufficient messages to the writer proc, we will again grant
%% credits to the sending queue. We have this limit in place to ensure that our
%% session proc won't be flooded with messages by the sending queue, especially
%% if we are throttled sending messages to the client either by the writer proc
%% or by remote-incoming window (i.e. session flow control).
-define(DEFAULT_MAX_QUEUE_CREDIT, 256).
-define(DEFAULT_MAX_INCOMING_WINDOW, 400).
-define(MAX_LINK_CREDIT, persistent_term:get(max_link_credit)).
-define(MAX_MANAGEMENT_LINK_CREDIT, 8).
-define(MANAGEMENT_NODE_ADDRESS, <<"/management">>).
-define(UINT_OUTGOING_WINDOW, {uint, ?UINT_MAX}).
-define(MAX_INCOMING_WINDOW, 400).
%% "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
-define(INITIAL_OUTGOING_TRANSFER_ID, ?UINT_MAX - 3).
%% "Note that, despite its name, the delivery-count is not a count but a
%% sequence number initialized at an arbitrary point by the sender." [2.6.7]
-define(INITIAL_DELIVERY_COUNT, ?UINT_MAX - 4).
-define(INITIAL_OUTGOING_DELIVERY_ID, 0).
-define(DEFAULT_MAX_HANDLE, ?UINT_MAX).
-define(UINT(N), {uint, N}).
%% [3.4]
-define(OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED,
?V_1_0_SYMBOL_REJECTED,
?V_1_0_SYMBOL_RELEASED,
?V_1_0_SYMBOL_MODIFIED]).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(PROCESS_GROUP_NAME, amqp_sessions).
-define(UINT(N), {uint, N}).
%% This is the link credit that we grant to sending clients.
%% We are free to choose whatever we want, sending clients must obey.
%% Default soft limits / credits in deps/rabbit/Makefile are:
%% 32 for quorum queues
%% 256 for streams
%% 400 for classic queues
%% If link target is a queue (rather than an exchange), we could use one of these depending
%% on target queue type. For the time being just use a static value that's something in between.
%% In future, we could dynamically grow (or shrink) the link credit we grant depending on how fast
%% target queue(s) actually confirm messages: see paper "Credit-Based Flow Control for ATM Networks"
%% from 1995, section 4.2 "Static vs. adaptive credit control" for pros and cons.
-define(LINK_CREDIT_RCV, 128).
-define(MANAGEMENT_LINK_CREDIT_RCV, 8).
-define(MANAGEMENT_NODE_ADDRESS, <<"/management">>).
-define(DEFAULT_EXCHANGE_NAME, <<>>).
%% This is the maximum credit we grant to a sending queue.
%% Only when we sent sufficient messages to the writer proc, we will again grant credits
%% to the sending queue. We have this limit in place to ensure that our session proc won't be flooded
%% with messages by the sending queue, especially if we are throttled sending messages to the client
%% either by the writer proc or by remote-incoming window (i.e. session flow control).
-define(LINK_CREDIT_RCV_FROM_QUEUE_MAX, 256).
-define(PROTOCOL, amqp10).
-define(PROCESS_GROUP_NAME, amqp_sessions).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(HIBERNATE_AFTER, 6_000).
-define(CREDIT_REPLY_TIMEOUT, 30_000).

-export([start_link/8,
process_frame/2,
Expand Down Expand Up @@ -172,9 +174,9 @@
-record(queue_flow_ctl, {
delivery_count :: sequence_no(),
%% We cap the actual credit we grant to the sending queue.
%% If client_flow_ctl.credit is larger than LINK_CREDIT_RCV_FROM_QUEUE_MAX,
%% If client_flow_ctl.credit is larger than max_queue_credit,
%% we will top up in batches to the sending queue.
credit :: 0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX,
credit :: rabbit_queue_type:credit(),
drain :: boolean()
}).

Expand Down Expand Up @@ -251,7 +253,8 @@
incoming_window_margin = 0 :: non_neg_integer(),
resource_alarms :: sets:set(rabbit_alarm:resource_alarm_source()),
trace_state :: rabbit_trace:state(),
conn_name :: binary()
conn_name :: binary(),
max_incoming_window :: pos_integer()
}).

-record(state, {
Expand Down Expand Up @@ -375,11 +378,22 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
Alarms = sets:from_list(Alarms0, [{version, 2}]),

NextOutgoingId = ?INITIAL_OUTGOING_TRANSFER_ID,
MaxLinkCredit = application:get_env(
rabbit, max_link_credit, ?DEFAULT_MAX_LINK_CREDIT),
MaxQueueCredit = application:get_env(
rabbit, max_queue_credit, ?DEFAULT_MAX_QUEUE_CREDIT),
MaxIncomingWindow = application:get_env(
rabbit, max_incoming_window, ?DEFAULT_MAX_INCOMING_WINDOW),
true = is_valid_max(MaxLinkCredit),
true = is_valid_max(MaxQueueCredit),
true = is_valid_max(MaxIncomingWindow),
ok = persistent_term:put(max_link_credit, MaxLinkCredit),
ok = persistent_term:put(max_queue_credit, MaxQueueCredit),
IncomingWindow = case sets:is_empty(Alarms) of
true -> ?MAX_INCOMING_WINDOW;
true -> MaxIncomingWindow;
false -> 0
end,
NextOutgoingId = ?INITIAL_OUTGOING_TRANSFER_ID,

HandleMax = case HandleMax0 of
?UINT(Max) -> Max;
Expand All @@ -406,7 +420,8 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
channel_num = ChannelNum,
resource_alarms = Alarms,
trace_state = rabbit_trace:init(Vhost),
conn_name = ConnName
conn_name = ConnName,
max_incoming_window = MaxIncomingWindow
}}}.

terminate(_Reason, #state{incoming_links = IncomingLinks,
Expand Down Expand Up @@ -491,7 +506,9 @@ handle_cast({conserve_resources, Alarm, Conserve},
cfg = #cfg{resource_alarms = Alarms0,
incoming_window_margin = Margin0,
writer_pid = WriterPid,
channel_num = Ch} = Cfg
channel_num = Ch,
max_incoming_window = MaxIncomingWindow
} = Cfg
} = State0) ->
Alarms = case Conserve of
true -> sets:add_element(Alarm, Alarms0);
Expand All @@ -504,11 +521,11 @@ handle_cast({conserve_resources, Alarm, Conserve},
%% Notify the client to not send us any more TRANSFERs. Since we decrase
%% our incoming window dynamically, there might be incoming in-flight
%% TRANSFERs. So, let's be lax and allow for some excess TRANSFERs.
{true, 0, ?MAX_INCOMING_WINDOW};
{true, 0, MaxIncomingWindow};
{false, true} ->
%% All alarms cleared.
%% Notify the client that it can resume sending us TRANSFERs.
{true, ?MAX_INCOMING_WINDOW, 0};
{true, MaxIncomingWindow, 0};
_ ->
{false, IncomingWindow0, Margin0}
end,
Expand Down Expand Up @@ -882,7 +899,7 @@ handle_control(#'v1_0.attach'{
MaxMessageSize = persistent_term:get(max_message_size),
Link = #management_link{name = LinkName,
delivery_count = DeliveryCountInt,
credit = ?MANAGEMENT_LINK_CREDIT_RCV,
credit = ?MAX_MANAGEMENT_LINK_CREDIT,
max_message_size = MaxMessageSize},
State = State0#state{management_link_pairs = Pairs,
incoming_management_links = maps:put(HandleInt, Link, Links)},
Expand All @@ -899,7 +916,7 @@ handle_control(#'v1_0.attach'{
properties = Properties},
Flow = #'v1_0.flow'{handle = Handle,
delivery_count = DeliveryCount,
link_credit = ?UINT(?MANAGEMENT_LINK_CREDIT_RCV)},
link_credit = ?UINT(?MAX_MANAGEMENT_LINK_CREDIT)},
reply0([Reply, Flow], State);

handle_control(#'v1_0.attach'{
Expand Down Expand Up @@ -978,7 +995,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
routing_key = RoutingKey,
queue_name_bin = QNameBin,
delivery_count = DeliveryCountInt,
credit = ?LINK_CREDIT_RCV},
credit = ?MAX_LINK_CREDIT},
_Outcomes = outcomes(Source),
Reply = #'v1_0.attach'{
name = LinkName,
Expand All @@ -992,7 +1009,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
max_message_size = {ulong, persistent_term:get(max_message_size)}},
Flow = #'v1_0.flow'{handle = Handle,
delivery_count = DeliveryCount,
link_credit = ?UINT(?LINK_CREDIT_RCV)},
link_credit = ?UINT(?MAX_LINK_CREDIT)},
%%TODO check that handle is not in use for any other open links.
%%"The handle MUST NOT be used for other open links. An attempt to attach
%% using a handle which is already associated with a link MUST be responded to
Expand Down Expand Up @@ -1790,7 +1807,8 @@ session_flow_control_received_transfer(
incoming_window = InWindow0,
remote_outgoing_window = RemoteOutgoingWindow,
cfg = #cfg{incoming_window_margin = Margin,
resource_alarms = Alarms}
resource_alarms = Alarms,
max_incoming_window = MaxIncomingWindow}
} = State) ->
InWindow1 = InWindow0 - 1,
case InWindow1 < -Margin of
Expand All @@ -1802,12 +1820,12 @@ session_flow_control_received_transfer(
false ->
ok
end,
{Flows, InWindow} = case InWindow1 =< (?MAX_INCOMING_WINDOW div 2) andalso
{Flows, InWindow} = case InWindow1 =< (MaxIncomingWindow div 2) andalso
sets:is_empty(Alarms) of
true ->
%% We've reached halfway and there are no
%% disk or memory alarm, open the window.
{[#'v1_0.flow'{}], ?MAX_INCOMING_WINDOW};
{[#'v1_0.flow'{}], MaxIncomingWindow};
false ->
{[], InWindow1}
end,
Expand Down Expand Up @@ -1864,11 +1882,13 @@ settle_op_from_outcome(Outcome) ->
"Unrecognised state: ~tp in DISPOSITION",
[Outcome]).

-spec flow({uint, link_handle()}, sequence_no()) -> #'v1_0.flow'{}.
-spec flow({uint, link_handle()}, sequence_no()) ->
#'v1_0.flow'{}.
flow(Handle, DeliveryCount) ->
flow(Handle, DeliveryCount, ?LINK_CREDIT_RCV).
flow(Handle, DeliveryCount, ?MAX_LINK_CREDIT).

-spec flow({uint, link_handle()}, sequence_no(), non_neg_integer()) -> #'v1_0.flow'{}.
-spec flow({uint, link_handle()}, sequence_no(), rabbit_queue_type:credit()) ->
#'v1_0.flow'{}.
flow(Handle, DeliveryCount, LinkCredit) ->
#'v1_0.flow'{handle = Handle,
delivery_count = ?UINT(DeliveryCount),
Expand Down Expand Up @@ -2394,7 +2414,7 @@ released(DeliveryId) ->
maybe_grant_link_credit(Credit, DeliveryCount, NumUnconfirmed, Handle) ->
case grant_link_credit(Credit, NumUnconfirmed) of
true ->
{?LINK_CREDIT_RCV, [flow(Handle, DeliveryCount)]};
{?MAX_LINK_CREDIT, [flow(Handle, DeliveryCount)]};
false ->
{Credit, []}
end.
Expand All @@ -2407,20 +2427,21 @@ maybe_grant_link_credit(
AccMap) ->
case grant_link_credit(Credit, map_size(U)) of
true ->
{Link#incoming_link{credit = ?LINK_CREDIT_RCV},
{Link#incoming_link{credit = ?MAX_LINK_CREDIT},
AccMap#{HandleInt => DeliveryCount}};
false ->
{Link, AccMap}
end.

grant_link_credit(Credit, NumUnconfirmed) ->
Credit =< ?LINK_CREDIT_RCV / 2 andalso
NumUnconfirmed < ?LINK_CREDIT_RCV.
MaxLinkCredit = ?MAX_LINK_CREDIT,
Credit =< MaxLinkCredit div 2 andalso
NumUnconfirmed < MaxLinkCredit.

maybe_grant_mgmt_link_credit(Credit, DeliveryCount, Handle)
when Credit =< ?MANAGEMENT_LINK_CREDIT_RCV / 2 ->
{?MANAGEMENT_LINK_CREDIT_RCV,
[flow(Handle, DeliveryCount, ?MANAGEMENT_LINK_CREDIT_RCV)]};
when Credit =< ?MAX_MANAGEMENT_LINK_CREDIT div 2 ->
{?MAX_MANAGEMENT_LINK_CREDIT,
[flow(Handle, DeliveryCount, ?MAX_MANAGEMENT_LINK_CREDIT)]};
maybe_grant_mgmt_link_credit(Credit, _, _) ->
{Credit, []}.

Expand Down Expand Up @@ -3406,10 +3427,16 @@ error_not_found(Resource) ->
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, Description}}.

is_valid_max(Val) ->
is_integer(Val) andalso
Val > 0 andalso
Val =< ?UINT_MAX.

-spec cap_credit(rabbit_queue_type:credit()) ->
0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX.
rabbit_queue_type:credit().
cap_credit(DesiredCredit) ->
min(DesiredCredit, ?LINK_CREDIT_RCV_FROM_QUEUE_MAX).
MaxCredit = persistent_term:get(max_queue_credit),
min(DesiredCredit, MaxCredit).

ensure_mc_cluster_compat(Mc) ->
IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1),
Expand Down
Loading