Skip to content

Commit fdc6bd1

Browse files
authored
Merge pull request #12506 from rabbitmq/test-requeue-history
Track requeue history
2 parents d9ff6a0 + b1064fd commit fdc6bd1

File tree

6 files changed

+202
-44
lines changed

6 files changed

+202
-44
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1179,13 +1179,20 @@ wrap_map_value(true) ->
11791179
wrap_map_value(false) ->
11801180
{boolean, false};
11811181
wrap_map_value(V) when is_integer(V) ->
1182-
{uint, V};
1182+
case V < 0 of
1183+
true ->
1184+
{int, V};
1185+
false ->
1186+
uint(V)
1187+
end;
11831188
wrap_map_value(V) when is_binary(V) ->
11841189
utf8(V);
11851190
wrap_map_value(V) when is_list(V) ->
11861191
utf8(list_to_binary(V));
11871192
wrap_map_value(V) when is_atom(V) ->
1188-
utf8(atom_to_list(V)).
1193+
utf8(atom_to_list(V));
1194+
wrap_map_value(TaggedValue) when is_atom(element(1, TaggedValue)) ->
1195+
TaggedValue.
11891196

11901197
utf8(V) -> amqp10_client_types:utf8(V).
11911198

deps/rabbit/src/mc.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
-type str() :: atom() | string() | binary().
4545
-type internal_ann_key() :: atom().
4646
-type x_ann_key() :: binary(). %% should begin with x- or ideally x-opt-
47-
-type x_ann_value() :: str() | integer() | float() | [x_ann_value()].
47+
-type x_ann_value() :: str() | integer() | float() | TaggedValue :: tuple() | [x_ann_value()].
4848
-type protocol() :: module().
4949
-type annotations() :: #{internal_ann_key() => term(),
5050
x_ann_key() => x_ann_value()}.

deps/rabbit/src/mc_util.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,8 @@ infer_type(V) when is_integer(V) ->
5252
{long, V};
5353
infer_type(V) when is_boolean(V) ->
5454
{boolean, V};
55-
infer_type({T, _} = V) when is_atom(T) ->
56-
%% looks like a pre-tagged type
57-
V.
55+
infer_type(TaggedValue) when is_atom(element(1, TaggedValue)) ->
56+
TaggedValue.
5857

5958
utf8_string_is_ascii(UTF8String) ->
6059
utf8_scan(UTF8String, fun(Char) -> Char >= 0 andalso Char < 128 end).

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1938,7 +1938,7 @@ settle_op_from_outcome(#'v1_0.modified'{delivery_failed = DelFailed,
19381938
Anns1 = lists:map(
19391939
%% "all symbolic keys except those beginning with "x-" are reserved." [3.2.10]
19401940
fun({{symbol, <<"x-", _/binary>> = K}, V}) ->
1941-
{K, unwrap(V)}
1941+
{K, unwrap_simple_type(V)}
19421942
end, KVList),
19431943
maps:from_list(Anns1)
19441944
end,
@@ -3624,7 +3624,14 @@ format_status(
36243624
topic_permission_cache => TopicPermissionCache},
36253625
maps:update(state, State, Status).
36263626

3627-
unwrap({_Tag, V}) ->
3627+
3628+
unwrap_simple_type(V = {list, _}) ->
3629+
V;
3630+
unwrap_simple_type(V = {map, _}) ->
3631+
V;
3632+
unwrap_simple_type(V = {array, _, _}) ->
3633+
V;
3634+
unwrap_simple_type({_SimpleType, V}) ->
36283635
V;
3629-
unwrap(V) ->
3636+
unwrap_simple_type(V) ->
36303637
V.

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 178 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ groups() ->
129129
modified_classic_queue,
130130
modified_quorum_queue,
131131
modified_dead_letter_headers_exchange,
132+
modified_dead_letter_history,
132133
dead_letter_headers_exchange,
133134
dead_letter_reject,
134135
dead_letter_reject_message_order_classic_queue,
@@ -264,7 +265,8 @@ init_per_testcase(T, Config)
264265
end;
265266
init_per_testcase(T, Config)
266267
when T =:= modified_quorum_queue orelse
267-
T =:= modified_dead_letter_headers_exchange ->
268+
T =:= modified_dead_letter_headers_exchange orelse
269+
T =:= modified_dead_letter_history ->
268270
case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of
269271
true ->
270272
rabbit_ct_helpers:testcase_started(Config, T);
@@ -501,79 +503,127 @@ modified_quorum_queue(Config) ->
501503
ok = amqp10_client:send_msg(Sender, Msg2),
502504
ok = amqp10_client:detach_link(Sender),
503505

504-
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled),
506+
Receiver1Name = <<"receiver 1">>,
507+
Receiver2Name = <<"receiver 2">>,
508+
{ok, Receiver1} = amqp10_client:attach_receiver_link(Session, Receiver1Name, Address, unsettled),
509+
{ok, Receiver2} = amqp10_client:attach_receiver_link(Session, Receiver2Name, Address, unsettled),
505510

