Skip to content

Commit 1cc662c

Browse files
Merge pull request #2256 from rabbitmq/rabbit-fifo-dead-letter-bug
Fix QQ crash recovery bug
2 parents e1795a4 + ac86a6c commit 1cc662c

File tree

3 files changed

+37
-11
lines changed

3 files changed

+37
-11
lines changed

src/rabbit_fifo.erl

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,9 @@ update_config(Conf, State) ->
146146
SHICur = case State#?MODULE.cfg of
147147
#cfg{release_cursor_interval = {_, C}} ->
148148
C;
149-
#cfg{release_cursor_interval = C} ->
149+
#cfg{release_cursor_interval = undefined} ->
150+
SHI;
151+
#cfg{release_cursor_interval = C} ->
150152
C
151153
end,
152154

@@ -1086,8 +1088,9 @@ snd(T) ->
10861088
return(Meta, ConsumerId, Returned,
10871089
Effects0, #?MODULE{service_queue = SQ0} = State0) ->
10881090
{State1, Effects1} = maps:fold(
1089-
fun(MsgId, {Tag, _} = Msg, {S0, E0}) when Tag == '$prefix_msg';
1090-
Tag == '$empty_msg'->
1091+
fun(MsgId, {Tag, _} = Msg, {S0, E0})
1092+
when Tag == '$prefix_msg';
1093+
Tag == '$empty_msg'->
10911094
return_one(MsgId, 0, Msg, S0, E0, ConsumerId);
10921095
(MsgId, {MsgNum, Msg}, {S0, E0}) ->
10931096
return_one(MsgId, MsgNum, Msg, S0, E0,
@@ -1158,7 +1161,9 @@ dead_letter_effects(Reason, Discarded,
11581161
#?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}},
11591162
Effects) ->
11601163
DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, Acc) ->
1161-
[{Reason, Msg} | Acc]
1164+
[{Reason, Msg} | Acc];
1165+
(_, _, Acc) ->
1166+
Acc
11621167
end, [], Discarded),
11631168
[{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects].
11641169

src/rabbit_fifo.hrl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,9 @@
104104
-record(cfg,
105105
{name :: atom(),
106106
resource :: rabbit_types:r('queue'),
107-
release_cursor_interval =
108-
{?RELEASE_CURSOR_EVERY, ?RELEASE_CURSOR_EVERY} ::
109-
non_neg_integer() | {non_neg_integer(), non_neg_integer()},
107+
release_cursor_interval ::
108+
undefined | non_neg_integer() |
109+
{non_neg_integer(), non_neg_integer()},
110110
dead_letter_handler :: option(applied_mfa()),
111111
become_leader_handler :: option(applied_mfa()),
112112
max_length :: option(non_neg_integer()),

test/rabbit_fifo_prop_SUITE.erl

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ all_tests() ->
4444
scenario18,
4545
scenario19,
4646
scenario20,
47+
scenario21,
4748
single_active,
4849
single_active_01,
4950
single_active_02,
@@ -356,6 +357,24 @@ scenario20(_Config) ->
356357
max_in_memory_length => 1}, Commands),
357358
ok.
358359

360+
scenario21(_Config) ->
361+
C1Pid = c:pid(0,883,1),
362+
C1 = {<<>>, C1Pid},
363+
E = c:pid(0,176,1),
364+
Commands = [
365+
make_checkout(C1, {auto,2,simple_prefetch}),
366+
make_enqueue(E,1,<<"1">>),
367+
make_enqueue(E,2,<<"2">>),
368+
make_enqueue(E,3,<<"3">>),
369+
rabbit_fifo:make_discard(C1, [0]),
370+
rabbit_fifo:make_settle(C1, [1])
371+
],
372+
run_snapshot_test(#{name => ?FUNCTION_NAME,
373+
release_cursor_interval => 1,
374+
dead_letter_handler => {?MODULE, banana, []}},
375+
Commands),
376+
ok.
377+
359378
single_active_01(_Config) ->
360379
C1Pid = test_util:fake_pid(rabbit@fake_node1),
361380
C1 = {<<0>>, C1Pid},
@@ -450,7 +469,7 @@ snapshots(_Config) ->
450469
oneof([range(1, 10), undefined]),
451470
oneof([range(1, 1000), undefined])
452471
}}]),
453-
?FORALL(O, ?LET(Ops, log_gen(250), expand(Ops)),
472+
?FORALL(O, ?LET(Ops, log_gen(256), expand(Ops)),
454473
collect({log_size, length(O)},
455474
snapshots_prop(
456475
config(?FUNCTION_NAME,
@@ -607,10 +626,12 @@ in_memory_limit(_Config) ->
607626
InMemoryBytes), O))))
608627
end, [], Size).
609628

610-
config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemoryBytes) ->
629+
config(Name, Length, Bytes, SingleActive, DeliveryLimit,
630+
InMemoryLength, InMemoryBytes) ->
611631
#{name => Name,
612632
max_length => map_max(Length),
613633
max_bytes => map_max(Bytes),
634+
dead_letter_handler => {?MODULE, banana, []},
614635
single_active_consumer_on => SingleActive,
615636
delivery_limit => map_max(DeliveryLimit),
616637
max_in_memory_length => map_max(InMemoryLength),
@@ -741,8 +762,8 @@ log_gen(Size, _Body) ->
741762
{40, {input_event,
742763
frequency([{10, settle},
743764
{2, return},
744-
{1, discard},
745-
{1, requeue}])}},
765+
{2, discard},
766+
{2, requeue}])}},
746767
{2, checkout_gen(oneof(CPids))},
747768
{1, checkout_cancel_gen(oneof(CPids))},
748769
{1, down_gen(oneof(EPids ++ CPids))},

0 commit comments

Comments
 (0)