Skip to content

Commit 655934d

Browse files
authored
Merge pull request #1809 from rabbitmq/qq-purge-bug-fix
Quorum queue: purge should not remove checkout out messages.
2 parents 7e34788 + 16728ec commit 655934d

File tree

3 files changed

+23
-28
lines changed

3 files changed

+23
-28
lines changed

src/rabbit_fifo.erl

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -413,26 +413,20 @@ apply(_, #checkout{spec = Spec, meta = Meta,
413413
State1 = update_consumer(ConsumerId, Meta, Spec, State0),
414414
checkout(State1, [{monitor, process, Pid}]);
415415
apply(#{index := RaftIdx}, #purge{},
416-
#state{consumers = Cons0, ra_indexes = Indexes } = State0) ->
417-
Total = rabbit_fifo_index:size(Indexes),
418-
{State1, Effects1} =
419-
maps:fold(
420-
fun(ConsumerId, C = #consumer{checked_out = Checked0},
421-
{StateAcc0, EffectsAcc0}) ->
422-
MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}}
423-
<- maps:values(Checked0)],
424-
complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C,
425-
#{}, EffectsAcc0, StateAcc0)
426-
end, {State0, []}, Cons0),
427-
{State, _, Effects} =
428-
update_smallest_raft_index(
429-
RaftIdx, Indexes,
430-
State1#state{ra_indexes = rabbit_fifo_index:empty(),
431-
messages = #{},
432-
returns = lqueue:new(),
433-
msg_bytes_enqueue = 0,
434-
msg_bytes_checkout = 0,
435-
low_msg_num = undefined}, Effects1),
416+
#state{ra_indexes = Indexes0,
417+
messages = Messages} = State0) ->
418+
Total = maps:size(Messages),
419+
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2,
420+
Indexes0,
421+
[I || {I, _} <- lists:sort(maps:values(Messages))]),
422+
{State, _, Effects} =
423+
update_smallest_raft_index(RaftIdx, Indexes0,
424+
State0#state{ra_indexes = Indexes,
425+
messages = #{},
426+
returns = lqueue:new(),
427+
msg_bytes_enqueue = 0,
428+
low_msg_num = undefined},
429+
[]),
436430
%% as we're not checking out after a purge (no point) we have to
437431
%% reverse the effects ourselves
438432
{State, {purge, Total},
@@ -1876,11 +1870,12 @@ purge_with_checkout_test() ->
18761870
%% assert message bytes are non zero
18771871
?assert(State2#state.msg_bytes_checkout > 0),
18781872
?assert(State2#state.msg_bytes_enqueue > 0),
1879-
{State3, {purge, 2}, _} = apply(meta(2), make_purge(), State2),
1880-
?assertEqual(0, State3#state.msg_bytes_checkout),
1873+
{State3, {purge, 1}, _} = apply(meta(2), make_purge(), State2),
1874+
?assert(State2#state.msg_bytes_checkout > 0),
18811875
?assertEqual(0, State3#state.msg_bytes_enqueue),
1876+
?assertEqual(1, rabbit_fifo_index:size(State3#state.ra_indexes)),
18821877
#consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers),
1883-
?assertEqual(0, maps:size(Checked)),
1878+
?assertEqual(1, maps:size(Checked)),
18841879
ok.
18851880

18861881
down_returns_checked_out_in_order_test() ->

src/rabbit_fifo_index.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ return(Key, Value, #?MODULE{data = Data} = State)
5656
when is_integer(Key) ->
5757
State#?MODULE{data = maps:put(Key, Value, Data)}.
5858

59-
-spec delete(integer(), state()) -> state().
59+
-spec delete(Index :: integer(), state()) -> state().
6060
delete(Smallest, #?MODULE{data = Data0,
61-
largest = Largest,
62-
smallest = Smallest} = State) ->
61+
largest = Largest,
62+
smallest = Smallest} = State) ->
6363
Data = maps:remove(Smallest, Data0),
6464
case find_next(Smallest + 1, Largest, Data) of
6565
undefined ->

test/quorum_queue_SUITE.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1539,8 +1539,8 @@ purge(Config) ->
15391539
_DeliveryTag = consume(Ch, QQ, false),
15401540
wait_for_messages_ready(Servers, RaName, 1),
15411541
wait_for_messages_pending_ack(Servers, RaName, 1),
1542-
{'queue.purge_ok', 2} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}),
1543-
wait_for_messages_pending_ack(Servers, RaName, 0),
1542+
{'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}),
1543+
wait_for_messages_pending_ack(Servers, RaName, 1),
15441544
wait_for_messages_ready(Servers, RaName, 0).
15451545

15461546
sync_queue(Config) ->

0 commit comments

Comments
 (0)