Skip to content

Commit 31a4b25

Browse files
committed
Merge pull request #331 from rabbitmq/rabbitmq-server-330
Adds documentation for credit_flow usage
2 parents 3136aa2 + b2a63a0 commit 31a4b25

8 files changed

+80
-3
lines changed

src/credit_flow.erl

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,24 @@
2727
%% receiver it will not grant any more credit to its senders when it
2828
%% is itself blocked - thus the only processes that need to check
2929
%% blocked/0 are ones that read from network sockets.
30+
%%
31+
%% Credit flows left to right when process send messags down the
32+
%% chain, starting at the rabbit_reader, ending at the msg_store:
33+
%% reader -> channel -> queue_process -> msg_store.
34+
%%
35+
%% If the message store has a back log, then it will block the
36+
%% queue_process, which will block the channel, and finally the reader
37+
%% will be blocked, throttling down publishers.
38+
%%
39+
%% Once a process is unblocked, it will grant credits up the chain,
40+
%% possibly unblocking other processes:
41+
%% reader <--grant channel <--grant queue_process <--grant msg_store.
42+
%%
43+
%% Grepping the project files for `credit_flow` will reveal the places
44+
%% where this module is currently used, with extra comments on what's
45+
%% going on at each instance. Note that credit flow between mirrors
46+
%% synchronization has not been documented, since this doesn't affect
47+
%% client publishes.
3048

3149
-define(DEFAULT_INITIAL_CREDIT, 200).
3250
-define(DEFAULT_MORE_CREDIT_AFTER, 50).

src/rabbit_amqqueue.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,9 @@ deliver(Qs, Delivery = #delivery{flow = Flow}) ->
873873
%% the slave receives the message direct from the channel, and the
874874
%% other when it receives it via GM.
875875
case Flow of
876+
%% Here we are tracking messages sent by the rabbit_channel
877+
%% process. We are accessing the rabbit_channel process
878+
%% dictionary.
876879
flow -> [credit_flow:send(QPid) || QPid <- QPids],
877880
[credit_flow:send(QPid) || QPid <- SPids];
878881
noflow -> ok

src/rabbit_amqqueue_process.erl

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -662,9 +662,21 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
662662
exclusive_consumer = Holder,
663663
senders = Senders}) ->
664664
State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
665-
false -> Senders;
666-
true -> credit_flow:peer_down(DownPid),
667-
pmon:demonitor(DownPid, Senders)
665+
false ->
666+
Senders;
667+
true ->
668+
%% A rabbit_channel process died. Here credit_flow will take care
669+
%% of cleaning up the rabbit_amqqueue_process process dictionary
670+
%% with regards to the credit we were tracking for the channel
671+
%% process. See handle_cast({deliver, Deliver}, State) in this
672+
%% module. In that cast function we process deliveries from the
673+
%% channel, which means we credit_flow:ack/1 said
674+
%% messages. credit_flow:ack'ing messages means we are increasing
675+
%% a counter to know when we need to send MoreCreditAfter. Since
676+
%% the process died, the credit_flow flow module will clean up
677+
%% that for us.
678+
credit_flow:peer_down(DownPid),
679+
pmon:demonitor(DownPid, Senders)
668680
end},
669681
case rabbit_queue_consumers:erase_ch(DownPid, Consumers) of
670682
not_found ->
@@ -1110,6 +1122,9 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender,
11101122
flow = Flow}, SlaveWhenPublished},
11111123
State = #q{senders = Senders}) ->
11121124
Senders1 = case Flow of
1125+
%% In both credit_flow:ack/1 we are acking messages to the channel
1126+
%% process that sent us the message delivery. See handle_ch_down
1127+
%% for more info.
11131128
flow -> credit_flow:ack(Sender),
11141129
case SlaveWhenPublished of
11151130
true -> credit_flow:ack(Sender); %% [0]
@@ -1289,6 +1304,12 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
12891304

12901305
handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
12911306
backing_queue_state = BQS}) ->
1307+
%% The message_store is granting us more credit. This means the
1308+
%% backing queue (for the rabbit_variable_queue case) might
1309+
%% continue paging messages to disk if it still needs to. We
1310+
%% consume credits from the message_store whenever we need to
1311+
%% persist a message to disk. See:
1312+
%% rabbit_variable_queue:msg_store_write/4.
12921313
credit_flow:handle_bump_msg(Msg),
12931314
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
12941315

src/rabbit_channel.erl

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ do(Pid, Method, Content) ->
133133
gen_server2:cast(Pid, {method, Method, Content, noflow}).
134134

135135
do_flow(Pid, Method, Content) ->
136+
%% Here we are tracking messages sent by the rabbit_reader
137+
%% process. We are accessing the rabbit_reader process dictionary.
136138
credit_flow:send(Pid),
137139
gen_server2:cast(Pid, {method, Method, Content, flow}).
138140