506-
{ok, M1} = amqp10_client:get_msg(Receiver),
511+
{ok, M1} = amqp10_client:get_msg(Receiver1),
507512
?assertEqual([<<"m1">>], amqp10_msg:body(M1)),
508513
?assertMatch(#{delivery_count := 0,
509514
first_acquirer := true},
510515
amqp10_msg:headers(M1)),
511-
ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, true, #{}}),
516+
ok = amqp10_client:settle_msg(Receiver1, M1, {modified, false, true, #{}}),
512517

513-
{ok, M2a} = amqp10_client:get_msg(Receiver),
518+
{ok, M2a} = amqp10_client:get_msg(Receiver1),
514519
?assertEqual([<<"m2">>], amqp10_msg:body(M2a)),
515520
?assertMatch(#{delivery_count := 0,
516521
first_acquirer := true},
517522
amqp10_msg:headers(M2a)),
518-
ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}),
523+
ok = amqp10_client:settle_msg(Receiver1, M2a, {modified, false, false, #{}}),
519524

520-
{ok, M2b} = amqp10_client:get_msg(Receiver),
525+
{ok, M2b} = amqp10_client:get_msg(Receiver1),
521526
?assertEqual([<<"m2">>], amqp10_msg:body(M2b)),
522527
?assertMatch(#{delivery_count := 0,
523528
first_acquirer := false},
524529
amqp10_msg:headers(M2b)),
525-
ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}),
530+
ok = amqp10_client:settle_msg(Receiver1, M2b, {modified, true, false, #{}}),
526531

527-
{ok, M2c} = amqp10_client:get_msg(Receiver),
532+
{ok, M2c} = amqp10_client:get_msg(Receiver1),
528533
?assertEqual([<<"m2">>], amqp10_msg:body(M2c)),
529534
?assertMatch(#{delivery_count := 1,
530535
first_acquirer := false},
531536
amqp10_msg:headers(M2c)),
532-
ok = amqp10_client:settle_msg(Receiver, M2c,
533-
{modified, true, false,
534-
#{<<"x-opt-key">> => <<"val 1">>}}),
535-
536-
{ok, M2d} = amqp10_client:get_msg(Receiver),
537+
ok = amqp10_client:settle_msg(
538+
Receiver1, M2c,
539+
{modified, true, false,
540+
%% Test that a history of requeue events can be tracked as described in
541+
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
542+
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver1Name}]},
543+
<<"x-opt-requeue-reason">> => {list, [{utf8, <<"reason 1">>}]},
544+
<<"x-opt-my-map">> => {map, [
545+
{{utf8, <<"k1">>}, {byte, -1}},
546+
{{utf8, <<"k2">>}, {ulong, 2}}
547+
]}}}),
548+
549+
{ok, M2d} = amqp10_client:get_msg(Receiver2),
537550
?assertEqual([<<"m2">>], amqp10_msg:body(M2d)),
538551
?assertMatch(#{delivery_count := 2,
539552
first_acquirer := false},
540553
amqp10_msg:headers(M2d)),
541-
?assertMatch(#{<<"x-opt-key">> := <<"val 1">>}, amqp10_msg:message_annotations(M2d)),
542-
ok = amqp10_client:settle_msg(Receiver, M2d,
543-
{modified, false, false,
544-
#{<<"x-opt-key">> => <<"val 2">>,
545-
<<"x-other">> => 99}}),
546-
547-
{ok, M2e} = amqp10_client:get_msg(Receiver),
554+
#{<<"x-opt-requeued-by">> := {array, utf8, L0},
555+
<<"x-opt-requeue-reason">> := L1,
556+
<<"x-opt-my-map">> := L2} = amqp10_msg:message_annotations(M2d),
557+
ok = amqp10_client:settle_msg(
558+
Receiver1, M2d,
559+
{modified, false, false,
560+
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver2Name} | L0]},
561+
<<"x-opt-requeue-reason">> => {list, [{symbol, <<"reason 2">>} | L1]},
562+
<<"x-opt-my-map">> => {map, L2 ++ [{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}]},
563+
<<"x-other">> => 99}}),
564+
565+
{ok, M2e} = amqp10_client:get_msg(Receiver1),
548566
?assertEqual([<<"m2">>], amqp10_msg:body(M2e)),
549567
?assertMatch(#{delivery_count := 2,
550568
first_acquirer := false},
551569
amqp10_msg:headers(M2e)),
552-
?assertMatch(#{<<"x-opt-key">> := <<"val 2">>,
570+
?assertMatch(#{<<"x-opt-requeued-by">> := {array, utf8, [{utf8, Receiver2Name}, {utf8, Receiver1Name}]},
571+
<<"x-opt-requeue-reason">> := [{symbol, <<"reason 2">>}, {utf8, <<"reason 1">>}],
572+
<<"x-opt-my-map">> := [
573+
{{utf8, <<"k1">>}, {byte, -1}},
574+
{{utf8, <<"k2">>}, {ulong, 2}},
575+
{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}
576+
],
553577
<<"x-other">> := 99}, amqp10_msg:message_annotations(M2e)),
554-
ok = amqp10_client:settle_msg(Receiver, M2e, modified),
578+
ok = amqp10_client:settle_msg(Receiver1, M2e, modified),
555579

