Skip to content

Commit 2292638

Browse files
Merge pull request #12507 from rabbitmq/mergify/bp/v4.0.x/pr-12506
Track requeue history (backport #12506)
2 parents ab97ab5 + 67319f0 commit 2292638

File tree

6 files changed

+202
-44
lines changed

6 files changed

+202
-44
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1181,13 +1181,20 @@ wrap_map_value(true) ->
11811181
wrap_map_value(false) ->
11821182
{boolean, false};
11831183
wrap_map_value(V) when is_integer(V) ->
1184-
{uint, V};
1184+
case V < 0 of
1185+
true ->
1186+
{int, V};
1187+
false ->
1188+
uint(V)
1189+
end;
11851190
wrap_map_value(V) when is_binary(V) ->
11861191
utf8(V);
11871192
wrap_map_value(V) when is_list(V) ->
11881193
utf8(list_to_binary(V));
11891194
wrap_map_value(V) when is_atom(V) ->
1190-
utf8(atom_to_list(V)).
1195+
utf8(atom_to_list(V));
1196+
wrap_map_value(TaggedValue) when is_atom(element(1, TaggedValue)) ->
1197+
TaggedValue.
11911198

11921199
utf8(V) -> amqp10_client_types:utf8(V).
11931200

deps/rabbit/src/mc.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
-type str() :: atom() | string() | binary().
4545
-type internal_ann_key() :: atom().
4646
-type x_ann_key() :: binary(). %% should begin with x- or ideally x-opt-
47-
-type x_ann_value() :: str() | integer() | float() | [x_ann_value()].
47+
-type x_ann_value() :: str() | integer() | float() | TaggedValue :: tuple() | [x_ann_value()].
4848
-type protocol() :: module().
4949
-type annotations() :: #{internal_ann_key() => term(),
5050
x_ann_key() => x_ann_value()}.

deps/rabbit/src/mc_util.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,8 @@ infer_type(V) when is_integer(V) ->
5252
{long, V};
5353
infer_type(V) when is_boolean(V) ->
5454
{boolean, V};
55-
infer_type({T, _} = V) when is_atom(T) ->
56-
%% looks like a pre-tagged type
57-
V.
55+
infer_type(TaggedValue) when is_atom(element(1, TaggedValue)) ->
56+
TaggedValue.
5857

5958
utf8_string_is_ascii(UTF8String) ->
6059
utf8_scan(UTF8String, fun(Char) -> Char >= 0 andalso Char < 128 end).

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1921,7 +1921,7 @@ settle_op_from_outcome(#'v1_0.modified'{delivery_failed = DelFailed,
19211921
Anns1 = lists:map(
19221922
%% "all symbolic keys except those beginning with "x-" are reserved." [3.2.10]
19231923
fun({{symbol, <<"x-", _/binary>> = K}, V}) ->
1924-
{K, unwrap(V)}
1924+
{K, unwrap_simple_type(V)}
19251925
end, KVList),
19261926
maps:from_list(Anns1)
19271927
end,
@@ -3603,7 +3603,14 @@ format_status(
36033603
topic_permission_cache => TopicPermissionCache},
36043604
maps:update(state, State, Status).
36053605

3606-
unwrap({_Tag, V}) ->
3606+
3607+
unwrap_simple_type(V = {list, _}) ->
3608+
V;
3609+
unwrap_simple_type(V = {map, _}) ->
3610+
V;
3611+
unwrap_simple_type(V = {array, _, _}) ->
3612+
V;
3613+
unwrap_simple_type({_SimpleType, V}) ->
36073614
V;
3608-
unwrap(V) ->
3615+
unwrap_simple_type(V) ->
36093616
V.

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 178 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ groups() ->
118118
modified_classic_queue,
119119
modified_quorum_queue,
120120
modified_dead_letter_headers_exchange,
121+
modified_dead_letter_history,
121122
dead_letter_headers_exchange,
122123
dead_letter_reject,
123124
dead_letter_reject_message_order_classic_queue,
@@ -253,7 +254,8 @@ init_per_testcase(T, Config)
253254
end;
254255
init_per_testcase(T, Config)
255256
when T =:= modified_quorum_queue orelse
256-
T =:= modified_dead_letter_headers_exchange ->
257+
T =:= modified_dead_letter_headers_exchange orelse
258+
T =:= modified_dead_letter_history ->
257259
case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of
258260
true ->
259261
rabbit_ct_helpers:testcase_started(Config, T);
@@ -490,79 +492,127 @@ modified_quorum_queue(Config) ->
490492
ok = amqp10_client:send_msg(Sender, Msg2),
491493
ok = amqp10_client:detach_link(Sender),
492494

493-
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled),
495+
Receiver1Name = <<"receiver 1">>,
496+
Receiver2Name = <<"receiver 2">>,
497+
{ok, Receiver1} = amqp10_client:attach_receiver_link(Session, Receiver1Name, Address, unsettled),
498+
{ok, Receiver2} = amqp10_client:attach_receiver_link(Session, Receiver2Name, Address, unsettled),
494499

