Skip to content

Commit 2256850

Browse files
committed
WIP
partial cherry pick from #11048
1 parent 746c06f commit 2256850

File tree

2 files changed

+135
-3
lines changed

2 files changed

+135
-3
lines changed

deps/rabbit/test/dead_lettering_SUITE.erl

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ groups() ->
2828
dead_letter_nack_requeue,
2929
dead_letter_nack_requeue_multiple,
3030
dead_letter_reject,
31+
dead_letter_reject_expire_expire,
3132
dead_letter_reject_many,
3233
dead_letter_reject_requeue,
3334
dead_letter_max_length_drop_head,
@@ -185,7 +186,7 @@ init_per_testcase(Testcase, Config) ->
185186
{skip, "dead_letter_headers_should_not_be_appended_for_republish isn't mixed versions compatible"};
186187
_ ->
187188
Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
188-
Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~tp", [Group, Testcase])),
189+
Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
189190
Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])),
190191
Q3 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_3", [Group, Testcase])),
191192
Policy = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_policy", [Group, Testcase])),
@@ -377,6 +378,64 @@ dead_letter_reject(Config) ->
377378
consume_empty(Ch, QName),
378379
?assertEqual(1, counted(messages_dead_lettered_rejected_total, Config)).
379380

381+
dead_letter_reject_expire_expire(Config) ->
382+
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
383+
%% In 3.13.0 - 3.13.2 there is a bug in mc:is_death_cycle/2 where the queue names matter.
384+
%% The following queue names triggered the bug because they affect the order returned by maps:keys/1.
385+
Q1 = <<"b">>,
386+
Q2 = <<"a2">>,
387+
Q3 = <<"a3">>,
388+
Args = ?config(queue_args, Config),
389+
Durable = ?config(queue_durable, Config),
390+
391+
%% Test the followig topology message flow:
392+
%% Q1 --rejected--> Q2 --expired--> Q3 --expired-->
393+
%% Q1 --rejected--> Q2 --expired--> Q3 --expired-->
394+
%% Q1
395+
396+
#'queue.declare_ok'{} = amqp_channel:call(
397+
Ch,
398+
#'queue.declare'{
399+
queue = Q1,
400+
arguments = Args ++ [{<<"x-dead-letter-exchange">>, longstr, <<>>},
401+
{<<"x-dead-letter-routing-key">>, longstr, Q2}],
402+
durable = Durable}),
403+
#'queue.declare_ok'{} = amqp_channel:call(
404+
Ch,
405+
#'queue.declare'{
406+
queue = Q2,
407+
arguments = Args ++ [{<<"x-dead-letter-exchange">>, longstr, <<>>},
408+
{<<"x-dead-letter-routing-key">>, longstr, Q3},
409+
{<<"x-message-ttl">>, long, 5}],
410+
durable = Durable}),
411+
#'queue.declare_ok'{} = amqp_channel:call(
412+
Ch,
413+
#'queue.declare'{
414+
queue = Q3,
415+
arguments = Args ++ [{<<"x-dead-letter-exchange">>, longstr, <<>>},
416+
{<<"x-dead-letter-routing-key">>, longstr, Q1},
417+
{<<"x-message-ttl">>, long, 5}],
418+
durable = Durable}),
419+
420+
%% Send a single message.
421+
P = <<"msg">>,
422+
publish(Ch, Q1, [P]),
423+
wait_for_messages(Config, [[Q1, <<"1">>, <<"1">>, <<"0">>]]),
424+
425+
%% Reject the 1st time.
426+
[DTag1] = consume(Ch, Q1, [P]),
427+
amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag1,
428+
requeue = false}),
429+
%% Message should now flow from Q1 -> Q2 -> Q3 -> Q1
430+
wait_for_messages(Config, [[Q1, <<"1">>, <<"1">>, <<"0">>]]),
431+
432+
%% Reject the 2nd time.
433+
[DTag2] = consume(Ch, Q1, [P]),
434+
amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag2,
435+
requeue = false}),
436+
%% Message should again flow from Q1 -> Q2 -> Q3 -> Q1
437+
wait_for_messages(Config, [[Q1, <<"1">>, <<"1">>, <<"0">>]]).
438+
380439
%% 1) Many messages are rejected. They get dead-lettered in correct order.
381440
dead_letter_reject_many(Config) ->
382441
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),

deps/rabbit/test/mc_unit_SUITE.erl

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ all_tests() ->
3030
amqpl_table_x_header,
3131
amqpl_table_x_header_array_of_tbls,
3232
amqpl_death_records,
33+
is_death_cycle,
3334
amqpl_amqp_bin_amqpl,
3435
amqpl_cc_amqp_bin_amqpl,
3536
amqp_amqpl_amqp_uuid_correlation_id,
@@ -222,19 +223,91 @@ amqpl_death_records(_Config) ->
222223
?assertMatch({_, array, [{longstr, <<"apple">>}]}, header(<<"routing-keys">>, T1)),
223224

224225

225-
%% second dead letter, e.g. a ttl reason returning to source queue
226+
%% second dead letter, e.g. an expired reason returning to source queue
226227