556-
ok = amqp10_client:detach_link(Receiver),
557-
?assertMatch({ok, #{message_count := 1}},
558-
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
580+
%% Test that we can consume via AMQP 0.9.1
581+
Ch = rabbit_ct_client_helpers:open_channel(Config),
582+
{#'basic.get_ok'{},
583+
#amqp_msg{payload = <<"m2">>,
584+
props = #'P_basic'{headers = Headers}}
585+
} = amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true}),
586+
%% We expect to receive only modified AMQP 1.0 message annotations that are of simple types
587+
%% (i.e. excluding list, map, array).
588+
?assertEqual({value, {<<"x-other">>, long, 99}},
589+
lists:keysearch(<<"x-other">>, 1, Headers)),
590+
?assertEqual({value, {<<"x-delivery-count">>, long, 5}},
591+
lists:keysearch(<<"x-delivery-count">>, 1, Headers)),
592+
ok = rabbit_ct_client_helpers:close_channel(Ch),
593+
594+
ok = amqp10_client:detach_link(Receiver1),
595+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
559596
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
560597
ok = end_session_sync(Session),
561598
ok = amqp10_client:close_connection(Connection).
562599

563600
%% Test that a message can be routed based on the message-annotations
564-
%% provided in the modified outcome.
601+
%% provided in the modified outcome as described in
602+
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
565603
modified_dead_letter_headers_exchange(Config) ->
566604
{Connection, Session, LinkPair} = init(Config),
605+
HeadersXName = <<"my headers exchange">>,
606+
AlternateXName = <<"my alternate exchange">>,
567607
SourceQName = <<"source quorum queue">>,
568608
AppleQName = <<"dead letter classic queue receiving apples">>,
569609
BananaQName = <<"dead letter quorum queue receiving bananas">>,
610+
TrashQName = <<"trash queue receiving anything that doesn't match">>,
611+
612+
ok = rabbitmq_amqp_client:declare_exchange(
613+
LinkPair,
614+
HeadersXName,
615+
#{type => <<"headers">>,
616+
arguments => #{<<"alternate-exchange">> => {utf8, AlternateXName}}}),
617+
618+
ok = rabbitmq_amqp_client:declare_exchange(LinkPair, AlternateXName, #{type => <<"fanout">>}),
619+
570620
{ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue(
571621
LinkPair,
572622
SourceQName,
573623
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
574624
<<"x-overflow">> => {utf8, <<"reject-publish">>},
575625
<<"x-dead-letter-strategy">> => {utf8, <<"at-least-once">>},
576-
<<"x-dead-letter-exchange">> => {utf8, <<"amq.headers">>}}}),
626+
<<"x-dead-letter-exchange">> => {utf8, HeadersXName}}}),
577627
{ok, #{type := <<"classic">>}} = rabbitmq_amqp_client:declare_queue(
578628
LinkPair,
579629
AppleQName,
@@ -582,14 +632,16 @@ modified_dead_letter_headers_exchange(Config) ->
582632
LinkPair,
583633
BananaQName,
584634
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}),
635+
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TrashQName, #{}),
585636
ok = rabbitmq_amqp_client:bind_queue(
586-
LinkPair, AppleQName, <<"amq.headers">>, <<>>,
637+
LinkPair, AppleQName, HeadersXName, <<>>,
587638
#{<<"x-fruit">> => {utf8, <<"apple">>},
588639
<<"x-match">> => {utf8, <<"any-with-x">>}}),
589640
ok = rabbitmq_amqp_client:bind_queue(
590-
LinkPair, BananaQName, <<"amq.headers">>, <<>>,
641+
LinkPair, BananaQName, HeadersXName, <<>>,
591642
#{<<"x-fruit">> => {utf8, <<"banana">>},
592643
<<"x-match">> => {utf8, <<"any-with-x">>}}),
644+
ok = rabbitmq_amqp_client:bind_queue(LinkPair, TrashQName, AlternateXName, <<>>, #{}),
593645

594646
{ok, Sender} = amqp10_client:attach_sender_link(
595647
Session, <<"test-sender">>, rabbitmq_amqp_address:queue(SourceQName)),
@@ -600,6 +652,8 @@ modified_dead_letter_headers_exchange(Config) ->
600652
Session, <<"receiver apple">>, rabbitmq_amqp_address:queue(AppleQName), unsettled),
601653
{ok, ReceiverBanana} = amqp10_client:attach_receiver_link(
602654
Session, <<"receiver banana">>, rabbitmq_amqp_address:queue(BananaQName), unsettled),
655+
{ok, ReceiverTrash} = amqp10_client:attach_receiver_link(
656+
Session, <<"receiver trash">>, rabbitmq_amqp_address:queue(TrashQName), unsettled),
603657