495-
{ok, M1} = amqp10_client:get_msg(Receiver),
500+
{ok, M1} = amqp10_client:get_msg(Receiver1),
496501
?assertEqual([<<"m1">>], amqp10_msg:body(M1)),
497502
?assertMatch(#{delivery_count := 0,
498503
first_acquirer := true},
499504
amqp10_msg:headers(M1)),
500-
ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, true, #{}}),
505+
ok = amqp10_client:settle_msg(Receiver1, M1, {modified, false, true, #{}}),
501506

502-
{ok, M2a} = amqp10_client:get_msg(Receiver),
507+
{ok, M2a} = amqp10_client:get_msg(Receiver1),
503508
?assertEqual([<<"m2">>], amqp10_msg:body(M2a)),
504509
?assertMatch(#{delivery_count := 0,
505510
first_acquirer := true},
506511
amqp10_msg:headers(M2a)),
507-
ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}),
512+
ok = amqp10_client:settle_msg(Receiver1, M2a, {modified, false, false, #{}}),
508513

509-
{ok, M2b} = amqp10_client:get_msg(Receiver),
514+
{ok, M2b} = amqp10_client:get_msg(Receiver1),
510515
?assertEqual([<<"m2">>], amqp10_msg:body(M2b)),
511516
?assertMatch(#{delivery_count := 0,
512517
first_acquirer := false},
513518
amqp10_msg:headers(M2b)),
514-
ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}),
519+
ok = amqp10_client:settle_msg(Receiver1, M2b, {modified, true, false, #{}}),
515520

516-
{ok, M2c} = amqp10_client:get_msg(Receiver),
521+
{ok, M2c} = amqp10_client:get_msg(Receiver1),
517522
?assertEqual([<<"m2">>], amqp10_msg:body(M2c)),
518523
?assertMatch(#{delivery_count := 1,
519524
first_acquirer := false},
520525
amqp10_msg:headers(M2c)),
521-
ok = amqp10_client:settle_msg(Receiver, M2c,
522-
{modified, true, false,
523-
#{<<"x-opt-key">> => <<"val 1">>}}),
524-
525-
{ok, M2d} = amqp10_client:get_msg(Receiver),
526+
ok = amqp10_client:settle_msg(
527+
Receiver1, M2c,
528+
{modified, true, false,
529+
%% Test that a history of requeue events can be tracked as described in
530+
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
531+
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver1Name}]},
532+
<<"x-opt-requeue-reason">> => {list, [{utf8, <<"reason 1">>}]},
533+
<<"x-opt-my-map">> => {map, [
534+
{{utf8, <<"k1">>}, {byte, -1}},
535+
{{utf8, <<"k2">>}, {ulong, 2}}
536+
]}}}),
537+
538+
{ok, M2d} = amqp10_client:get_msg(Receiver2),
526539
?assertEqual([<<"m2">>], amqp10_msg:body(M2d)),
527540
?assertMatch(#{delivery_count := 2,
528541
first_acquirer := false},
529542
amqp10_msg:headers(M2d)),
530-
?assertMatch(#{<<"x-opt-key">> := <<"val 1">>}, amqp10_msg:message_annotations(M2d)),
531-
ok = amqp10_client:settle_msg(Receiver, M2d,
532-
{modified, false, false,
533-
#{<<"x-opt-key">> => <<"val 2">>,
534-
<<"x-other">> => 99}}),
535-
536-
{ok, M2e} = amqp10_client:get_msg(Receiver),
543+
#{<<"x-opt-requeued-by">> := {array, utf8, L0},
544+
<<"x-opt-requeue-reason">> := L1,
545+
<<"x-opt-my-map">> := L2} = amqp10_msg:message_annotations(M2d),
546+
ok = amqp10_client:settle_msg(
547+
Receiver1, M2d,
548+
{modified, false, false,
549+
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver2Name} | L0]},
550+
<<"x-opt-requeue-reason">> => {list, [{symbol, <<"reason 2">>} | L1]},
551+
<<"x-opt-my-map">> => {map, L2 ++ [{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}]},
552+
<<"x-other">> => 99}}),
553+
554+
{ok, M2e} = amqp10_client:get_msg(Receiver1),
537555
?assertEqual([<<"m2">>], amqp10_msg:body(M2e)),
538556
?assertMatch(#{delivery_count := 2,
539557
first_acquirer := false},
540558
amqp10_msg:headers(M2e)),
541-
?assertMatch(#{<<"x-opt-key">> := <<"val 2">>,
559+
?assertMatch(#{<<"x-opt-requeued-by">> := {array, utf8, [{utf8, Receiver2Name}, {utf8, Receiver1Name}]},
560+
<<"x-opt-requeue-reason">> := [{symbol, <<"reason 2">>}, {utf8, <<"reason 1">>}],
561+
<<"x-opt-my-map">> := [
562+
{{utf8, <<"k1">>}, {byte, -1}},
563+
{{utf8, <<"k2">>}, {ulong, 2}},
564+
{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}
565+
],
542566
<<"x-other">> := 99}, amqp10_msg:message_annotations(M2e)),
543-
ok = amqp10_client:settle_msg(Receiver, M2e, modified),
567+
ok = amqp10_client:settle_msg(Receiver1, M2e, modified),
544568

