
Commit f1de592

Daniil Fedotov authored and gerhard committed
Flush messages to disk in batches.
If messages are to be embedded in the queue index, there is no credit flow limit, so message batches can grow too big and block the queue process. Limiting the batch size allows consumers to make progress while publishers are blocked by the paging-out process. [#151614048]
1 parent 8d8246b commit f1de592
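
The mechanism is easiest to see in isolation. Below is a minimal sketch of the capping rule this commit applies in rabbit_variable_queue.erl; the module name and the plain CreditDiscBound argument are illustrative stand-ins for the value read from the msg_store_credit_disc_bound setting (default ?CREDIT_DISC_BOUND), not the commit's actual code.

%% batch_cap_sketch: illustrative only, not part of RabbitMQ.
-module(batch_cap_sketch).
-export([cap_chunk/2]).

%% Returns {ChunkToFlushNow, NeedContinuation}: flush at most
%% 2 * CreditDiscBound messages per pass, and report whether work remains.
cap_chunk(Chunk, CreditDiscBound) when Chunk > 2 * CreditDiscBound ->
    {2 * CreditDiscBound, true};
cap_chunk(Chunk, _CreditDiscBound) ->
    {Chunk, false}.

For example, cap_chunk(10000, 2000) returns {4000, true}: 4000 messages are flushed now and the remainder is deferred to a prioritised bump_reduce_memory_use continuation, instead of one 10000-message flush that would stall consumers.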

File tree: 3 files changed, +68 −23 lines

src/rabbit_amqqueue_process.erl

Lines changed: 11 additions & 3 deletions
@@ -985,9 +985,9 @@ prioritise_cast(Msg, _Len, _State) ->
         {set_ram_duration_target, _Duration} -> 8;
         {set_maximum_since_use, _Age}        -> 8;
         {run_backing_queue, _Mod, _Fun}      -> 6;
-        {ack, _AckTags, _ChPid}              -> 3; %% [1]
-        {resume, _ChPid}                     -> 2;
-        {notify_sent, _ChPid, _Credit}       -> 1;
+        {ack, _AckTags, _ChPid}              -> 4; %% [1]
+        {resume, _ChPid}                     -> 3;
+        {notify_sent, _ChPid, _Credit}       -> 2;
         _                                    -> 0
     end.

@@ -999,6 +999,9 @@ prioritise_cast(Msg, _Len, _State) ->
 %% stack are optimised for that) and to make things easier to reason
 %% about. Finally, we prioritise ack over resume since it should
 %% always reduce memory use.
+%% bump_reduce_memory_use is prioritised over publishes, because sending
+%% credit to self is hard to reason about. Consumers can continue while
+%% reduce_memory_use is in progress.

 prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
     case Msg of
@@ -1008,6 +1011,7 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
         {drop_expired, _Version} -> 8;
         emit_stats               -> 7;
         sync_timeout             -> 6;
+        bump_reduce_memory_use   -> 1;
         _                        -> 0
     end.

@@ -1382,6 +1386,10 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
     %% rabbit_variable_queue:msg_store_write/4.
     credit_flow:handle_bump_msg(Msg),
     noreply(State#q{backing_queue_state = BQ:resume(BQS)});
+handle_info(bump_reduce_memory_use, State = #q{backing_queue       = BQ,
+                                               backing_queue_state = BQS}) ->
+    put(waiting_bump, false),
+    noreply(State#q{backing_queue_state = BQ:resume(BQS)});

 handle_info(Info, State) ->
     {stop, {unhandled_info, Info}, State}.

src/rabbit_mirror_queue_slave.erl

Lines changed: 3 additions & 0 deletions
@@ -348,6 +348,9 @@ handle_info({bump_credit, Msg}, State) ->
     credit_flow:handle_bump_msg(Msg),
     noreply(State);

+handle_info(bump_reduce_memory_use, State) ->
+    noreply(State);
+
 %% In the event of a short partition during sync we can detect the
 %% master's 'death', drop out of sync, and then receive sync messages
 %% which were still in flight. Ignore them.

src/rabbit_variable_queue.erl

Lines changed: 54 additions & 20 deletions
@@ -2363,45 +2363,79 @@ reduce_memory_use(State = #vqstate {
                               out     = AvgEgress,
                               ack_in  = AvgAckIngress,
                               ack_out = AvgAckEgress } }) ->
-    State1 = #vqstate { q2 = Q2, q3 = Q3 } =
+    {CreditDiscBound, _} = rabbit_misc:get_env(rabbit,
+                                               msg_store_credit_disc_bound,
+                                               ?CREDIT_DISC_BOUND),
+    {NeedResumeA2B, State1} = {_, #vqstate { q2 = Q2, q3 = Q3 }} =
         case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
-            0  -> State;
+            0 -> {false, State};
             %% Reduce memory of pending acks and alphas. The order is
             %% determined based on which is growing faster. Whichever
             %% comes second may very well get a quota of 0 if the
             %% first manages to push out the max number of messages.
-            S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
+            A2BChunk ->
+                %% In case there are few messages to be sent to the message
+                %% store and many messages to be embedded in the queue index,
+                %% we should limit the number of messages to be flushed
+                %% to avoid blocking the process.
+                A2BChunkActual = case A2BChunk > CreditDiscBound * 2 of
+                    true  -> CreditDiscBound * 2;
+                    false -> A2BChunk
+                end,
+                Funs = case ((AvgAckIngress - AvgAckEgress) >
                                  (AvgIngress - AvgEgress)) of
                            true  -> [fun limit_ram_acks/2,
                                      fun push_alphas_to_betas/2];
                            false -> [fun push_alphas_to_betas/2,
                                      fun limit_ram_acks/2]
                        end,
-                  {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
+                {Quota, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
                                                   ReduceFun(QuotaN, StateN)
-                                            end, {S1, State}, Funs),
-                  State2
+                                              end, {A2BChunkActual, State}, Funs),
+                {(Quota == 0) andalso (A2BChunk > A2BChunkActual), State2}
         end,
-
-    State3 =
+    Permitted = permitted_beta_count(State1),
+    {NeedResumeB2D, State3} =
         %% If there are more messages with their queue position held in RAM,
         %% a.k.a. betas, in Q2 & Q3 than IoBatchSize,
         %% write their queue position to disk, a.k.a. push_betas_to_deltas
         case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
-                        permitted_beta_count(State1)) of
-            S2 when S2 >= IoBatchSize ->
-                %% There is an implicit, but subtle, upper bound here. We
-                %% may shuffle a lot of messages from Q2/3 into delta, but
-                %% the number of these that require any disk operation,
-                %% namely index writing, i.e. messages that are genuine
-                %% betas and not gammas, is bounded by the credit_flow
-                %% limiting of the alpha->beta conversion above.
-                push_betas_to_deltas(S2, State1);
+                        Permitted) of
+            B2DChunk when B2DChunk >= IoBatchSize ->
+                %% Same as for alphas to betas. Limit the number of messages
+                %% to be flushed to disk at once to avoid blocking the process.
+                B2DChunkActual = case B2DChunk > CreditDiscBound * 2 of
+                    true  -> CreditDiscBound * 2;
+                    false -> B2DChunk
+                end,
+                StateBD = push_betas_to_deltas(B2DChunkActual, State1),
+                {B2DChunk > B2DChunkActual, StateBD};
             _ ->
-                State1
+                {false, State1}
         end,
-    %% See rabbitmq-server-290 for the reasons behind this GC call.
-    garbage_collect(),
+    %% We can be blocked by the credit flow, or limited by the batch size,
+    %% or finished with flushing.
+    %% If blocked by the credit flow, the credit grant will resume processing;
+    %% if limited by a batch, the batch continuation message should be sent.
+    %% The continuation message is prioritised over publishes, but not over
+    %% consumption, so the queue can make progress.
+    Blocked = credit_flow:blocked(),
+    case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
+        %% Credit bump will continue paging
+        {true, _}      -> ok;
+        %% Finished with paging
+        {false, false} -> ok;
+        %% Planning next batch
+        {false, true}  ->
+            %% We don't want to use self-credit-flow, because it's harder to
+            %% reason about. So the process sends a (prioritised) message to
+            %% itself and sets a waiting_bump flag to keep the mailbox clean
+            case get(waiting_bump) of
+                true -> ok;
+                _    -> self() ! bump_reduce_memory_use,
+                        put(waiting_bump, true)
+            end
+    end,
     State3;
 %% When using lazy queues, there are no alphas, so we don't need to
 %% call push_alphas_to_betas/2.
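
The send-to-self continuation in the last hunk generalises beyond RabbitMQ. Here is a self-contained sketch of the same pattern under stock gen_server; the module name, the BATCH constant, and the counter standing in for the real flush work are all hypothetical. Note that RabbitMQ relies on gen_server2's prioritise_info/3 callback to let the continuation overtake queued publishes, which plain gen_server (strict FIFO) cannot do.

%% batch_worker_sketch: illustrative only, not part of RabbitMQ.
-module(batch_worker_sketch).
-behaviour(gen_server).
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-define(BATCH, 100).

start_link(Pending) -> gen_server:start_link(?MODULE, Pending, []).

init(Pending) -> {ok, run_batch(Pending)}.

handle_call(_Req, _From, State) -> {reply, ok, State}.
handle_cast(_Msg, State)        -> {noreply, State}.

%% The continuation message: clear the dedup flag, run the next batch.
handle_info(continue_batch, Pending) ->
    put(waiting_bump, false),
    {noreply, run_batch(Pending)};
handle_info(_Info, State) ->
    {noreply, State}.

%% Process at most ?BATCH units per pass; if anything is left, enqueue
%% exactly one continuation, deduplicated via the process dictionary.
run_batch(Pending) ->
    Left = max(Pending - ?BATCH, 0),   %% stand-in for the real flush work
    case Left > 0 andalso get(waiting_bump) =/= true of
        true  -> self() ! continue_batch,
                 put(waiting_bump, true);
        false -> ok
    end,
    Left.

Started with batch_worker_sketch:start_link(1000), the process drains its backlog in ten bounded passes, and other mailbox messages can interleave between passes rather than waiting behind one monolithic flush.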
