Skip to content

Commit e662841

Browse files
dcorbachokjnilsson
authored andcommitted
Remove poison message from indexes
[#163513253]
1 parent 6980462 commit e662841

File tree

2 files changed

+45
-38
lines changed

2 files changed

+45
-38
lines changed

src/rabbit_fifo.erl

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ apply(_, {down, ConsumerPid, noconnection},
472472
#consumer{checked_out = Checked0} = C,
473473
{Co, St0, Eff}) when (node(P) =:= Node) and
474474
(C#consumer.status =/= cancelled)->
475-
{St, Eff0} = return_all(St0, Checked0, Eff),
475+
{St, Eff0} = return_all(St0, Checked0, Eff, K, C),
476476
Credit = increase_credit(C, maps:size(Checked0)),
477477
Eff1 = ConsumerUpdateActiveFun(St, K, C, false,
478478
suspected_down, Eff0),
@@ -965,7 +965,7 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1
965965
C0, SQ0, Effects0),
966966
{S0#state{consumers = Cons, service_queue = SQ}, Effects1};
967967
down ->
968-
{S1, Effects1} = return_all(S0, Checked0, Effects0),
968+
{S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId, Consumer),
969969
{S1#state{consumers = Cons1}, Effects1}
970970
end.
971971

@@ -1089,9 +1089,9 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
10891089
{Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0,
10901090
SQ0, Effects0),
10911091
{State1, Effects2} = lists:foldl(fun({'$prefix_msg', _} = Msg, {S0, E0}) ->
1092-
return_one(0, Msg, S0, E0);
1092+
return_one(0, Msg, S0, E0, ConsumerId, Con);
10931093
({MsgNum, Msg}, {S0, E0}) ->
1094-
return_one(MsgNum, Msg, S0, E0)
1094+
return_one(MsgNum, Msg, S0, E0, ConsumerId, Con)
10951095
end, {State0, Effects1}, MsgNumMsgs),
10961096
checkout(Meta, State1#state{consumers = Cons,
10971097
service_queue = SQ},
@@ -1201,34 +1201,36 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
12011201
end.
12021202

12031203
return_one(0, {'$prefix_msg', _} = Msg,
1204-
#state{returns = Returns} = State0, Effects) ->
1204+
#state{returns = Returns} = State0, Effects, _ConsumerId, _Con) ->
12051205
{add_bytes_return(Msg,
12061206
State0#state{returns = lqueue:in(Msg, Returns)}), Effects};
12071207
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
12081208
#state{returns = Returns,
1209-
delivery_limit = DeliveryLimit} = State0, Effects0) ->
1209+
delivery_limit = DeliveryLimit} = State0, Effects0, ConsumerId, Con) ->
12101210
Header = maps:update_with(delivery_count,
12111211
fun (C) -> C+1 end,
12121212
1, Header0),
12131213
case maps:get(delivery_count, Header) of
12141214
DeliveryCount when DeliveryCount > DeliveryLimit ->
12151215
Effects = dead_letter_effects(rejected, maps:put(none, {MsgNum, {RaftId, {Header, RawMsg}}}, #{}), State0, Effects0),
1216-
{add_bytes_settle(RawMsg, State0), Effects};
1216+
Checked = maps:without([MsgNum], Con#consumer.checked_out),
1217+
{State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, Effects, State0),
1218+
{add_bytes_settle(RawMsg, State1), Effects1};
12171219
_ ->
12181220
Msg = {RaftId, {Header, RawMsg}},
12191221
%% this should not affect the release cursor in any way
12201222
{add_bytes_return(RawMsg,
12211223
State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0}
12221224
end.
12231225

1224-
return_all(State0, Checked0, Effects0) ->
1226+
return_all(State0, Checked0, Effects0, ConsumerId, Consumer) ->
12251227
%% need to sort the list so that we return messages in the order
12261228
%% they were checked out
12271229
Checked = lists:sort(maps:to_list(Checked0)),
12281230
lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) ->
1229-
return_one(0, Msg, S, E);
1231+
return_one(0, Msg, S, E, ConsumerId, Consumer);
12301232
({_, {MsgNum, Msg}}, {S, E}) ->
1231-
return_one(MsgNum, Msg, S, E)
1233+
return_one(MsgNum, Msg, S, E, ConsumerId, Consumer)
12321234
end, {State0, Effects0}, Checked).
12331235

12341236
%% checkout new messages to consumers
@@ -1868,6 +1870,26 @@ return_checked_out_test() ->
18681870
apply(meta(3), make_return(Cid, [MsgId]), State1),
18691871
ok.
18701872

1873+
return_checked_out_limit_test() ->
1874+
Cid = {<<"cid">>, self()},
1875+
Init = init(#{name => test,
1876+
queue_resource => rabbit_misc:r("/", queue,
1877+
atom_to_binary(test, utf8)),
1878+
release_cursor_interval => 0,
1879+
delivery_limit => 1}),
1880+
{State0, [_, _]} = enq(1, 1, first, Init),
1881+
{State1, [_Monitor,
1882+
{send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
1883+
{aux, active} | _ ]} = check_auto(Cid, 2, State0),
1884+
% returning immediately checks out the same message again
1885+
{State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, ra_event},
1886+
{aux, active}]} =
1887+
apply(meta(3), make_return(Cid, [MsgId]), State1),
1888+
{#state{ra_indexes = RaIdxs}, ok, []} =
1889+
apply(meta(4), make_return(Cid, [MsgId2]), State2),
1890+
?assertEqual(0, rabbit_fifo_index:size(RaIdxs)),
1891+
ok.
1892+
18711893
return_auto_checked_out_test() ->
18721894
Cid = {<<"cid">>, self()},
18731895
{State00, [_, _]} = enq(1, 1, first, test_init(test)),
@@ -1886,7 +1908,6 @@ return_auto_checked_out_test() ->
18861908
Effects),
18871909
ok.
18881910

1889-
18901911
cancelled_checkout_out_test() ->
18911912
Cid = {<<"cid">>, self()},
18921913
{State00, [_, _]} = enq(1, 1, first, test_init(test)),

test/quorum_queue_SUITE.erl

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1523,18 +1523,16 @@ subscribe_redelivery_count(Config) ->
15231523
end.
15241524

15251525
subscribe_redelivery_limit(Config) ->
1526-
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1526+
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
15271527

15281528
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
15291529
QQ = ?config(queue_name, Config),
15301530
?assertEqual({'queue.declare_ok', QQ, 0, 0},
15311531
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
15321532
{<<"x-delivery-limit">>, long, 1}])),
15331533