545-
ok = amqp10_client:detach_link(Receiver),
546-
?assertMatch({ok, #{message_count := 1}},
547-
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
569+
%% Test that we can consume via AMQP 0.9.1
570+
Ch = rabbit_ct_client_helpers:open_channel(Config),
571+
{#'basic.get_ok'{},
572+
#amqp_msg{payload = <<"m2">>,
573+
props = #'P_basic'{headers = Headers}}
574+
} = amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true}),
575+
%% We expect to receive only modified AMQP 1.0 message annotations that are of simple types
576+
%% (i.e. excluding list, map, array).
577+
?assertEqual({value, {<<"x-other">>, long, 99}},
578+
lists:keysearch(<<"x-other">>, 1, Headers)),
579+
?assertEqual({value, {<<"x-delivery-count">>, long, 5}},
580+
lists:keysearch(<<"x-delivery-count">>, 1, Headers)),
581+
ok = rabbit_ct_client_helpers:close_channel(Ch),
582+
583+
ok = amqp10_client:detach_link(Receiver1),
584+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
548585
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
549586
ok = end_session_sync(Session),
550587
ok = amqp10_client:close_connection(Connection).
551588

552589
%% Test that a message can be routed based on the message-annotations
553-
%% provided in the modified outcome.
590+
%% provided in the modified outcome as described in
591+
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
554592
modified_dead_letter_headers_exchange(Config) ->
555593
{Connection, Session, LinkPair} = init(Config),
594+
HeadersXName = <<"my headers exchange">>,
595+
AlternateXName = <<"my alternate exchange">>,
556596
SourceQName = <<"source quorum queue">>,
557597
AppleQName = <<"dead letter classic queue receiving apples">>,
558598
BananaQName = <<"dead letter quorum queue receiving bananas">>,
599+
TrashQName = <<"trash queue receiving anything that doesn't match">>,
600+
601+
ok = rabbitmq_amqp_client:declare_exchange(
602+
LinkPair,
603+
HeadersXName,
604+
#{type => <<"headers">>,
605+
arguments => #{<<"alternate-exchange">> => {utf8, AlternateXName}}}),
606+
607+
ok = rabbitmq_amqp_client:declare_exchange(LinkPair, AlternateXName, #{type => <<"fanout">>}),
608+
559609
{ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue(
560610
LinkPair,
561611
SourceQName,
562612
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
563613
<<"x-overflow">> => {utf8, <<"reject-publish">>},
564614
<<"x-dead-letter-strategy">> => {utf8, <<"at-least-once">>},
565-
<<"x-dead-letter-exchange">> => {utf8, <<"amq.headers">>}}}),
615+
<<"x-dead-letter-exchange">> => {utf8, HeadersXName}}}),
566616
{ok, #{type := <<"classic">>}} = rabbitmq_amqp_client:declare_queue(
567617
LinkPair,
568618
AppleQName,
@@ -571,14 +621,16 @@ modified_dead_letter_headers_exchange(Config) ->
571621
LinkPair,
572622
BananaQName,
573623
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}),
624+
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TrashQName, #{}),
574625
ok = rabbitmq_amqp_client:bind_queue(
575-
LinkPair, AppleQName, <<"amq.headers">>, <<>>,
626+
LinkPair, AppleQName, HeadersXName, <<>>,
576627
#{<<"x-fruit">> => {utf8, <<"apple">>},
577628
<<"x-match">> => {utf8, <<"any-with-x">>}}),
578629
ok = rabbitmq_amqp_client:bind_queue(
579-
LinkPair, BananaQName, <<"amq.headers">>, <<>>,
630+
LinkPair, BananaQName, HeadersXName, <<>>,
580631
#{<<"x-fruit">> => {utf8, <<"banana">>},
581632
<<"x-match">> => {utf8, <<"any-with-x">>}}),
633+
ok = rabbitmq_amqp_client:bind_queue(LinkPair, TrashQName, AlternateXName, <<>>, #{}),
582634

583635
{ok, Sender} = amqp10_client:attach_sender_link(
584636
Session, <<"test-sender">>, rabbitmq_amqp_address:queue(SourceQName)),
@@ -589,6 +641,8 @@ modified_dead_letter_headers_exchange(Config) ->
589641
Session, <<"receiver apple">>, rabbitmq_amqp_address:queue(AppleQName), unsettled),
590642
{ok, ReceiverBanana} = amqp10_client:attach_receiver_link(
591643
Session, <<"receiver banana">>, rabbitmq_amqp_address:queue(BananaQName), unsettled),
644+
{ok, ReceiverTrash} = amqp10_client:attach_receiver_link(
645+
Session, <<"receiver trash">>, rabbitmq_amqp_address:queue(TrashQName), unsettled),
592646

