Skip to content

Commit b55f79d

Browse files
authored
Merge pull request #1388 from rabbitmq/rabbitmq-server-batch-betas
Flush messages to disk in batches.
2 parents 88485d9 + dc915ba commit b55f79d

File tree

3 files changed

+68
-23
lines changed

3 files changed

+68
-23
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -985,9 +985,9 @@ prioritise_cast(Msg, _Len, _State) ->
985985
{set_ram_duration_target, _Duration} -> 8;
986986
{set_maximum_since_use, _Age} -> 8;
987987
{run_backing_queue, _Mod, _Fun} -> 6;
988-
{ack, _AckTags, _ChPid} -> 3; %% [1]
989-
{resume, _ChPid} -> 2;
990-
{notify_sent, _ChPid, _Credit} -> 1;
988+
{ack, _AckTags, _ChPid} -> 4; %% [1]
989+
{resume, _ChPid} -> 3;
990+
{notify_sent, _ChPid, _Credit} -> 2;
991991
_ -> 0
992992
end.
993993

@@ -999,6 +999,9 @@ prioritise_cast(Msg, _Len, _State) ->
999999
%% stack are optimised for that) and to make things easier to reason
10001000
%% about. Finally, we prioritise ack over resume since it should
10011001
%% always reduce memory use.
1002+
%% bump_reduce_memory_use is prioritised over publishes, because sending
1003+
%% credit to self is hard to reason about. Consumers can continue while
1004+
%% reduce_memory_use is in progress.
10021005

10031006
prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
10041007
case Msg of
@@ -1008,6 +1011,7 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
10081011
{drop_expired, _Version} -> 8;
10091012
emit_stats -> 7;
10101013
sync_timeout -> 6;
1014+
bump_reduce_memory_use -> 1;
10111015
_ -> 0
10121016
end.
10131017

@@ -1382,6 +1386,10 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
13821386
%% rabbit_variable_queue:msg_store_write/4.
13831387
credit_flow:handle_bump_msg(Msg),
13841388
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
1389+
handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
1390+
backing_queue_state = BQS}) ->
1391+
put(waiting_bump, false),
1392+
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
13851393

13861394
handle_info(Info, State) ->
13871395
{stop, {unhandled_info, Info}, State}.

src/rabbit_mirror_queue_slave.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,9 @@ handle_info({bump_credit, Msg}, State) ->
348348
credit_flow:handle_bump_msg(Msg),
349349
noreply(State);
350350

351+
handle_info(bump_reduce_memory_use, State) ->
352+
noreply(State);
353+
351354
%% In the event of a short partition during sync we can detect the
352355
%% master's 'death', drop out of sync, and then receive sync messages
353356
%% which were still in flight. Ignore them.

