Skip to content

Commit 2821efd

Browse files
committed
Use rabbit_queue_behaviour and callback
Alternative for #1388 that does not use process dictionary. Requires rabbitmq/rabbitmq-common#228
1 parent 79fb8a1 commit 2821efd

File tree

5 files changed

+36
-18
lines changed

5 files changed

+36
-18
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1386,10 +1386,10 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
13861386
%% rabbit_variable_queue:msg_store_write/4.
13871387
credit_flow:handle_bump_msg(Msg),
13881388
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
1389-
handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
1390-
backing_queue_state = BQS}) ->
1391-
put(waiting_bump, false),
1392-
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
1389+
handle_info(bump_reduce_memory_use, State = #q{ backing_queue = BQ,
1390+
backing_queue_state = BQS0 }) ->
1391+
BQS1 = BQ:handled_bump_reduce_memory_use(BQS0),
1392+
noreply(State#q{ backing_queue_state = BQ:resume(BQS1) });
13931393

13941394
handle_info(Info, State) ->
13951395
{stop, {unhandled_info, Info}, State}.

src/rabbit_mirror_queue_master.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
2525
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
2626
msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
27-
zip_msgs_and_acks/4]).
27+
zip_msgs_and_acks/4, handled_bump_reduce_memory_use/1]).
2828

2929
-export([start/1, stop/0, delete_crashed/1]).
3030

@@ -513,6 +513,11 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator,
513513
backing_queue_state = BQS }) ->
514514
BQ:zip_msgs_and_acks(Msgs, AckTags, Accumulator, BQS).
515515

516+
handled_bump_reduce_memory_use(State = #state{ backing_queue = BQ,
517+
backing_queue_state = BQS0 }) ->
518+
BQS1 = BQ:handled_bump_reduce_memory_use(BQS0),
519+
State#state{ backing_queue_state = BQS1 }.
520+
516521
%% ---------------------------------------------------------------------------
517522
%% Other exported functions
518523
%% ---------------------------------------------------------------------------

src/rabbit_priority_queue.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
4242
handle_pre_hibernate/1, resume/1, msg_rates/1,
4343
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
44-
zip_msgs_and_acks/4]).
44+
zip_msgs_and_acks/4, handled_bump_reduce_memory_use/1]).
4545

4646
-record(state, {bq, bqss, max_priority}).
4747
-record(passthrough, {bq, bqs}).
@@ -447,6 +447,9 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator,
447447
#passthrough{bq = BQ, bqs = BQS}) ->
448448
BQ:zip_msgs_and_acks(Msgs, AckTags, Accumulator, BQS).
449449

450+
handled_bump_reduce_memory_use(State = #passthrough{ bq = BQ, bqs = BQS }) ->
451+
?passthrough1(handled_bump_reduce_memory_use(BQS)).
452+
450453
%%----------------------------------------------------------------------------
451454

452455
bq() ->

src/rabbit_variable_queue.erl

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
2727
handle_pre_hibernate/1, resume/1, msg_rates/1,
2828
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
29-
zip_msgs_and_acks/4, multiple_routing_keys/0]).
29+
zip_msgs_and_acks/4, multiple_routing_keys/0,
30+
handled_bump_reduce_memory_use/1]).
3031

3132
-export([start/1, stop/0]).
3233

@@ -311,7 +312,8 @@
311312
%% number of reduce_memory_usage executions, once it
312313
%% reaches a threshold the queue will manually trigger a runtime GC
313314
%% see: maybe_execute_gc/1
314-
memory_reduction_run_count
315+
memory_reduction_run_count,
316+
waiting_bump
315317
}).
316318

317319
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -2422,21 +2424,16 @@ reduce_memory_use(State = #vqstate {
24222424
Blocked = credit_flow:blocked(),
24232425
case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
24242426
%% Credit bump will continue paging
2425-
{true, _} -> ok;
2427+
{true, _} -> State3;
24262428
%% Finished with paging
2427-
{false, false} -> ok;
2429+
{false, false} -> State3;
24282430
%% Planning next batch
24292431
{false, true} ->
24302432
%% We don't want to use self-credit-flow, because it's harder to
24312433
%% reason about. So the process sends a (prioritised) message to
24322434
%% itself and sets a waiting_bump value to keep the message box clean
2433-
case get(waiting_bump) of
2434-
true -> ok;
2435-
_ -> self() ! bump_reduce_memory_use,
2436-
put(waiting_bump, waiting)
2437-
end
2438-
end,
2439-
State3;
2435+
maybe_bump_reduce_memory_use(State3)
2436+
end;
24402437
%% When using lazy queues, there are no alphas, so we don't need to
24412438
%% call push_alphas_to_betas/2.
24422439
reduce_memory_use(State = #vqstate {
@@ -2462,6 +2459,15 @@ reduce_memory_use(State = #vqstate {
24622459
garbage_collect(),
24632460
State3.
24642461

2462+
maybe_bump_reduce_memory_use(State = #vqstate{ waiting_bump = true }) ->
2463+
State;
2464+
maybe_bump_reduce_memory_use(State) ->
2465+
self() ! bump_reduce_memory_use,
2466+
State#vqstate{ waiting_bump = waiting }.
2467+
2468+
handled_bump_reduce_memory_use(State = #vqstate{ waiting_bump = waiting }) ->
2469+
State#vqstate{ waiting_bump = true }.
2470+
24652471
limit_ram_acks(0, State) ->
24662472
{0, ui(State)};
24672473
limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,

test/channel_operation_timeout_test_queue.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
2727
handle_pre_hibernate/1, resume/1, msg_rates/1,
2828
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
29-
zip_msgs_and_acks/4, multiple_routing_keys/0]).
29+
zip_msgs_and_acks/4, multiple_routing_keys/0,
30+
handled_bump_reduce_memory_use/1]).
3031

3132
-export([start/1, stop/0]).
3233

@@ -710,6 +711,9 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) ->
710711
[{Id, AckTag} | Acc]
711712
end, Accumulator, lists:zip(Msgs, AckTags)).
712713

714+
handled_bump_reduce_memory_use(State) ->
715+
State.
716+
713717
convert_to_lazy(State) ->
714718
State1 = #vqstate { delta = Delta, q3 = Q3, len = Len } =
715719
set_ram_duration_target(0, State),

0 commit comments

Comments
 (0)