@@ -596,19 +596,32 @@ modified_quorum_queue(Config) ->
596
596
ok = amqp10_client :close_connection (Connection ).
597
597
598
598
% % Test that a message can be routed based on the message-annotations
599
- % % provided in the modified outcome.
599
+ % % provided in the modified outcome as described in
600
+ % % https://rabbitmq.com/blog/2024/10/11/modified-outcome
600
601
modified_dead_letter_headers_exchange (Config ) ->
601
602
{Connection , Session , LinkPair } = init (Config ),
603
+ HeadersXName = <<" my headers exchange" >>,
604
+ AlternateXName = <<" my alternate exchange" >>,
602
605
SourceQName = <<" source quorum queue" >>,
603
606
AppleQName = <<" dead letter classic queue receiving apples" >>,
604
607
BananaQName = <<" dead letter quorum queue receiving bananas" >>,
608
+ TrashQName = <<" trash queue receiving anything that doesn't match" >>,
609
+
610
+ ok = rabbitmq_amqp_client :declare_exchange (
611
+ LinkPair ,
612
+ HeadersXName ,
613
+ #{type => <<" headers" >>,
614
+ arguments => #{<<" alternate-exchange" >> => {utf8 , AlternateXName }}}),
615
+
616
+ ok = rabbitmq_amqp_client :declare_exchange (LinkPair , AlternateXName , #{type => <<" fanout" >>}),
617
+
605
618
{ok , #{type := <<" quorum" >>}} = rabbitmq_amqp_client :declare_queue (
606
619
LinkPair ,
607
620
SourceQName ,
608
621
#{arguments => #{<<" x-queue-type" >> => {utf8 , <<" quorum" >>},
609
622
<<" x-overflow" >> => {utf8 , <<" reject-publish" >>},
610
623
<<" x-dead-letter-strategy" >> => {utf8 , <<" at-least-once" >>},
611
- <<" x-dead-letter-exchange" >> => {utf8 , << " amq.headers " >> }}}),
624
+ <<" x-dead-letter-exchange" >> => {utf8 , HeadersXName }}}),
612
625
{ok , #{type := <<" classic" >>}} = rabbitmq_amqp_client :declare_queue (
613
626
LinkPair ,
614
627
AppleQName ,
@@ -617,14 +630,16 @@ modified_dead_letter_headers_exchange(Config) ->
617
630
LinkPair ,
618
631
BananaQName ,
619
632
#{arguments => #{<<" x-queue-type" >> => {utf8 , <<" quorum" >>}}}),
633
+ {ok , _ } = rabbitmq_amqp_client :declare_queue (LinkPair , TrashQName , #{}),
620
634
ok = rabbitmq_amqp_client :bind_queue (
621
- LinkPair , AppleQName , << " amq.headers " >> , <<>>,
635
+ LinkPair , AppleQName , HeadersXName , <<>>,
622
636
#{<<" x-fruit" >> => {utf8 , <<" apple" >>},
623
637
<<" x-match" >> => {utf8 , <<" any-with-x" >>}}),
624
638
ok = rabbitmq_amqp_client :bind_queue (
625
- LinkPair , BananaQName , << " amq.headers " >> , <<>>,
639
+ LinkPair , BananaQName , HeadersXName , <<>>,
626
640
#{<<" x-fruit" >> => {utf8 , <<" banana" >>},
627
641
<<" x-match" >> => {utf8 , <<" any-with-x" >>}}),
642
+ ok = rabbitmq_amqp_client :bind_queue (LinkPair , TrashQName , AlternateXName , <<>>, #{}),
628
643
629
644
{ok , Sender } = amqp10_client :attach_sender_link (
630
645
Session , <<" test-sender" >>, rabbitmq_amqp_address :queue (SourceQName )),
@@ -635,6 +650,8 @@ modified_dead_letter_headers_exchange(Config) ->
635
650
Session , <<" receiver apple" >>, rabbitmq_amqp_address :queue (AppleQName ), unsettled ),
636
651
{ok , ReceiverBanana } = amqp10_client :attach_receiver_link (
637
652
Session , <<" receiver banana" >>, rabbitmq_amqp_address :queue (BananaQName ), unsettled ),
653
+ {ok , ReceiverTrash } = amqp10_client :attach_receiver_link (
654
+ Session , <<" receiver trash" >>, rabbitmq_amqp_address :queue (TrashQName ), unsettled ),
638
655
639
656
ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" t1" >>, <<" m1" >>)),
640
657
ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" t2" >>, <<" m2" >>)),
@@ -644,7 +661,8 @@ modified_dead_letter_headers_exchange(Config) ->
644
661
ok = amqp10_client :send_msg (Sender , amqp10_msg :set_message_annotations (
645
662
#{" x-fruit" => <<" apple" >>},
646
663
amqp10_msg :new (<<" t4" >>, <<" m4" >>))),
647
- ok = wait_for_accepts (3 ),
664
+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" t5" >>, <<" m5" >>)),
665
+ ok = wait_for_accepts (5 ),
648
666
649
667
{ok , Msg1 } = amqp10_client :get_msg (Receiver ),
650
668
? assertMatch (#{delivery_count := 0 ,
@@ -685,13 +703,26 @@ modified_dead_letter_headers_exchange(Config) ->
685
703
amqp10_msg :headers (MsgBanana2 )),
686
704
ok = amqp10_client :accept_msg (ReceiverBanana , MsgBanana2 ),
687
705
706
+ {ok , Msg5 } = amqp10_client :get_msg (Receiver ),
707
+ % % This message should be routed via the alternate exchange to the trash queue.
708
+ ok = amqp10_client :settle_msg (Receiver , Msg5 , {modified , false , true , #{<<" x-fruit" >> => <<" strawberry" >>}}),
709
+ {ok , MsgTrash } = amqp10_client :get_msg (ReceiverTrash ),
710
+ ? assertEqual ([<<" m5" >>], amqp10_msg :body (MsgTrash )),
711
+ ? assertMatch (#{delivery_count := 0 ,
712
+ first_acquirer := false },
713
+ amqp10_msg :headers (MsgTrash )),
714
+ ok = amqp10_client :accept_msg (ReceiverTrash , MsgTrash ),
715
+
688
716
ok = detach_link_sync (Sender ),
689
717
ok = detach_link_sync (Receiver ),
690
718
ok = detach_link_sync (ReceiverApple ),
691
719
ok = detach_link_sync (ReceiverBanana ),
692
720
{ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , SourceQName ),
693
721
{ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , AppleQName ),
694
722
{ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , BananaQName ),
723
+ {ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , TrashQName ),
724
+ ok = rabbitmq_amqp_client :delete_exchange (LinkPair , HeadersXName ),
725
+ ok = rabbitmq_amqp_client :delete_exchange (LinkPair , AlternateXName ),
695
726
ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
696
727
ok = end_session_sync (Session ),
697
728
ok = amqp10_client :close_connection (Connection ).
0 commit comments