src/rabbit_variable_queue.erl

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2363,45 +2363,79 @@ reduce_memory_use(State = #vqstate {
23632363
out = AvgEgress,
23642364
ack_in = AvgAckIngress,
23652365
ack_out = AvgAckEgress } }) ->
2366-
State1 = #vqstate { q2 = Q2, q3 = Q3 } =
2366+
{CreditDiscBound, _} =rabbit_misc:get_env(rabbit,
2367+
msg_store_credit_disc_bound,
2368+
?CREDIT_DISC_BOUND),
2369+
{NeedResumeA2B, State1} = {_, #vqstate { q2 = Q2, q3 = Q3 }} =
23672370
case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
2368-
0 -> State;
2371+
0 -> {false, State};
23692372
%% Reduce memory of pending acks and alphas. The order is
23702373
%% determined based on which is growing faster. Whichever
23712374
%% comes second may very well get a quota of 0 if the
23722375
%% first manages to push out the max number of messages.
2373-
S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
2376+
A2BChunk ->
2377+
%% In case there are few messages to be sent to a message store
2378+
%% and many messages to be embedded to the queue index,
2379+
%% we should limit the number of messages to be flushed
2380+
%% to avoid blocking the process.
2381+
A2BChunkActual = case A2BChunk > CreditDiscBound * 2 of
2382+
true -> CreditDiscBound * 2;
2383+
false -> A2BChunk
2384+
end,
2385+
Funs = case ((AvgAckIngress - AvgAckEgress) >
23742386
(AvgIngress - AvgEgress)) of
23752387
true -> [fun limit_ram_acks/2,
23762388
fun push_alphas_to_betas/2];
23772389
false -> [fun push_alphas_to_betas/2,
23782390
fun limit_ram_acks/2]
23792391
end,
2380-
{_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
2392+
{Quota, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
23812393
ReduceFun(QuotaN, StateN)
2382-
end, {S1, State}, Funs),
2383-
State2
2394+
end, {A2BChunkActual, State}, Funs),
2395+
{(Quota == 0) andalso (A2BChunk > A2BChunkActual), State2}
23842396
end,
2385-
2386-
State3 =
2397+
Permitted = permitted_beta_count(State1),
2398+
{NeedResumeB2D, State3} =
23872399
%% If there are more messages with their queue position held in RAM,
23882400
%% a.k.a. betas, in Q2 & Q3 than IoBatchSize,
23892401
%% write their queue position to disk, a.k.a. push_betas_to_deltas
23902402
case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
2391-
permitted_beta_count(State1)) of
2392-
S2 when S2 >= IoBatchSize ->
2393-
%% There is an implicit, but subtle, upper bound here. We
2394-
%% may shuffle a lot of messages from Q2/3 into delta, but
2395-
%% the number of these that require any disk operation,
2396-
%% namely index writing, i.e. messages that are genuine
2397-
%% betas and not gammas, is bounded by the credit_flow
2398-
%% limiting of the alpha->beta conversion above.
2399-
push_betas_to_deltas(S2, State1);
2403+
Permitted) of
2404+
B2DChunk when B2DChunk >= IoBatchSize ->
2405+
%% Same as for alphas to betas. Limit a number of messages
2406+
%% to be flushed to disk at once to avoid blocking the process.
2407+
B2DChunkActual = case B2DChunk > CreditDiscBound * 2 of
2408+
true -> CreditDiscBound * 2;
2409+
false -> B2DChunk
2410+
end,
2411+
StateBD = push_betas_to_deltas(B2DChunkActual, State1),
2412+
{B2DChunk > B2DChunkActual, StateBD};
24002413
_ ->
2401-
State1
2414+
{false, State1}
24022415
end,
2403-
%% See rabbitmq-server-290 for the reasons behind this GC call.
2404-
garbage_collect(),
2416+
%% We can be blocked by the credit flow, or limited by a batch size,
2417+
%% or finished with flushing.
2418+
%% If blocked by the credit flow - the credit grant will resume processing,
2419+
%% if limited by a batch - the batch continuation message should be sent.
2420+
%% The continuation message will be prioritised over publishes,
2421+
%% but not consumptions, so the queue can make progress.
2422+
Blocked = credit_flow:blocked(),
2423+
case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
2424+
%% Credit bump will continue paging
2425+
{true, _} -> ok;
2426+
%% Finished with paging
2427+
{false, false} -> ok;
2428+
%% Planning next batch
2429+
{false, true} ->
2430+
%% We don't want to use self-credit-flow, because it's harder to
2431+
%% reason about. So the process sends a (prioritised) message to
2432+
%% itself and sets a waiting_bump value to keep the message box clean
2433+
case get(waiting_bump) of
2434+
true -> ok;
2435+
_ -> self() ! bump_reduce_memory_use,
2436+
put(waiting_bump, true)
2437+
end
2438+
end,
24052439
State3;
24062440
%% When using lazy queues, there are no alphas, so we don't need to
24072441
%% call push_alphas_to_betas/2.

0 commit comments

Comments (0)