Skip to content

Commit b84f09c

Browse files
committed
Use rabbit_queue_behaviour and callback
Alternative for #1388 that does not use process dictionary. Requires rabbitmq/rabbitmq-common#228 Fix waiting_bump values
1 parent 27bf1d6 commit b84f09c

File tree

3 files changed

+20
-16
lines changed

3 files changed

+20
-16
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1456,7 +1456,6 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
14561456
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
14571457
handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
14581458
backing_queue_state = BQS}) ->
1459-
put(waiting_bump, false),
14601459
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
14611460

14621461
handle_info(Info, State) ->

src/rabbit_variable_queue.erl

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,8 @@
325325
memory_reduction_run_count,
326326
%% Queue data is grouped by VHost. We need to store it
327327
%% to work with queue index.
328-
virtual_host
328+
virtual_host,
329+
waiting_bump = false
329330
}).
330331

331332
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -911,7 +912,10 @@ timeout(State = #vqstate { index_state = IndexState }) ->
911912
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
912913
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
913914

914-
resume(State) -> a(reduce_memory_use(State)).
915+
resume(State = #vqstate{waiting_bump = true}) ->
916+
resume(State#vqstate{waiting_bump = false});
917+
resume(State) ->
918+
a(reduce_memory_use(State)).
915919

916920
msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
917921
out = AvgEgressRate } }) ->
@@ -2466,21 +2470,16 @@ reduce_memory_use(State = #vqstate {
24662470
Blocked = credit_flow:blocked(),
24672471
case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
24682472
%% Credit bump will continue paging
2469-
{true, _} -> ok;
2473+
{true, _} -> State3;
24702474
%% Finished with paging
2471-
{false, false} -> ok;
2475+
{false, false} -> State3;
24722476
%% Planning next batch
24732477
{false, true} ->
24742478
%% We don't want to use self-credit-flow, because it's harder to
24752479
%% reason about. So the process sends a (prioritised) message to
24762480
%% itself and sets a waiting_bump value to keep the message box clean
2477-
case get(waiting_bump) of
2478-
true -> ok;
2479-
_ -> self() ! bump_reduce_memory_use,
2480-
put(waiting_bump, true)
2481-
end
2482-
end,
2483-
State3;
2481+
maybe_bump_reduce_memory_use(State3)
2482+
end;
24842483
%% When using lazy queues, there are no alphas, so we don't need to
24852484
%% call push_alphas_to_betas/2.
24862485
reduce_memory_use(State = #vqstate {
@@ -2506,6 +2505,12 @@ reduce_memory_use(State = #vqstate {
25062505
garbage_collect(),
25072506
State3.
25082507

2508+
maybe_bump_reduce_memory_use(State = #vqstate{waiting_bump = true}) ->
2509+
State;
2510+
maybe_bump_reduce_memory_use(State) ->
2511+
self() ! bump_reduce_memory_use,
2512+
State#vqstate{waiting_bump = true}.
2513+
25092514
limit_ram_acks(0, State) ->
25102515
{0, ui(State)};
25112516
limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,

test/channel_operation_timeout_test_queue.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
2727
handle_pre_hibernate/1, resume/1, msg_rates/1,
2828
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
29-
zip_msgs_and_acks/4]).
30-
-export([start/2, stop/1]).
29+
start/2, stop/1, zip_msgs_and_acks/4]).
3130

3231
%%----------------------------------------------------------------------------
3332
%% This test backing queue follows the variable queue implementation, with
@@ -91,7 +90,8 @@
9190
memory_reduction_run_count,
9291
%% Queue data is grouped by VHost. We need to store it
9392
%% to work with queue index.
94-
virtual_host
93+
virtual_host,
94+
waiting_bump = false
9595
}).
9696

9797
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -261,7 +261,7 @@ ackfold(MsgFun, Acc, State, AckTags) ->
261261
fold(Fun, Acc, State) ->
262262
rabbit_variable_queue:fold(Fun, Acc, State).
263263

264-
len(#vqstate { qi_pending_ack = QPA } = State) ->
264+
len(#vqstate{ qi_pending_ack = QPA } = State) ->
265265
maybe_delay(QPA),
266266
rabbit_variable_queue:len(State).
267267

0 commit comments

Comments
 (0)