Skip to content

Commit 38edc0b

Browse files
kjnilssonmergify-bot
authored andcommitted
QQ: emit release cursors after consumer cancel
If this is not done apps that consume/cancel from empty queues in a loop will grow the raft log in an unbounded manner. This could also be the case for the garbage_collect command. (cherry picked from commit eaa216d) (cherry picked from commit a787a92)
1 parent 2354527 commit 38edc0b

File tree

2 files changed

+12
-7
lines changed

2 files changed

+12
-7
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -340,10 +340,13 @@ apply(#{index := Index,
340340
{State, Reply, Effects}
341341
end
342342
end;
343-
apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
344-
{State, Effects} = cancel_consumer(Meta, ConsumerId, State0, [],
345-
consumer_cancel),
346-
checkout(Meta, State0, State, Effects);
343+
apply(#{index := Idx} = Meta,
344+
#checkout{spec = cancel,
345+
consumer_id = ConsumerId}, State0) ->
346+
{State1, Effects1} = cancel_consumer(Meta, ConsumerId, State0, [],
347+
consumer_cancel),
348+
{State, Reply, Effects} = checkout(Meta, State0, State1, Effects1),
349+
update_smallest_raft_index(Idx, Reply, State, Effects);
347350
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
348351
consumer_id = {_, Pid} = ConsumerId},
349352
State0) ->
@@ -372,8 +375,8 @@ apply(#{index := Index}, #purge{},
372375
{State, _, Effects} = evaluate_limit(Index, false, State0,
373376
State1, Effects0),
374377
update_smallest_raft_index(Index, Reply, State, Effects);
375-
apply(_Meta, #garbage_collection{}, State) ->
376-
{State, ok, [{aux, garbage_collection}]};
378+
apply(#{index := Idx}, #garbage_collection{}, State) ->
379+
update_smallest_raft_index(Idx, ok, State, [{aux, garbage_collection}]);
377380
apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
378381
#?MODULE{consumers = Cons0,
379382
cfg = #cfg{consumer_strategy = single_active},

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,9 +397,11 @@ cancelled_checkout_empty_queue_test(_) ->
397397
{State1, _} = check_auto(Cid, 2, test_init(test)),
398398
% cancelled checkout should clear out service_queue also, else we'd get a
399399
% build up of these
400-
{State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
400+
{State2, _, Effects} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
401401
?assertEqual(0, map_size(State2#rabbit_fifo.consumers)),
402402
?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)),
403+
ct:pal("Effs: ~p", [Effects]),
404+
?ASSERT_EFF({release_cursor, _, _}, Effects),
403405
ok.
404406

405407
cancelled_checkout_out_test(_) ->

0 commit comments

Comments
 (0)