1534-
RaName = ra_name(QQ),
15351534
publish(Ch, QQ),
1536-
wait_for_messages_ready(Servers, RaName, 1),
1537-
wait_for_messages_pending_ack(Servers, RaName, 0),
1535+
wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
15381536
subscribe(Ch, QQ, false),
15391537

15401538
DTag = <<"x-delivery-count">>,
@@ -1548,8 +1546,7 @@ subscribe_redelivery_limit(Config) ->
15481546
requeue = true})
15491547
end,
15501548

1551-
wait_for_messages_ready(Servers, RaName, 0),
1552-
wait_for_messages_pending_ack(Servers, RaName, 1),
1549+
wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
15531550
receive
15541551
{#'basic.deliver'{delivery_tag = DeliveryTag1,
15551552
redelivered = true},
@@ -1560,8 +1557,7 @@ subscribe_redelivery_limit(Config) ->
15601557
requeue = true})
15611558
end,
15621559

1563-
wait_for_messages_ready(Servers, RaName, 0),
1564-
wait_for_messages_pending_ack(Servers, RaName, 0),
1560+
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
15651561
receive
15661562
{#'basic.deliver'{redelivered = true}, #amqp_msg{}} ->
15671563
throw(unexpected_redelivery)
@@ -1570,7 +1566,7 @@ subscribe_redelivery_limit(Config) ->
15701566
end.
15711567

15721568
subscribe_redelivery_policy(Config) ->
1573-
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1569+
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
15741570

15751571
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
15761572
QQ = ?config(queue_name, Config),
@@ -1581,10 +1577,8 @@ subscribe_redelivery_policy(Config) ->
15811577
Config, 0, <<"delivery-limit">>, <<".*">>, <<"queues">>,
15821578
[{<<"delivery-limit">>, 1}]),
15831579