@@ -327,6 +329,9 @@ handle_cast({method, Method, Content, Flow},
327329
State = #ch{reader_pid = Reader,
328330
virtual_host = VHost}) ->
329331
case Flow of
332+
%% We are going to process a message from the rabbit_reader
333+
%% process, so here we ack it. In this case we are accessing
334+
%% the rabbit_channel process dictionary.
330335
flow -> credit_flow:ack(Reader);
331336
noflow -> ok
332337
end,
@@ -435,6 +440,12 @@ handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) ->
435440
noreply_coalesce(record_confirms(MXs, State#ch{unconfirmed = UC1})).
436441

437442
handle_info({bump_credit, Msg}, State) ->
443+
%% A rabbit_amqqueue_process is granting credit to our channel. If
444+
%% our channel was being blocked by this process, and no other
445+
%% process is blocking our channel, then this channel will be
446+
%% unblocked. This means that any credit that was deferred will be
447+
%% sent to rabbit_reader processs that might be blocked by this
448+
%% particular channel.
438449
credit_flow:handle_bump_msg(Msg),
439450
noreply(State);
440451

@@ -452,6 +463,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
452463
State1 = handle_publishing_queue_down(QPid, Reason, State),
453464
State3 = handle_consuming_queue_down(QPid, State1),
454465
State4 = handle_delivering_queue_down(QPid, State3),
466+
%% A rabbit_amqqueue_process has died. If our channel was being
467+
%% blocked by this process, and no other process is blocking our
468+
%% channel, then this channel will be unblocked. This means that
469+
%% any credit that was deferred will be sent to the rabbit_reader
470+
%% processs that might be blocked by this particular channel.
455471
credit_flow:peer_down(QPid),
456472
#ch{queue_names = QNames, queue_monitors = QMons} = State4,
457473
case dict:find(QPid, QNames) of

src/rabbit_mirror_queue_slave.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,9 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
257257
State) ->
258258
%% Asynchronous, non-"mandatory", deliver mode.
259259
case Flow of
260+
%% We are acking messages to the channel process that sent us
261+
%% the message delivery. See
262+
%% rabbit_amqqueue_process:handle_ch_down for more info.
260263
flow -> credit_flow:ack(Sender);
261264
noflow -> ok
262265
end,

src/rabbit_msg_store.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,10 @@ write_flow(MsgId, Msg,
475475
CState = #client_msstate {
476476
server = Server,
477477
credit_disc_bound = CreditDiscBound }) ->
478+
%% Here we are tracking messages sent by the
479+
%% rabbit_amqqueue_process process via the
480+
%% rabbit_variable_queue. We are accessing the
481+
%% rabbit_amqqueue_process process dictionary.
478482
credit_flow:send(whereis(Server), CreditDiscBound),
479483
client_write(MsgId, Msg, flow, CState).
480484

@@ -829,6 +833,9 @@ handle_cast({write, CRef, MsgId, Flow},
829833
credit_disc_bound = CreditDiscBound }) ->
830834
case Flow of
831835
flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
836+
%% We are going to process a message sent by the
837+
%% rabbit_amqqueue_process. Now we are accessing the
838+
%% msg_store process dictionary.
832839
credit_flow:ack(CPid, CreditDiscBound);
833840
noflow -> ok
834841
end,
@@ -904,6 +911,10 @@ handle_info(timeout, State) ->
904911
noreply(internal_sync(State));
905912

906913
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
914+
%% similar to what happens in
915+
%% rabbit_amqqueue_process:handle_ch_down but with a relation of
916+
%% msg_store -> rabbit_amqqueue_process instead of
917+
%% rabbit_amqqueue_process -> rabbit_channel.
907918
credit_flow:peer_down(Pid),
908919
noreply(State);
909920

src/rabbit_reader.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,7 @@ handle_other(ensure_stats, State) ->
478478
handle_other(emit_stats, State) ->
479479
emit_stats(State);
480480
handle_other({bump_credit, Msg}, State) ->
481+
%% Here we are receiving credit by some channel process.
481482
credit_flow:handle_bump_msg(Msg),
482483
control_throttle(State);
483484
handle_other(Other, State) ->

src/rabbit_variable_queue.erl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2115,6 +2115,10 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
21152115
TargetRamCount >= RamMsgCount ->
21162116
{Quota, ui(State)};
21172117
push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
2118+
%% We consume credits from the message_store whenever we need to
2119+
%% persist a message to disk. See:
2120+
%% rabbit_variable_queue:msg_store_write/4. So perhaps the
2121+
%% msg_store is trying to throttle down our queue.
21182122
case credit_flow:blocked() of
21192123
true -> {Quota, ui(State)};
21202124
false -> case Generator(Q) of

0 commit comments

Comments
 (0)