Skip to content

Commit 46a0f56

Browse files
ansdmergify[bot]
authored andcommitted
Add custom dead letter history test
Test the use case described in rabbitmq/rabbitmq-website#2095: > Rather than relying solely on RabbitMQ's built-in dead lettering tracking via x-opt-deaths, consumers can customise dead lettering event tracking. (cherry picked from commit 2e90619)
1 parent 22ac040 commit 46a0f56

File tree

1 file changed

+82
-1
lines changed

1 file changed

+82
-1
lines changed

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ groups() ->
118118
modified_classic_queue,
119119
modified_quorum_queue,
120120
modified_dead_letter_headers_exchange,
121+
modified_dead_letter_history,
121122
dead_letter_headers_exchange,
122123
dead_letter_reject,
123124
dead_letter_reject_message_order_classic_queue,
@@ -253,7 +254,8 @@ init_per_testcase(T, Config)
253254
end;
254255
init_per_testcase(T, Config)
255256
when T =:= modified_quorum_queue orelse
256-
T =:= modified_dead_letter_headers_exchange ->
257+
T =:= modified_dead_letter_headers_exchange orelse
258+
T =:= modified_dead_letter_history ->
257259
case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of
258260
true ->
259261
rabbit_ct_helpers:testcase_started(Config, T);
@@ -716,6 +718,85 @@ modified_dead_letter_headers_exchange(Config) ->
716718
ok = end_session_sync(Session),
717719
ok = amqp10_client:close_connection(Connection).
718720

721+
%% Test that custom dead lettering event tracking works as described in
722+
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
723+
modified_dead_letter_history(Config) ->
724+
{Connection, Session, LinkPair} = init(Config),
725+
Q1 = <<"qq 1">>,
726+
Q2 = <<"qq 2">>,
727+
728+
{ok, _} = rabbitmq_amqp_client:declare_queue(
729+
LinkPair, Q1,
730+
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
731+
<<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>},
732+
<<"x-dead-letter-exchange">> => {utf8, <<"amq.fanout">>}}}),
733+
{ok, _} = rabbitmq_amqp_client:declare_queue(
734+
LinkPair, Q2,
735+
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
736+
<<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>},
737+
<<"x-dead-letter-exchange">> => {utf8, <<>>}}}),
738+
ok = rabbitmq_amqp_client:bind_queue(LinkPair, Q2, <<"amq.fanout">>, <<>>, #{}),
739+
740+
{ok, Sender} = amqp10_client:attach_sender_link(
741+
Session, <<"test-sender">>, rabbitmq_amqp_address:queue(Q1)),
742+
wait_for_credit(Sender),
743+
{ok, Receiver1} = amqp10_client:attach_receiver_link(
744+
Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(Q1), unsettled),
745+
{ok, Receiver2} = amqp10_client:attach_receiver_link(
746+
Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(Q2), unsettled),
747+
748+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t">>, <<"m">>)),
749+
ok = wait_for_accepts(1),
750+
ok = detach_link_sync(Sender),
751+
752+
{ok, Msg1} = amqp10_client:get_msg(Receiver1),
753+
?assertMatch(#{delivery_count := 0,
754+
first_acquirer := true},
755+
amqp10_msg:headers(Msg1)),
756+
ok = amqp10_client:settle_msg(
757+
Receiver1, Msg1,
758+
{modified, true, true,
759+
#{<<"x-opt-history-list">> => {list, [{utf8, <<"l1">>}]},
760+
<<"x-opt-history-map">> => {map, [{{symbol, <<"k1">>}, {byte, -1}}]},
761+
<<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a1">>}]}}
762+
}),
763+
764+
{ok, Msg2} = amqp10_client:get_msg(Receiver2),
765+
?assertMatch(#{delivery_count := 1,
766+
first_acquirer := false},
767+
amqp10_msg:headers(Msg2)),
768+
#{<<"x-opt-history-list">> := L1,
769+
<<"x-opt-history-map">> := L2,
770+
<<"x-opt-history-array">> := {array, utf8, L0}
771+
} = amqp10_msg:message_annotations(Msg2),
772+
ok = amqp10_client:settle_msg(
773+
Receiver2, Msg2,
774+
{modified, true, true,
775+
#{<<"x-opt-history-list">> => {list, [{int, -99} | L1]},
776+
<<"x-opt-history-map">> => {map, [{{symbol, <<"k2">>}, {symbol, <<"v2">>}} | L2]},
777+
<<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a2">>} | L0]},
778+
<<"x-other">> => 99}}),
779+
780+
{ok, Msg3} = amqp10_client:get_msg(Receiver1),
781+
?assertEqual([<<"m">>], amqp10_msg:body(Msg3)),
782+
?assertMatch(#{delivery_count := 2,
783+
first_acquirer := false},
784+
amqp10_msg:headers(Msg3)),
785+
?assertMatch(#{<<"x-opt-history-array">> := {array, utf8, [{utf8, <<"a2">>}, {utf8, <<"a1">>}]},
786+
<<"x-opt-history-list">> := [{int, -99}, {utf8, <<"l1">>}],
787+
<<"x-opt-history-map">> := [{{symbol, <<"k2">>}, {symbol, <<"v2">>}},
788+
{{symbol, <<"k1">>}, {byte, -1}}],
789+
<<"x-other">> := 99}, amqp10_msg:message_annotations(Msg3)),
790+
ok = amqp10_client:accept_msg(Receiver1, Msg3),
791+
792+
ok = detach_link_sync(Receiver1),
793+
ok = detach_link_sync(Receiver2),
794+
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q1),
795+
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q2),
796+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
797+
ok = end_session_sync(Session),
798+
ok = amqp10_client:close_connection(Connection).
799+
719800
%% Tests that confirmations are returned correctly
720801
%% when sending many messages async to a quorum queue.
721802
sender_settle_mode_unsettled(Config) ->

0 commit comments

Comments
 (0)