604658
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)),
605659
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t2">>, <<"m2">>)),
@@ -609,7 +663,8 @@ modified_dead_letter_headers_exchange(Config) ->
609663
ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations(
610664
#{"x-fruit" => <<"apple">>},
611665
amqp10_msg:new(<<"t4">>, <<"m4">>))),
612-
ok = wait_for_accepts(3),
666+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t5">>, <<"m5">>)),
667+
ok = wait_for_accepts(5),
613668

614669
{ok, Msg1} = amqp10_client:get_msg(Receiver),
615670
?assertMatch(#{delivery_count := 0,
@@ -650,13 +705,105 @@ modified_dead_letter_headers_exchange(Config) ->
650705
amqp10_msg:headers(MsgBanana2)),
651706
ok = amqp10_client:accept_msg(ReceiverBanana, MsgBanana2),
652707

708+
{ok, Msg5} = amqp10_client:get_msg(Receiver),
709+
%% This message should be routed via the alternate exchange to the trash queue.
710+
ok = amqp10_client:settle_msg(Receiver, Msg5, {modified, false, true, #{<<"x-fruit">> => <<"strawberry">>}}),
711+
{ok, MsgTrash} = amqp10_client:get_msg(ReceiverTrash),
712+
?assertEqual([<<"m5">>], amqp10_msg:body(MsgTrash)),
713+
?assertMatch(#{delivery_count := 0,
714+
first_acquirer := false},
715+
amqp10_msg:headers(MsgTrash)),
716+
ok = amqp10_client:accept_msg(ReceiverTrash, MsgTrash),
717+
653718
ok = detach_link_sync(Sender),
654719
ok = detach_link_sync(Receiver),
655720
ok = detach_link_sync(ReceiverApple),
656721
ok = detach_link_sync(ReceiverBanana),
657722
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, SourceQName),
658723
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, AppleQName),
659724
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, BananaQName),
725+
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, TrashQName),
726+
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, HeadersXName),
727+
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, AlternateXName),
728+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
729+
ok = end_session_sync(Session),
730+
ok = amqp10_client:close_connection(Connection).
731+
732+
%% Test that custom dead lettering event tracking works as described in
733+
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
734+
modified_dead_letter_history(Config) ->
735+
{Connection, Session, LinkPair} = init(Config),
736+
Q1 = <<"qq 1">>,
737+
Q2 = <<"qq 2">>,
738+
739+
{ok, _} = rabbitmq_amqp_client:declare_queue(
740+
LinkPair, Q1,
741+
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
742+
<<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>},
743+
<<"x-dead-letter-exchange">> => {utf8, <<"amq.fanout">>}}}),
744+
{ok, _} = rabbitmq_amqp_client:declare_queue(
745+
LinkPair, Q2,
746+
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
747+
<<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>},
748+
<<"x-dead-letter-exchange">> => {utf8, <<>>}}}),
749+
ok = rabbitmq_amqp_client:bind_queue(LinkPair, Q2, <<"amq.fanout">>, <<>>, #{}),
750+
751+
{ok, Sender} = amqp10_client:attach_sender_link(
752+
Session, <<"test-sender">>, rabbitmq_amqp_address:queue(Q1)),
753+
wait_for_credit(Sender),
754+
{ok, Receiver1} = amqp10_client:attach_receiver_link(
755+
Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(Q1), unsettled),
756+
{ok, Receiver2} = amqp10_client:attach_receiver_link(
757+
Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(Q2), unsettled),
758+
759+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t">>, <<"m">>)),
760+
ok = wait_for_accepts(1),
761+
ok = detach_link_sync(Sender),
762+
763+
{ok, Msg1} = amqp10_client:get_msg(Receiver1),
764+
?assertMatch(#{delivery_count := 0,
765+
first_acquirer := true},
766+
amqp10_msg:headers(Msg1)),
767+
ok = amqp10_client:settle_msg(
768+
Receiver1, Msg1,
769+
{modified, true, true,
770+
#{<<"x-opt-history-list">> => {list, [{utf8, <<"l1">>}]},
771+
<<"x-opt-history-map">> => {map, [{{symbol, <<"k1">>}, {byte, -1}}]},
772+
<<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a1">>}]}}
773+
}),
774+
775+
{ok, Msg2} = amqp10_client:get_msg(Receiver2),
776+
?assertMatch(#{delivery_count := 1,
777+
first_acquirer := false},
778+
amqp10_msg:headers(Msg2)),
779+
#{<<"x-opt-history-list">> := L1,
780+
<<"x-opt-history-map">> := L2,
781+
<<"x-opt-history-array">> := {array, utf8, L0}
782+
} = amqp10_msg:message_annotations(Msg2),
783+
ok = amqp10_client:settle_msg(
784+
Receiver2, Msg2,
785+
{modified, true, true,
786+
#{<<"x-opt-history-list">> => {list, [{int, -99} | L1]},
787+
<<"x-opt-history-map">> => {map, [{{symbol, <<"k2">>}, {symbol, <<"v2">>}} | L2]},
788+
<<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a2">>} | L0]},
789+
<<"x-other">> => -99}}),
790+
791+
{ok, Msg3} = amqp10_client:get_msg(Receiver1),
792+
?assertEqual([<<"m">>], amqp10_msg:body(Msg3)),
793+
?assertMatch(#{delivery_count := 2,
794+
first_acquirer := false},
795+
amqp10_msg:headers(Msg3)),
796+
?assertMatch(#{<<"x-opt-history-array">> := {array, utf8, [{utf8, <<"a2">>}, {utf8, <<"a1">>}]},
797+
<<"x-opt-history-list">> := [{int, -99}, {utf8, <<"l1">>}],
798+
<<"x-opt-history-map">> := [{{symbol, <<"k2">>}, {symbol, <<"v2">>}},
799+
{{symbol, <<"k1">>}, {byte, -1}}],
800+
<<"x-other">> := -99}, amqp10_msg:message_annotations(Msg3)),
801+
ok = amqp10_client:accept_msg(Receiver1, Msg3),
802+
803+
ok = detach_link_sync(Receiver1),
804+
ok = detach_link_sync(Receiver2),
805+
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q1),
806+
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q2),
660807
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
661808
ok = end_session_sync(Session),
662809
ok = amqp10_client:close_connection(Connection).

0 commit comments

Comments
 (0)