Commit e9d1588

Author: Daniil Fedotov (committed)
Merge branch 'stable'
2 parents: 4c5ebd9 + b55f79d

File tree: 3 files changed (+68 / -23 lines)


src/rabbit_amqqueue_process.erl

Lines changed: 11 additions & 3 deletions
@@ -1031,9 +1031,9 @@ prioritise_cast(Msg, _Len, _State) ->
         {set_ram_duration_target, _Duration} -> 8;
         {set_maximum_since_use, _Age}        -> 8;
         {run_backing_queue, _Mod, _Fun}      -> 6;
-        {ack, _AckTags, _ChPid}              -> 3; %% [1]
-        {resume, _ChPid}                     -> 2;
-        {notify_sent, _ChPid, _Credit}       -> 1;
+        {ack, _AckTags, _ChPid}              -> 4; %% [1]
+        {resume, _ChPid}                     -> 3;
+        {notify_sent, _ChPid, _Credit}       -> 2;
         _                                    -> 0
     end.

@@ -1045,6 +1045,9 @@ prioritise_cast(Msg, _Len, _State) ->
 %% stack are optimised for that) and to make things easier to reason
 %% about. Finally, we prioritise ack over resume since it should
 %% always reduce memory use.
+%% bump_reduce_memory_use is prioritised over publishes, because sending
+%% credit to self is hard to reason about. Consumers can continue while
+%% reduce_memory_use is in progress.

 prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
     case Msg of
@@ -1054,6 +1057,7 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
         {drop_expired, _Version}           -> 8;
         emit_stats                         -> 7;
         sync_timeout                       -> 6;
+        bump_reduce_memory_use             -> 1;
         _                                  -> 0
     end.
@@ -1442,6 +1446,10 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
     %% rabbit_variable_queue:msg_store_write/4.
     credit_flow:handle_bump_msg(Msg),
     noreply(State#q{backing_queue_state = BQ:resume(BQS)});
+handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
+                                               backing_queue_state = BQS}) ->
+    put(waiting_bump, false),
+    noreply(State#q{backing_queue_state = BQ:resume(BQS)});

 handle_info(Info, State) ->
     {stop, {unhandled_info, Info}, State}.
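
The new clause above is the receiving end of a simple self-notification pattern: instead of granting itself credit, the queue process sends itself a plain bump_reduce_memory_use message and uses a process-dictionary flag (waiting_bump) so that at most one such message is ever in the mailbox. A minimal, self-contained sketch of that pattern follows; the module name, maybe_schedule_bump/0 and the do_one_batch/0 stub are illustrative only and not part of RabbitMQ.

-module(bump_sketch).
-export([maybe_schedule_bump/0, handle_info/1]).

%% Request another round of paging. The process-dictionary flag makes the
%% request idempotent: at most one bump_reduce_memory_use message sits in
%% the mailbox at a time, so repeated calls cannot flood the process.
maybe_schedule_bump() ->
    case get(waiting_bump) of
        true -> ok;                               %% a bump is already queued
        _    -> self() ! bump_reduce_memory_use,  %% plain message, no credit flow
                put(waiting_bump, true)
    end.

%% On receipt, clear the flag first (so the next batch may schedule a new
%% bump), then resume the stubbed memory-reducing work.
handle_info(bump_reduce_memory_use) ->
    put(waiting_bump, false),
    do_one_batch().

%% Stand-in for BQ:resume/1 -> reduce_memory_use/1 in the real code.
do_one_batch() ->
    ok.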

src/rabbit_mirror_queue_slave.erl

Lines changed: 3 additions & 0 deletions
@@ -372,6 +372,9 @@ handle_info({bump_credit, Msg}, State) ->
     credit_flow:handle_bump_msg(Msg),
     noreply(State);

+handle_info(bump_reduce_memory_use, State) ->
+    noreply(State);
+
 %% In the event of a short partition during sync we can detect the
 %% master's 'death', drop out of sync, and then receive sync messages
 %% which were still in flight. Ignore them.

src/rabbit_variable_queue.erl

Lines changed: 54 additions & 20 deletions
@@ -2407,45 +2407,79 @@ reduce_memory_use(State = #vqstate {
                              out     = AvgEgress,
                              ack_in  = AvgAckIngress,
                              ack_out = AvgAckEgress } }) ->
-    State1 = #vqstate { q2 = Q2, q3 = Q3 } =
+    {CreditDiscBound, _} = rabbit_misc:get_env(rabbit,
+                                               msg_store_credit_disc_bound,
+                                               ?CREDIT_DISC_BOUND),
+    {NeedResumeA2B, State1} = {_, #vqstate { q2 = Q2, q3 = Q3 }} =
         case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
-            0  -> State;
+            0  -> {false, State};
             %% Reduce memory of pending acks and alphas. The order is
             %% determined based on which is growing faster. Whichever
             %% comes second may very well get a quota of 0 if the
             %% first manages to push out the max number of messages.
-            S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
+            A2BChunk ->
+                %% In case there are few messages to be sent to a message store
+                %% and many messages to be embedded to the queue index,
+                %% we should limit the number of messages to be flushed
+                %% to avoid blocking the process.
+                A2BChunkActual = case A2BChunk > CreditDiscBound * 2 of
+                    true  -> CreditDiscBound * 2;
+                    false -> A2BChunk
+                end,
+                Funs = case ((AvgAckIngress - AvgAckEgress) >
                                  (AvgIngress - AvgEgress)) of
                            true  -> [fun limit_ram_acks/2,
                                      fun push_alphas_to_betas/2];
                            false -> [fun push_alphas_to_betas/2,
                                      fun limit_ram_acks/2]
                        end,
-                  {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
+                {Quota, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
                                                   ReduceFun(QuotaN, StateN)
-                                            end, {S1, State}, Funs),
-                  State2
+                                            end, {A2BChunkActual, State}, Funs),
+                {(Quota == 0) andalso (A2BChunk > A2BChunkActual), State2}
         end,
-
-    State3 =
+    Permitted = permitted_beta_count(State1),
+    {NeedResumeB2D, State3} =
         %% If there are more messages with their queue position held in RAM,
         %% a.k.a. betas, in Q2 & Q3 than IoBatchSize,
         %% write their queue position to disk, a.k.a. push_betas_to_deltas
         case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
-                        permitted_beta_count(State1)) of
-            S2 when S2 >= IoBatchSize ->
-                %% There is an implicit, but subtle, upper bound here. We
-                %% may shuffle a lot of messages from Q2/3 into delta, but
-                %% the number of these that require any disk operation,
-                %% namely index writing, i.e. messages that are genuine
-                %% betas and not gammas, is bounded by the credit_flow
-                %% limiting of the alpha->beta conversion above.
-                push_betas_to_deltas(S2, State1);
+                        Permitted) of
+            B2DChunk when B2DChunk >= IoBatchSize ->
+                %% Same as for alphas to betas. Limit a number of messages
+                %% to be flushed to disk at once to avoid blocking the process.
+                B2DChunkActual = case B2DChunk > CreditDiscBound * 2 of
+                    true  -> CreditDiscBound * 2;
+                    false -> B2DChunk
+                end,
+                StateBD = push_betas_to_deltas(B2DChunkActual, State1),
+                {B2DChunk > B2DChunkActual, StateBD};
             _ ->
-                State1
+                {false, State1}
         end,
-    %% See rabbitmq-server-290 for the reasons behind this GC call.
-    garbage_collect(),
+    %% We can be blocked by the credit flow, or limited by a batch size,
+    %% or finished with flushing.
+    %% If blocked by the credit flow - the credit grant will resume processing,
+    %% if limited by a batch - the batch continuation message should be sent.
+    %% The continuation message will be prioritised over publishes,
+    %% but not consumption, so the queue can make progress.
+    Blocked = credit_flow:blocked(),
+    case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
+        %% Credit bump will continue paging
+        {true, _} -> ok;
+        %% Finished with paging
+        {false, false} -> ok;
+        %% Planning next batch
+        {false, true} ->
+            %% We don't want to use self-credit-flow, because it's harder to
+            %% reason about. So the process sends a (prioritised) message to
+            %% itself and sets a waiting_bump value to keep the message box clean
+            case get(waiting_bump) of
+                true -> ok;
+                _    -> self() ! bump_reduce_memory_use,
+                        put(waiting_bump, true)
+            end
+    end,
     State3;
 %% When using lazy queues, there are no alphas, so we don't need to
 %% call push_alphas_to_betas/2.
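
The core of the rabbit_variable_queue change is (a) capping each paging chunk at twice the message-store credit bound and remembering whether anything was left over, and (b) deciding at the end whether a continuation is needed at all. Below is a rough, self-contained sketch of that control flow, assuming the bound is a plain integer; cap_chunk/2 and plan_continuation/2 are made-up names for illustration, the real logic lives in reduce_memory_use/1.

-module(paging_sketch).
-export([cap_chunk/2, plan_continuation/2]).

%% Never flush more than twice the credit bound in one go, so a single
%% reduce_memory_use pass cannot block the queue process for long.
%% Returns the capped chunk and whether work was deliberately left over.
cap_chunk(Chunk, CreditDiscBound) ->
    Actual = erlang:min(Chunk, CreditDiscBound * 2),
    {Actual, Chunk > Actual}.

%% Decide what, if anything, schedules the next batch:
%%   blocked by credit flow -> the eventual bump_credit message resumes us;
%%   nothing left to flush  -> done;
%%   batch was truncated    -> send ourselves a prioritised continuation.
plan_continuation(Blocked, NeedResume) ->
    case {Blocked, NeedResume} of
        {true,  _}     -> wait_for_credit;
        {false, false} -> done;
        {false, true}  -> self() ! bump_reduce_memory_use,
                          continuation_scheduled
    end.

For example, with a hypothetical bound of 2000, cap_chunk(10000, 2000) returns {4000, true}: the process flushes 4000 entries now and, unless credit flow is already blocking it, schedules a single bump_reduce_memory_use message for the rest.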