227228
%% record_death uses a timestamp for death record ordering, ensure
228229
%% it is definitely higher than the last timestamp taken
229230
timer:sleep(2),
230-
Msg2 = mc:record_death(ttl, <<"dl">>, Msg1),
231+
Msg2 = mc:record_death(expired, <<"dl">>, Msg1),
231232

232233
#content{properties = #'P_basic'{headers = H2}} = mc:protocol_state(Msg2),
233234
{_, array, [{table, T2a}, {table, T2b}]} = header(<<"x-death">>, H2),
234235
?assertMatch({_, longstr, <<"dl">>}, header(<<"queue">>, T2a)),
235236
?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T2b)),
236237
ok.
237238

239+
is_death_cycle(_Config) ->
240+
Content = #content{class_id = 60,
241+
properties = #'P_basic'{headers = []},
242+
payload_fragments_rev = [<<"data">>]},
243+
Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content, annotations())),
244+
245+
%% Test the followig topology:
246+
%% Q1 --rejected--> Q2 --expired--> Q3 --expired-->
247+
%% Q1 --rejected--> Q2 --expired--> Q3
248+
249+
Msg1 = mc:record_death(rejected, <<"q1">>, Msg0),
250+
?assertNot(mc:is_death_cycle(<<"q1">>, Msg1),
251+
"A queue that dead letters to itself due to rejected is not considered a cycle."),
252+
?assertNot(mc:is_death_cycle(<<"q2">>, Msg1)),
253+
?assertNot(mc:is_death_cycle(<<"q3">>, Msg1)),
254+
255+
timer:sleep(3),
256+
Msg2 = mc:record_death(expired, <<"q2">>, Msg1),
257+
?assertNot(mc:is_death_cycle(<<"q1">>, Msg2)),
258+
?assert(mc:is_death_cycle(<<"q2">>, Msg2),
259+
"A queue that dead letters to itself due to expired is considered a cycle."),
260+
?assertNot(mc:is_death_cycle(<<"q3">>, Msg2)),
261+
262+
timer:sleep(3),
263+
Msg3 = mc:record_death(expired, <<"q3">>, Msg2),
264+
?assertNot(mc:is_death_cycle(<<"q1">>, Msg3)),
265+
?assert(mc:is_death_cycle(<<"q2">>, Msg3)),
266+
?assert(mc:is_death_cycle(<<"q3">>, Msg3)),
267+
268+
timer:sleep(3),
269+
Msg4 = mc:record_death(rejected, <<"q1">>, Msg3),
270+
?assertNot(mc:is_death_cycle(<<"q1">>, Msg4)),
271+
?assertNot(mc:is_death_cycle(<<"q2">>, Msg4)),
272+
?assertNot(mc:is_death_cycle(<<"q3">>, Msg4)),
273+
274+
timer:sleep(3),
275+
Msg5 = mc:record_death(expired, <<"q2">>, Msg4),
276+
?assertNot(mc:is_death_cycle(<<"q1">>, Msg5)),
277+
?assert(mc:is_death_cycle(<<"q2">>, Msg5)),
278+
?assertNot(mc:is_death_cycle(<<"q3">>, Msg5)),
279+
280+
?assertEqual([<<"q1">>, <<"q2">>, <<"q3">>],
281+
lists:sort(mc:death_queue_names(Msg5))),
282+
?assertMatch({{<<"q2">>, expired},
283+
#death{exchange = <<"exch">>,
284+
routing_keys = [<<"apple">>],
285+
count = 2,
286+
anns = #{first_time := FirstTime,
287+
last_time := LastTime}}}
288+
when FirstTime < LastTime, mc:last_death(Msg5)),
289+
290+
#content{properties = #'P_basic'{headers = H}} = mc:protocol_state(Msg5),
291+
?assertMatch({_, longstr, <<"q1">>}, header(<<"x-first-death-queue">>, H)),
292+
?assertMatch({_, longstr, <<"q2">>}, header(<<"x-last-death-queue">>, H)),
293+
?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-first-death-reason">>, H)),
294+
?assertMatch({_, longstr, <<"expired">>}, header(<<"x-last-death-reason">>, H)),
295+
296+
%% We expect the array to be ordered by recency.
297+
{_, array, [{table, T1}, {table, T2}, {table, T3}]} = header(<<"x-death">>, H),
298+
299+
?assertMatch({_, longstr, <<"q2">>}, header(<<"queue">>, T1)),
300+
?assertMatch({_, longstr, <<"expired">>}, header(<<"reason">>, T1)),
301+
?assertMatch({_, long, 2}, header(<<"count">>, T1)),
302+
303+
?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T2)),
304+
?assertMatch({_, longstr, <<"rejected">>}, header(<<"reason">>, T2)),
305+
?assertMatch({_, long, 2}, header(<<"count">>, T2)),
306+
307+
?assertMatch({_, longstr, <<"q3">>}, header(<<"queue">>, T3)),
308+
?assertMatch({_, longstr, <<"expired">>}, header(<<"reason">>, T3)),
309+
?assertMatch({_, long, 1}, header(<<"count">>, T3)).
310+
238311
header(K, H) ->
239312
rabbit_basic:header(K, H).
240313

0 commit comments

Comments
 (0)