Skip to content

Commit dff1d20

Browse files
committed
Use rabbit_queue_behaviour and callback
Alternative for #1388 that does not use process dictionary. Requires rabbitmq/rabbitmq-common#228 Fix waiting_bump values
1 parent 27bf1d6 commit dff1d20

File tree

5 files changed

+34
-19
lines changed

5 files changed

+34
-19
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1455,9 +1455,9 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
14551455
credit_flow:handle_bump_msg(Msg),
14561456
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
14571457
handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
1458-
backing_queue_state = BQS}) ->
1459-
put(waiting_bump, false),
1460-
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
1458+
backing_queue_state = BQS0}) ->
1459+
BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS0),
1460+
noreply(State#q{backing_queue_state = BQ:resume(BQS1)});
14611461

14621462
handle_info(Info, State) ->
14631463
{stop, {unhandled_info, Info}, State}.

src/rabbit_mirror_queue_master.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
2525
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
2626
msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
27-
zip_msgs_and_acks/4]).
27+
zip_msgs_and_acks/4, handle_info/2]).
2828

2929
-export([start/2, stop/1, delete_crashed/1]).
3030

@@ -447,6 +447,10 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
447447
backing_queue_state = BQS }) ->
448448
State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
449449

450+
handle_info(Msg, State = #state { backing_queue = BQ,
451+
backing_queue_state = BQS }) ->
452+
State #state { backing_queue_state = BQ:handle_info(Msg, BQS) }.
453+
450454
resume(State = #state { backing_queue = BQ,
451455
backing_queue_state = BQS }) ->
452456
State #state { backing_queue_state = BQ:resume(BQS) }.

src/rabbit_priority_queue.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
4242
handle_pre_hibernate/1, resume/1, msg_rates/1,
4343
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
44-
zip_msgs_and_acks/4]).
44+
zip_msgs_and_acks/4, handle_info/2]).
4545

4646
-record(state, {bq, bqss, max_priority}).
4747
-record(passthrough, {bq, bqs}).
@@ -393,6 +393,9 @@ handle_pre_hibernate(State = #state{bq = BQ}) ->
393393
handle_pre_hibernate(State = #passthrough{bq = BQ, bqs = BQS}) ->
394394
?passthrough1(handle_pre_hibernate(BQS)).
395395

396+
handle_info(Msg, State = #passthrough{bq = BQ, bqs = BQS}) ->
397+
?passthrough1(handle_info(Msg, BQS)).
398+
396399
resume(State = #state{bq = BQ}) ->
397400
foreach1(fun (_P, BQSN) -> BQ:resume(BQSN) end, State);
398401
resume(State = #passthrough{bq = BQ, bqs = BQS}) ->

src/rabbit_variable_queue.erl

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
2727
handle_pre_hibernate/1, resume/1, msg_rates/1,
2828
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
29-
zip_msgs_and_acks/4, multiple_routing_keys/0]).
29+
zip_msgs_and_acks/4, multiple_routing_keys/0, handle_info/2]).
3030

3131
-export([start/2, stop/1]).
3232

@@ -325,7 +325,8 @@
325325
memory_reduction_run_count,
326326
%% Queue data is grouped by VHost. We need to store it
327327
%% to work with queue index.
328-
virtual_host
328+
virtual_host,
329+
waiting_bump = false
329330
}).
330331

331332
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -911,6 +912,9 @@ timeout(State = #vqstate { index_state = IndexState }) ->
911912
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
912913
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
913914

915+
handle_info(bump_reduce_memory_use, State = #vqstate{ waiting_bump = true }) ->
916+
State#vqstate{ waiting_bump = false }.
917+
914918
resume(State) -> a(reduce_memory_use(State)).
915919

916920
msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
@@ -2466,21 +2470,16 @@ reduce_memory_use(State = #vqstate {
24662470
Blocked = credit_flow:blocked(),
24672471
case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
24682472
%% Credit bump will continue paging
2469-
{true, _} -> ok;
2473+
{true, _} -> State3;
24702474
%% Finished with paging
2471-
{false, false} -> ok;
2475+
{false, false} -> State3;
24722476
%% Planning next batch
24732477
{false, true} ->
24742478
%% We don't want to use self-credit-flow, because it's harder to
24752479
%% reason about. So the process sends a (prioritised) message to
24762480
%% itself and sets a waiting_bump value to keep the message box clean
2477-
case get(waiting_bump) of
2478-
true -> ok;
2479-
_ -> self() ! bump_reduce_memory_use,
2480-
put(waiting_bump, true)
2481-
end
2482-
end,
2483-
State3;
2481+
maybe_bump_reduce_memory_use(State3)
2482+
end;
24842483
%% When using lazy queues, there are no alphas, so we don't need to
24852484
%% call push_alphas_to_betas/2.
24862485
reduce_memory_use(State = #vqstate {
@@ -2506,6 +2505,12 @@ reduce_memory_use(State = #vqstate {
25062505
garbage_collect(),
25072506
State3.
25082507

2508+
maybe_bump_reduce_memory_use(State = #vqstate{ waiting_bump = true }) ->
2509+
State;
2510+
maybe_bump_reduce_memory_use(State) ->
2511+
self() ! bump_reduce_memory_use,
2512+
State#vqstate{ waiting_bump = true }.
2513+
25092514
limit_ram_acks(0, State) ->
25102515
{0, ui(State)};
25112516
limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,

test/channel_operation_timeout_test_queue.erl

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
2727
handle_pre_hibernate/1, resume/1, msg_rates/1,
2828
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
29-
zip_msgs_and_acks/4]).
30-
-export([start/2, stop/1]).
29+
start/2, stop/1, zip_msgs_and_acks/4, handle_info/2]).
3130

3231
%%----------------------------------------------------------------------------
3332
%% This test backing queue follows the variable queue implementation, with
@@ -91,7 +90,8 @@
9190
memory_reduction_run_count,
9291
%% Queue data is grouped by VHost. We need to store it
9392
%% to work with queue index.
94-
virtual_host
93+
virtual_host,
94+
waiting_bump = false
9595
}).
9696

9797
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -285,6 +285,9 @@ timeout(State) ->
285285
handle_pre_hibernate(State) ->
286286
rabbit_variable_queue:handle_pre_hibernate(State).
287287

288+
handle_info(Msg, State) ->
289+
rabbit_variable_queue:handle_info(Msg, State).
290+
288291
resume(State) -> rabbit_variable_queue:resume(State).
289292

290293
msg_rates(State) ->

0 commit comments

Comments
 (0)