Skip to content

Commit 60d65c0

Browse files
committed
Use rabbit_queue_behaviour and callback
Alternative for #1388 that does not use process dictionary. Requires rabbitmq/rabbitmq-common#228 Fix waiting_bump values
1 parent 27bf1d6 commit 60d65c0

File tree

5 files changed

+38
-19
lines changed

5 files changed

+38
-19
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1454,10 +1454,10 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
14541454
%% rabbit_variable_queue:msg_store_write/4.
14551455
credit_flow:handle_bump_msg(Msg),
14561456
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
1457-
handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
1458-
backing_queue_state = BQS}) ->
1459-
put(waiting_bump, false),
1460-
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
1457+
handle_info(bump_reduce_memory_use, State = #q{ backing_queue = BQ,
1458+
backing_queue_state = BQS0 }) ->
1459+
BQS1 = BQ:handled_bump_reduce_memory_use(BQS0),
1460+
noreply(State#q{ backing_queue_state = BQ:resume(BQS1) });
14611461

14621462
handle_info(Info, State) ->
14631463
{stop, {unhandled_info, Info}, State}.

src/rabbit_mirror_queue_master.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
2525
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
2626
msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
27-
zip_msgs_and_acks/4]).
27+
zip_msgs_and_acks/4, handled_bump_reduce_memory_use/1]).
2828

2929
-export([start/2, stop/1, delete_crashed/1]).
3030

@@ -525,6 +525,11 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator,
525525
backing_queue_state = BQS }) ->
526526
BQ:zip_msgs_and_acks(Msgs, AckTags, Accumulator, BQS).
527527

528+
handled_bump_reduce_memory_use(State = #state{ backing_queue = BQ,
529+
backing_queue_state = BQS0 }) ->
530+
BQS1 = BQ:handled_bump_reduce_memory_use(BQS0),
531+
State#state{ backing_queue_state = BQS1 }.
532+
528533
%% ---------------------------------------------------------------------------
529534
%% Other exported functions
530535
%% ---------------------------------------------------------------------------

src/rabbit_priority_queue.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
4242
handle_pre_hibernate/1, resume/1, msg_rates/1,
4343
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
44-
zip_msgs_and_acks/4]).
44+
zip_msgs_and_acks/4, handled_bump_reduce_memory_use/1]).
4545

4646
-record(state, {bq, bqss, max_priority}).
4747
-record(passthrough, {bq, bqs}).
@@ -447,6 +447,9 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator,
447447
#passthrough{bq = BQ, bqs = BQS}) ->
448448
BQ:zip_msgs_and_acks(Msgs, AckTags, Accumulator, BQS).
449449

450+
handled_bump_reduce_memory_use(State = #passthrough{ bq = BQ, bqs = BQS }) ->
451+
?passthrough1(handled_bump_reduce_memory_use(BQS)).
452+
450453
%%----------------------------------------------------------------------------
451454

452455
bq() ->

src/rabbit_variable_queue.erl

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
2727
handle_pre_hibernate/1, resume/1, msg_rates/1,
2828
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
29-
zip_msgs_and_acks/4, multiple_routing_keys/0]).
29+
zip_msgs_and_acks/4, multiple_routing_keys/0,
30+
handled_bump_reduce_memory_use/1]).
3031

3132
-export([start/2, stop/1]).
3233

@@ -325,7 +326,8 @@
325326
memory_reduction_run_count,
326327
%% Queue data is grouped by VHost. We need to store it
327328
%% to work with queue index.
328-
virtual_host
329+
virtual_host,
330+
waiting_bump = false
329331
}).
330332

331333
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -2466,21 +2468,16 @@ reduce_memory_use(State = #vqstate {
24662468
Blocked = credit_flow:blocked(),
24672469
case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
24682470
%% Credit bump will continue paging
2469-
{true, _} -> ok;
2471+
{true, _} -> State3;
24702472
%% Finished with paging
2471-
{false, false} -> ok;
2473+
{false, false} -> State3;
24722474
%% Planning next batch
24732475
{false, true} ->
24742476
%% We don't want to use self-credit-flow, because it's harder to
24752477
%% reason about. So the process sends a (prioritised) message to
24762478
%% itself and sets a waiting_bump value to keep the message box clean
2477-
case get(waiting_bump) of
2478-
true -> ok;
2479-
_ -> self() ! bump_reduce_memory_use,
2480-
put(waiting_bump, true)
2481-
end
2482-
end,
2483-
State3;
2479+
maybe_bump_reduce_memory_use(State3)
2480+
end;
24842481
%% When using lazy queues, there are no alphas, so we don't need to
24852482
%% call push_alphas_to_betas/2.
24862483
reduce_memory_use(State = #vqstate {
@@ -2506,6 +2503,15 @@ reduce_memory_use(State = #vqstate {
25062503
garbage_collect(),
25072504
State3.
25082505

2506+
maybe_bump_reduce_memory_use(State = #vqstate{ waiting_bump = true }) ->
2507+
State;
2508+
maybe_bump_reduce_memory_use(State) ->
2509+
self() ! bump_reduce_memory_use,
2510+
State#vqstate{ waiting_bump = true }.
2511+
2512+
handled_bump_reduce_memory_use(State = #vqstate{ waiting_bump = true }) ->
2513+
State#vqstate{ waiting_bump = false }.
2514+
25092515
limit_ram_acks(0, State) ->
25102516
{0, ui(State)};
25112517
limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,

test/channel_operation_timeout_test_queue.erl

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@
2626
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
2727
handle_pre_hibernate/1, resume/1, msg_rates/1,
2828
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
29-
zip_msgs_and_acks/4]).
30-
-export([start/2, stop/1]).
29+
zip_msgs_and_acks/4, handled_bump_reduce_memory_use/1]).
30+
31+
%% exported for testing only
32+
-export([start_msg_store/2, stop_msg_store/0, init/6]).
3133

3234
%%----------------------------------------------------------------------------
3335
%% This test backing queue follows the variable queue implementation, with
@@ -303,6 +305,9 @@ set_queue_mode(Mode, State) ->
303305
zip_msgs_and_acks(Msgs, AckTags, Accumulator, State) ->
304306
rabbit_variable_queue:zip_msgs_and_acks(Msgs, AckTags, Accumulator, State).
305307

308+
handled_bump_reduce_memory_use(State) ->
309+
rabbit_variable_queue:handled_bump_reduce_memory_use(State).
310+
306311
%% Delay
307312
maybe_delay(QPA) ->
308313
case is_timeout_test(gb_trees:values(QPA)) of

0 commit comments

Comments
 (0)