30
30
}}
31
31
}).
32
32
33
- -define (PROTOCOL , amqp10 ).
34
- -define (HIBERNATE_AFTER , 6_000 ).
35
- -define (CREDIT_REPLY_TIMEOUT , 30_000 ).
33
+ % % This is the link credit that we grant to sending clients.
34
+ % % We are free to choose whatever we want, sending clients must obey.
35
+ % % Default soft limits / credits in deps/rabbit/Makefile are:
36
+ % % 32 for quorum queues
37
+ % % 256 for streams
38
+ % % 400 for classic queues
39
+ % % If link target is a queue (rather than an exchange), we could use one of these depending
40
+ % % on target queue type. For the time being just use a static value that's something in between.
41
+ % % In future, we could dynamically grow (or shrink) the link credit we grant depending on how fast
42
+ % % target queue(s) actually confirm messages: see paper "Credit-Based Flow Control for ATM Networks"
43
+ % % from 1995, section 4.2 "Static vs. adaptive credit control" for pros and cons.
44
+ -define (DEFAULT_MAX_LINK_CREDIT , 128 ).
45
+ % % Initial and maximum link credit that we grant to a sending queue.
46
+ % % Only when we sent sufficient messages to the writer proc, we will again grant
47
+ % % credits to the sending queue. We have this limit in place to ensure that our
48
+ % % session proc won't be flooded with messages by the sending queue, especially
49
+ % % if we are throttled sending messages to the client either by the writer proc
50
+ % % or by remote-incoming window (i.e. session flow control).
51
+ -define (DEFAULT_MAX_QUEUE_CREDIT , 256 ).
52
+ -define (DEFAULT_MAX_INCOMING_WINDOW , 400 ).
53
+ -define (MAX_LINK_CREDIT , persistent_term :get (max_link_credit )).
54
+ -define (MAX_MANAGEMENT_LINK_CREDIT , 8 ).
55
+ -define (MANAGEMENT_NODE_ADDRESS , <<" /management" >>).
36
56
-define (UINT_OUTGOING_WINDOW , {uint , ? UINT_MAX }).
37
- -define (MAX_INCOMING_WINDOW , 400 ).
38
57
% % "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
39
58
-define (INITIAL_OUTGOING_TRANSFER_ID , ? UINT_MAX - 3 ).
40
59
% % "Note that, despite its name, the delivery-count is not a count but a
41
60
% % sequence number initialized at an arbitrary point by the sender." [2.6.7]
42
61
-define (INITIAL_DELIVERY_COUNT , ? UINT_MAX - 4 ).
43
62
-define (INITIAL_OUTGOING_DELIVERY_ID , 0 ).
44
63
-define (DEFAULT_MAX_HANDLE , ? UINT_MAX ).
64
+ -define (UINT (N ), {uint , N }).
45
65
% % [3.4]
46
66
-define (OUTCOMES , [? V_1_0_SYMBOL_ACCEPTED ,
47
67
? V_1_0_SYMBOL_REJECTED ,
48
68
? V_1_0_SYMBOL_RELEASED ,
49
69
? V_1_0_SYMBOL_MODIFIED ]).
50
- -define (MAX_PERMISSION_CACHE_SIZE , 12 ).
51
- -define (PROCESS_GROUP_NAME , amqp_sessions ).
52
- -define (UINT (N ), {uint , N }).
53
- % % This is the link credit that we grant to sending clients.
54
- % % We are free to choose whatever we want, sending clients must obey.
55
- % % Default soft limits / credits in deps/rabbit/Makefile are:
56
- % % 32 for quorum queues
57
- % % 256 for streams
58
- % % 400 for classic queues
59
- % % If link target is a queue (rather than an exchange), we could use one of these depending
60
- % % on target queue type. For the time being just use a static value that's something in between.
61
- % % In future, we could dynamically grow (or shrink) the link credit we grant depending on how fast
62
- % % target queue(s) actually confirm messages: see paper "Credit-Based Flow Control for ATM Networks"
63
- % % from 1995, section 4.2 "Static vs. adaptive credit control" for pros and cons.
64
- -define (LINK_CREDIT_RCV , 128 ).
65
- -define (MANAGEMENT_LINK_CREDIT_RCV , 8 ).
66
- -define (MANAGEMENT_NODE_ADDRESS , <<" /management" >>).
67
70
-define (DEFAULT_EXCHANGE_NAME , <<>>).
68
- % % This is the maximum credit we grant to a sending queue.
69
- % % Only when we sent sufficient messages to the writer proc, we will again grant credits
70
- % % to the sending queue. We have this limit in place to ensure that our session proc won't be flooded
71
- % % with messages by the sending queue, especially if we are throttled sending messages to the client
72
- % % either by the writer proc or by remote-incoming window (i.e. session flow control).
73
- -define (LINK_CREDIT_RCV_FROM_QUEUE_MAX , 256 ).
71
+ -define (PROTOCOL , amqp10 ).
72
+ -define (PROCESS_GROUP_NAME , amqp_sessions ).
73
+ -define (MAX_PERMISSION_CACHE_SIZE , 12 ).
74
+ -define (HIBERNATE_AFTER , 6_000 ).
75
+ -define (CREDIT_REPLY_TIMEOUT , 30_000 ).
74
76
75
77
-export ([start_link /8 ,
76
78
process_frame /2 ,
172
174
-record (queue_flow_ctl , {
173
175
delivery_count :: sequence_no (),
174
176
% % We cap the actual credit we grant to the sending queue.
175
- % % If client_flow_ctl.credit is larger than LINK_CREDIT_RCV_FROM_QUEUE_MAX ,
177
+ % % If client_flow_ctl.credit is larger than max_queue_credit ,
176
178
% % we will top up in batches to the sending queue.
177
- credit :: 0 .. ? LINK_CREDIT_RCV_FROM_QUEUE_MAX ,
179
+ credit :: rabbit_queue_type : credit () ,
178
180
drain :: boolean ()
179
181
}).
180
182
251
253
incoming_window_margin = 0 :: non_neg_integer (),
252
254
resource_alarms :: sets :set (rabbit_alarm :resource_alarm_source ()),
253
255
trace_state :: rabbit_trace :state (),
254
- conn_name :: binary ()
256
+ conn_name :: binary (),
257
+ max_incoming_window :: pos_integer ()
255
258
}).
256
259
257
260
-record (state , {
@@ -375,11 +378,22 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
375
378
Alarms0 = rabbit_alarm :register (self (), {? MODULE , conserve_resources , []}),
376
379
Alarms = sets :from_list (Alarms0 , [{version , 2 }]),
377
380
378
- NextOutgoingId = ? INITIAL_OUTGOING_TRANSFER_ID ,
381
+ MaxLinkCredit = application :get_env (
382
+ rabbit , max_link_credit , ? DEFAULT_MAX_LINK_CREDIT ),
383
+ MaxQueueCredit = application :get_env (
384
+ rabbit , max_queue_credit , ? DEFAULT_MAX_QUEUE_CREDIT ),
385
+ MaxIncomingWindow = application :get_env (
386
+ rabbit , max_incoming_window , ? DEFAULT_MAX_INCOMING_WINDOW ),
387
+ true = is_valid_max (MaxLinkCredit ),
388
+ true = is_valid_max (MaxQueueCredit ),
389
+ true = is_valid_max (MaxIncomingWindow ),
390
+ ok = persistent_term :put (max_link_credit , MaxLinkCredit ),
391
+ ok = persistent_term :put (max_queue_credit , MaxQueueCredit ),
379
392
IncomingWindow = case sets :is_empty (Alarms ) of
380
- true -> ? MAX_INCOMING_WINDOW ;
393
+ true -> MaxIncomingWindow ;
381
394
false -> 0
382
395
end ,
396
+ NextOutgoingId = ? INITIAL_OUTGOING_TRANSFER_ID ,
383
397
384
398
HandleMax = case HandleMax0 of
385
399
? UINT (Max ) -> Max ;
@@ -406,7 +420,8 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
406
420
channel_num = ChannelNum ,
407
421
resource_alarms = Alarms ,
408
422
trace_state = rabbit_trace :init (Vhost ),
409
- conn_name = ConnName
423
+ conn_name = ConnName ,
424
+ max_incoming_window = MaxIncomingWindow
410
425
}}}.
411
426
412
427
terminate (_Reason , # state {incoming_links = IncomingLinks ,
@@ -491,7 +506,9 @@ handle_cast({conserve_resources, Alarm, Conserve},
491
506
cfg = # cfg {resource_alarms = Alarms0 ,
492
507
incoming_window_margin = Margin0 ,
493
508
writer_pid = WriterPid ,
494
- channel_num = Ch } = Cfg
509
+ channel_num = Ch ,
510
+ max_incoming_window = MaxIncomingWindow
511
+ } = Cfg
495
512
} = State0 ) ->
496
513
Alarms = case Conserve of
497
514
true -> sets :add_element (Alarm , Alarms0 );
@@ -504,11 +521,11 @@ handle_cast({conserve_resources, Alarm, Conserve},
504
521
% % Notify the client to not send us any more TRANSFERs. Since we decrase
505
522
% % our incoming window dynamically, there might be incoming in-flight
506
523
% % TRANSFERs. So, let's be lax and allow for some excess TRANSFERs.
507
- {true , 0 , ? MAX_INCOMING_WINDOW };
524
+ {true , 0 , MaxIncomingWindow };
508
525
{false , true } ->
509
526
% % All alarms cleared.
510
527
% % Notify the client that it can resume sending us TRANSFERs.
511
- {true , ? MAX_INCOMING_WINDOW , 0 };
528
+ {true , MaxIncomingWindow , 0 };
512
529
_ ->
513
530
{false , IncomingWindow0 , Margin0 }
514
531
end ,
@@ -882,7 +899,7 @@ handle_control(#'v1_0.attach'{
882
899
MaxMessageSize = persistent_term :get (max_message_size ),
883
900
Link = # management_link {name = LinkName ,
884
901
delivery_count = DeliveryCountInt ,
885
- credit = ? MANAGEMENT_LINK_CREDIT_RCV ,
902
+ credit = ? MAX_MANAGEMENT_LINK_CREDIT ,
886
903
max_message_size = MaxMessageSize },
887
904
State = State0 # state {management_link_pairs = Pairs ,
888
905
incoming_management_links = maps :put (HandleInt , Link , Links )},
@@ -899,7 +916,7 @@ handle_control(#'v1_0.attach'{
899
916
properties = Properties },
900
917
Flow = # 'v1_0.flow' {handle = Handle ,
901
918
delivery_count = DeliveryCount ,
902
- link_credit = ? UINT (? MANAGEMENT_LINK_CREDIT_RCV )},
919
+ link_credit = ? UINT (? MAX_MANAGEMENT_LINK_CREDIT )},
903
920
reply0 ([Reply , Flow ], State );
904
921
905
922
handle_control (# 'v1_0.attach' {
@@ -978,7 +995,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
978
995
routing_key = RoutingKey ,
979
996
queue_name_bin = QNameBin ,
980
997
delivery_count = DeliveryCountInt ,
981
- credit = ? LINK_CREDIT_RCV },
998
+ credit = ? MAX_LINK_CREDIT },
982
999
_Outcomes = outcomes (Source ),
983
1000
Reply = # 'v1_0.attach' {
984
1001
name = LinkName ,
@@ -992,7 +1009,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
992
1009
max_message_size = {ulong , persistent_term :get (max_message_size )}},
993
1010
Flow = # 'v1_0.flow' {handle = Handle ,
994
1011
delivery_count = DeliveryCount ,
995
- link_credit = ? UINT (? LINK_CREDIT_RCV )},
1012
+ link_credit = ? UINT (? MAX_LINK_CREDIT )},
996
1013
% %TODO check that handle is not in use for any other open links.
997
1014
% %"The handle MUST NOT be used for other open links. An attempt to attach
998
1015
% % using a handle which is already associated with a link MUST be responded to
@@ -1790,7 +1807,8 @@ session_flow_control_received_transfer(
1790
1807
incoming_window = InWindow0 ,
1791
1808
remote_outgoing_window = RemoteOutgoingWindow ,
1792
1809
cfg = # cfg {incoming_window_margin = Margin ,
1793
- resource_alarms = Alarms }
1810
+ resource_alarms = Alarms ,
1811
+ max_incoming_window = MaxIncomingWindow }
1794
1812
} = State ) ->
1795
1813
InWindow1 = InWindow0 - 1 ,
1796
1814
case InWindow1 < - Margin of
@@ -1802,12 +1820,12 @@ session_flow_control_received_transfer(
1802
1820
false ->
1803
1821
ok
1804
1822
end ,
1805
- {Flows , InWindow } = case InWindow1 =< (? MAX_INCOMING_WINDOW div 2 ) andalso
1823
+ {Flows , InWindow } = case InWindow1 =< (MaxIncomingWindow div 2 ) andalso
1806
1824
sets :is_empty (Alarms ) of
1807
1825
true ->
1808
1826
% % We've reached halfway and there are no
1809
1827
% % disk or memory alarm, open the window.
1810
- {[# 'v1_0.flow' {}], ? MAX_INCOMING_WINDOW };
1828
+ {[# 'v1_0.flow' {}], MaxIncomingWindow };
1811
1829
false ->
1812
1830
{[], InWindow1 }
1813
1831
end ,
@@ -1864,11 +1882,13 @@ settle_op_from_outcome(Outcome) ->
1864
1882
" Unrecognised state: ~tp in DISPOSITION" ,
1865
1883
[Outcome ]).
1866
1884
1867
- -spec flow ({uint , link_handle ()}, sequence_no ()) -> # 'v1_0.flow' {}.
1885
+ -spec flow ({uint , link_handle ()}, sequence_no ()) ->
1886
+ # 'v1_0.flow' {}.
1868
1887
flow (Handle , DeliveryCount ) ->
1869
- flow (Handle , DeliveryCount , ? LINK_CREDIT_RCV ).
1888
+ flow (Handle , DeliveryCount , ? MAX_LINK_CREDIT ).
1870
1889
1871
- -spec flow ({uint , link_handle ()}, sequence_no (), non_neg_integer ()) -> # 'v1_0.flow' {}.
1890
+ -spec flow ({uint , link_handle ()}, sequence_no (), rabbit_queue_type :credit ()) ->
1891
+ # 'v1_0.flow' {}.
1872
1892
flow (Handle , DeliveryCount , LinkCredit ) ->
1873
1893
# 'v1_0.flow' {handle = Handle ,
1874
1894
delivery_count = ? UINT (DeliveryCount ),
@@ -2394,7 +2414,7 @@ released(DeliveryId) ->
2394
2414
maybe_grant_link_credit (Credit , DeliveryCount , NumUnconfirmed , Handle ) ->
2395
2415
case grant_link_credit (Credit , NumUnconfirmed ) of
2396
2416
true ->
2397
- {? LINK_CREDIT_RCV , [flow (Handle , DeliveryCount )]};
2417
+ {? MAX_LINK_CREDIT , [flow (Handle , DeliveryCount )]};
2398
2418
false ->
2399
2419
{Credit , []}
2400
2420
end .
@@ -2407,20 +2427,21 @@ maybe_grant_link_credit(
2407
2427
AccMap ) ->
2408
2428
case grant_link_credit (Credit , map_size (U )) of
2409
2429
true ->
2410
- {Link # incoming_link {credit = ? LINK_CREDIT_RCV },
2430
+ {Link # incoming_link {credit = ? MAX_LINK_CREDIT },
2411
2431
AccMap #{HandleInt => DeliveryCount }};
2412
2432
false ->
2413
2433
{Link , AccMap }
2414
2434
end .
2415
2435
2416
2436
grant_link_credit (Credit , NumUnconfirmed ) ->
2417
- Credit =< ? LINK_CREDIT_RCV / 2 andalso
2418
- NumUnconfirmed < ? LINK_CREDIT_RCV .
2437
+ MaxLinkCredit = ? MAX_LINK_CREDIT ,
2438
+ Credit =< MaxLinkCredit div 2 andalso
2439
+ NumUnconfirmed < MaxLinkCredit .
2419
2440
2420
2441
maybe_grant_mgmt_link_credit (Credit , DeliveryCount , Handle )
2421
- when Credit =< ? MANAGEMENT_LINK_CREDIT_RCV / 2 ->
2422
- {? MANAGEMENT_LINK_CREDIT_RCV ,
2423
- [flow (Handle , DeliveryCount , ? MANAGEMENT_LINK_CREDIT_RCV )]};
2442
+ when Credit =< ? MAX_MANAGEMENT_LINK_CREDIT div 2 ->
2443
+ {? MAX_MANAGEMENT_LINK_CREDIT ,
2444
+ [flow (Handle , DeliveryCount , ? MAX_MANAGEMENT_LINK_CREDIT )]};
2424
2445
maybe_grant_mgmt_link_credit (Credit , _ , _ ) ->
2425
2446
{Credit , []}.
2426
2447
@@ -3406,10 +3427,16 @@ error_not_found(Resource) ->
3406
3427
condition = ? V_1_0_AMQP_ERROR_NOT_FOUND ,
3407
3428
description = {utf8 , Description }}.
3408
3429
3430
+ is_valid_max (Val ) ->
3431
+ is_integer (Val ) andalso
3432
+ Val > 0 andalso
3433
+ Val =< ? UINT_MAX .
3434
+
3409
3435
-spec cap_credit (rabbit_queue_type :credit ()) ->
3410
- 0 .. ? LINK_CREDIT_RCV_FROM_QUEUE_MAX .
3436
+ rabbit_queue_type : credit () .
3411
3437
cap_credit (DesiredCredit ) ->
3412
- min (DesiredCredit , ? LINK_CREDIT_RCV_FROM_QUEUE_MAX ).
3438
+ MaxCredit = persistent_term :get (max_queue_credit ),
3439
+ min (DesiredCredit , MaxCredit ).
3413
3440
3414
3441
ensure_mc_cluster_compat (Mc ) ->
3415
3442
IsEnabled = rabbit_feature_flags :is_enabled (message_containers_store_amqp_v1 ),
0 commit comments