593647
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)),
594648
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t2">>, <<"m2">>)),
@@ -598,7 +652,8 @@ modified_dead_letter_headers_exchange(Config) ->
598652
ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations(
599653
#{"x-fruit" => <<"apple">>},
600654
amqp10_msg:new(<<"t4">>, <<"m4">>))),
601-
ok = wait_for_accepts(3),
655+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t5">>, <<"m5">>)),
656+
ok = wait_for_accepts(5),
602657

603658
{ok, Msg1} = amqp10_client:get_msg(Receiver),
604659
?assertMatch(#{delivery_count := 0,
@@ -639,13 +694,105 @@ modified_dead_letter_headers_exchange(Config) ->
639694
amqp10_msg:headers(MsgBanana2)),
640695
ok = amqp10_client:accept_msg(ReceiverBanana, MsgBanana2),
641696

697+
{ok, Msg5} = amqp10_client:get_msg(Receiver),
698+
%% This message should be routed via the alternate exchange to the trash queue.
699+
ok = amqp10_client:settle_msg(Receiver, Msg5, {modified, false, true, #{<<"x-fruit">> => <<"strawberry">>}}),
700+
{ok, MsgTrash} = amqp10_client:get_msg(ReceiverTrash),
701+
?assertEqual([<<"m5">>], amqp10_msg:body(MsgTrash)),
702+
?assertMatch(#{delivery_count := 0,
703+
first_acquirer := false},
704+
amqp10_msg:headers(MsgTrash)),
705+
ok = amqp10_client:accept_msg(ReceiverTrash, MsgTrash),
706+
642707
ok = detach_link_sync(Sender),
643708
ok = detach_link_sync(Receiver),
644709
ok = detach_link_sync(ReceiverApple),
645710
ok = detach_link_sync(ReceiverBanana),
646711
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, SourceQName),
647712
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, AppleQName),
648713
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, BananaQName),
714+
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, TrashQName),
715+
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, HeadersXName),
716+
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, AlternateXName),
717+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
718+
ok = end_session_sync(Session),
719+
ok = amqp10_client:close_connection(Connection).
720+
721+
%% Test that custom dead lettering event tracking works as described in
722+
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
723+
modified_dead_letter_history(Config) ->
724+
{Connection, Session, LinkPair} = init(Config),
725+
Q1 = <<"qq 1">>,
726+
Q2 = <<"qq 2">>,
727+
728+
{ok, _} = rabbitmq_amqp_client:declare_queue(
729+
LinkPair, Q1,
730+
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
731+
<<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>},
732+
<<"x-dead-letter-exchange">> => {utf8, <<"amq.fanout">>}}}),
733+
{ok, _} = rabbitmq_amqp_client:declare_queue(
734+
LinkPair, Q2,
735+
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
736+
<<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>},
737+
<<"x-dead-letter-exchange">> => {utf8, <<>>}}}),
738+
ok = rabbitmq_amqp_client:bind_queue(LinkPair, Q2, <<"amq.fanout">>, <<>>, #{}),
739+
740+
{ok, Sender} = amqp10_client:attach_sender_link(
741+
Session, <<"test-sender">>, rabbitmq_amqp_address:queue(Q1)),
742+
wait_for_credit(Sender),
743+
{ok, Receiver1} = amqp10_client:attach_receiver_link(
744+
Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(Q1), unsettled),
745+
{ok, Receiver2} = amqp10_client:attach_receiver_link(
746+
Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(Q2), unsettled),
747+
748+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t">>, <<"m">>)),
749+
ok = wait_for_accepts(1),
750+
ok = detach_link_sync(Sender),
751+
752+
{ok, Msg1} = amqp10_client:get_msg(Receiver1),
753+
?assertMatch(#{delivery_count := 0,
754+
first_acquirer := true},
755+
amqp10_msg:headers(Msg1)),
756+
ok = amqp10_client:settle_msg(
757+
Receiver1, Msg1,
758+
{modified, true, true,
759+
#{<<"x-opt-history-list">> => {list, [{utf8, <<"l1">>}]},
760+
<<"x-opt-history-map">> => {map, [{{symbol, <<"k1">>}, {byte, -1}}]},
761+
<<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a1">>}]}}
762+
}),
763+
764+
{ok, Msg2} = amqp10_client:get_msg(Receiver2),
765+
?assertMatch(#{delivery_count := 1,
766+
first_acquirer := false},
767+
amqp10_msg:headers(Msg2)),
768+
#{<<"x-opt-history-list">> := L1,
769+
<<"x-opt-history-map">> := L2,
770+
<<"x-opt-history-array">> := {array, utf8, L0}
771+
} = amqp10_msg:message_annotations(Msg2),
772+
ok = amqp10_client:settle_msg(
773+
Receiver2, Msg2,
774+
{modified, true, true,
775+
#{<<"x-opt-history-list">> => {list, [{int, -99} | L1]},
776+
<<"x-opt-history-map">> => {map, [{{symbol, <<"k2">>}, {symbol, <<"v2">>}} | L2]},
777+
<<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a2">>} | L0]},
778+
<<"x-other">> => -99}}),
779+
780+
{ok, Msg3} = amqp10_client:get_msg(Receiver1),
781+
?assertEqual([<<"m">>], amqp10_msg:body(Msg3)),
782+
?assertMatch(#{delivery_count := 2,
783+
first_acquirer := false},
784+
amqp10_msg:headers(Msg3)),
785+
?assertMatch(#{<<"x-opt-history-array">> := {array, utf8, [{utf8, <<"a2">>}, {utf8, <<"a1">>}]},
786+
<<"x-opt-history-list">> := [{int, -99}, {utf8, <<"l1">>}],
787+
<<"x-opt-history-map">> := [{{symbol, <<"k2">>}, {symbol, <<"v2">>}},
788+
{{symbol, <<"k1">>}, {byte, -1}}],
789+
<<"x-other">> := -99}, amqp10_msg:message_annotations(Msg3)),
790+
ok = amqp10_client:accept_msg(Receiver1, Msg3),
791+
792+
ok = detach_link_sync(Receiver1),
793+
ok = detach_link_sync(Receiver2),
794+
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q1),
795+
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q2),
649796
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
650797
ok = end_session_sync(Session),
651798
ok = amqp10_client:close_connection(Connection).

0 commit comments

Comments
 (0)