Skip to content

Commit dd20ef7

Browse files
committed
Use new handle_info rabbit_backing_queue callback
1 parent 97352a7 commit dd20ef7

File tree

5 files changed

+23
-12
lines changed

5 files changed

+23
-12
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1455,8 +1455,9 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
14551455
credit_flow:handle_bump_msg(Msg),
14561456
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
14571457
handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
1458-
backing_queue_state = BQS}) ->
1459-
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
1458+
backing_queue_state = BQS0}) ->
1459+
BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS0),
1460+
noreply(State#q{backing_queue_state = BQ:resume(BQS1)});
14601461

14611462
handle_info(Info, State) ->
14621463
{stop, {unhandled_info, Info}, State}.

src/rabbit_mirror_queue_master.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
2525
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
2626
msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
27-
zip_msgs_and_acks/4]).
27+
zip_msgs_and_acks/4, handle_info/2]).
2828

2929
-export([start/2, stop/1, delete_crashed/1]).
3030

@@ -447,6 +447,10 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
447447
backing_queue_state = BQS }) ->
448448
State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
449449

450+
handle_info(Msg, State = #state { backing_queue = BQ,
451+
backing_queue_state = BQS }) ->
452+
State #state { backing_queue_state = BQ:handle_info(Msg, BQS) }.
453+
450454
resume(State = #state { backing_queue = BQ,
451455
backing_queue_state = BQS }) ->
452456
State #state { backing_queue_state = BQ:resume(BQS) }.

src/rabbit_priority_queue.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
4242
handle_pre_hibernate/1, resume/1, msg_rates/1,
4343
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
44-
zip_msgs_and_acks/4]).
44+
zip_msgs_and_acks/4, handle_info/2]).
4545

4646
-record(state, {bq, bqss, max_priority}).
4747
-record(passthrough, {bq, bqs}).
@@ -393,6 +393,9 @@ handle_pre_hibernate(State = #state{bq = BQ}) ->
393393
handle_pre_hibernate(State = #passthrough{bq = BQ, bqs = BQS}) ->
394394
?passthrough1(handle_pre_hibernate(BQS)).
395395

396+
handle_info(Msg, State = #passthrough{bq = BQ, bqs = BQS}) ->
397+
?passthrough1(handle_info(Msg, BQS)).
398+
396399
resume(State = #state{bq = BQ}) ->
397400
foreach1(fun (_P, BQSN) -> BQ:resume(BQSN) end, State);
398401
resume(State = #passthrough{bq = BQ, bqs = BQS}) ->

src/rabbit_variable_queue.erl

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
2727
handle_pre_hibernate/1, resume/1, msg_rates/1,
2828
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
29-
zip_msgs_and_acks/4, multiple_routing_keys/0]).
29+
zip_msgs_and_acks/4, multiple_routing_keys/0, handle_info/2]).
3030

3131
-export([start/2, stop/1]).
3232

@@ -912,10 +912,10 @@ timeout(State = #vqstate { index_state = IndexState }) ->
912912
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
913913
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
914914

915-
resume(State = #vqstate{waiting_bump = true}) ->
916-
resume(State#vqstate{waiting_bump = false});
917-
resume(State) ->
918-
a(reduce_memory_use(State)).
915+
handle_info(bump_reduce_memory_use, State = #vqstate{ waiting_bump = true }) ->
916+
State#vqstate{ waiting_bump = false }.
917+
918+
resume(State) -> a(reduce_memory_use(State)).
919919

920920
msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
921921
out = AvgEgressRate } }) ->
@@ -2505,11 +2505,11 @@ reduce_memory_use(State = #vqstate {
25052505
garbage_collect(),
25062506
State3.
25072507

2508-
maybe_bump_reduce_memory_use(State = #vqstate{waiting_bump = true}) ->
2508+
maybe_bump_reduce_memory_use(State = #vqstate{ waiting_bump = true }) ->
25092509
State;
25102510
maybe_bump_reduce_memory_use(State) ->
25112511
self() ! bump_reduce_memory_use,
2512-
State#vqstate{waiting_bump = true}.
2512+
State#vqstate{ waiting_bump = true }.
25132513

25142514
limit_ram_acks(0, State) ->
25152515
{0, ui(State)};

test/channel_operation_timeout_test_queue.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
2727
handle_pre_hibernate/1, resume/1, msg_rates/1,
2828
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
29-
start/2, stop/1, zip_msgs_and_acks/4]).
29+
start/2, stop/1, zip_msgs_and_acks/4, handle_info/2]).
3030

3131
%%----------------------------------------------------------------------------
3232
%% This test backing queue follows the variable queue implementation, with
@@ -285,6 +285,9 @@ timeout(State) ->
285285
handle_pre_hibernate(State) ->
286286
rabbit_variable_queue:handle_pre_hibernate(State).
287287

288+
handle_info(Msg, State) ->
289+
rabbit_variable_queue:handle_info(Msg, State).
290+
288291
resume(State) -> rabbit_variable_queue:resume(State).
289292

290293
msg_rates(State) ->

0 commit comments

Comments
 (0)