1584-
RaName = ra_name(QQ),
15851580
publish(Ch, QQ),
1586-
wait_for_messages_ready(Servers, RaName, 1),
1587-
wait_for_messages_pending_ack(Servers, RaName, 0),
1581+
wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
15881582
subscribe(Ch, QQ, false),
15891583

15901584
DTag = <<"x-delivery-count">>,
@@ -1598,8 +1592,7 @@ subscribe_redelivery_policy(Config) ->
15981592
requeue = true})
15991593
end,
16001594

1601-
wait_for_messages_ready(Servers, RaName, 0),
1602-
wait_for_messages_pending_ack(Servers, RaName, 1),
1595+
wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
16031596
receive
16041597
{#'basic.deliver'{delivery_tag = DeliveryTag1,
16051598
redelivered = true},
@@ -1610,8 +1603,7 @@ subscribe_redelivery_policy(Config) ->
16101603
requeue = true})
16111604
end,
16121605

1613-
wait_for_messages_ready(Servers, RaName, 0),
1614-
wait_for_messages_pending_ack(Servers, RaName, 0),
1606+
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
16151607
receive
16161608
{#'basic.deliver'{redelivered = true}, #amqp_msg{}} ->
16171609
throw(unexpected_redelivery)
@@ -1621,7 +1613,7 @@ subscribe_redelivery_policy(Config) ->
16211613
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"delivery-limit">>).
16221614

16231615
subscribe_redelivery_limit_with_dead_letter(Config) ->
1624-
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1616+
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
16251617

16261618
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
16271619
QQ = ?config(queue_name, Config),
@@ -1635,11 +1627,8 @@ subscribe_redelivery_limit_with_dead_letter(Config) ->
16351627
?assertEqual({'queue.declare_ok', DLX, 0, 0},
16361628
declare(Ch, DLX, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
16371629

1638-
RaName = ra_name(QQ),
1639-
RaDlxName = ra_name(DLX),
16401630
publish(Ch, QQ),
1641-
wait_for_messages_ready(Servers, RaName, 1),
1642-
wait_for_messages_pending_ack(Servers, RaName, 0),
1631+
wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
16431632
subscribe(Ch, QQ, false),
16441633

16451634
DTag = <<"x-delivery-count">>,
@@ -1653,8 +1642,7 @@ subscribe_redelivery_limit_with_dead_letter(Config) ->
16531642
requeue = true})
16541643
end,
16551644

1656-
wait_for_messages_ready(Servers, RaName, 0),
1657-
wait_for_messages_pending_ack(Servers, RaName, 1),
1645+
wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
16581646
receive
16591647
{#'basic.deliver'{delivery_tag = DeliveryTag1,
16601648
redelivered = true},
@@ -1665,10 +1653,8 @@ subscribe_redelivery_limit_with_dead_letter(Config) ->
16651653
requeue = true})
16661654
end,
16671655

1668-
wait_for_messages_ready(Servers, RaName, 0),
1669-
wait_for_messages_pending_ack(Servers, RaName, 0),
1670-
wait_for_messages_ready(Servers, RaDlxName, 1),
1671-
wait_for_messages_pending_ack(Servers, RaDlxName, 0).
1656+
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
1657+
wait_for_messages(Config, [[DLX, <<"1">>, <<"1">>, <<"0">>]]).
16721658

16731659
consume_redelivery_count(Config) ->
16741660
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

0 commit comments

Comments
 (0)