@@ -129,6 +129,7 @@ groups() ->
129
129
modified_classic_queue ,
130
130
modified_quorum_queue ,
131
131
modified_dead_letter_headers_exchange ,
132
+ modified_dead_letter_history ,
132
133
dead_letter_headers_exchange ,
133
134
dead_letter_reject ,
134
135
dead_letter_reject_message_order_classic_queue ,
@@ -264,7 +265,8 @@ init_per_testcase(T, Config)
264
265
end ;
265
266
init_per_testcase (T , Config )
266
267
when T =:= modified_quorum_queue orelse
267
- T =:= modified_dead_letter_headers_exchange ->
268
+ T =:= modified_dead_letter_headers_exchange orelse
269
+ T =:= modified_dead_letter_history ->
268
270
case rpc (Config , rabbit_feature_flags , is_enabled , ['rabbitmq_4.0.0' ]) of
269
271
true ->
270
272
rabbit_ct_helpers :testcase_started (Config , T );
@@ -727,6 +729,85 @@ modified_dead_letter_headers_exchange(Config) ->
727
729
ok = end_session_sync (Session ),
728
730
ok = amqp10_client :close_connection (Connection ).
729
731
732
+ % % Test that custom dead lettering event tracking works as described in
733
+ % % https://rabbitmq.com/blog/2024/10/11/modified-outcome
734
+ modified_dead_letter_history (Config ) ->
735
+ {Connection , Session , LinkPair } = init (Config ),
736
+ Q1 = <<" qq 1" >>,
737
+ Q2 = <<" qq 2" >>,
738
+
739
+ {ok , _ } = rabbitmq_amqp_client :declare_queue (
740
+ LinkPair , Q1 ,
741
+ #{arguments => #{<<" x-queue-type" >> => {utf8 , <<" quorum" >>},
742
+ <<" x-dead-letter-strategy" >> => {utf8 , <<" at-most-once" >>},
743
+ <<" x-dead-letter-exchange" >> => {utf8 , <<" amq.fanout" >>}}}),
744
+ {ok , _ } = rabbitmq_amqp_client :declare_queue (
745
+ LinkPair , Q2 ,
746
+ #{arguments => #{<<" x-queue-type" >> => {utf8 , <<" quorum" >>},
747
+ <<" x-dead-letter-strategy" >> => {utf8 , <<" at-most-once" >>},
748
+ <<" x-dead-letter-exchange" >> => {utf8 , <<>>}}}),
749
+ ok = rabbitmq_amqp_client :bind_queue (LinkPair , Q2 , <<" amq.fanout" >>, <<>>, #{}),
750
+
751
+ {ok , Sender } = amqp10_client :attach_sender_link (
752
+ Session , <<" test-sender" >>, rabbitmq_amqp_address :queue (Q1 )),
753
+ wait_for_credit (Sender ),
754
+ {ok , Receiver1 } = amqp10_client :attach_receiver_link (
755
+ Session , <<" receiver 1" >>, rabbitmq_amqp_address :queue (Q1 ), unsettled ),
756
+ {ok , Receiver2 } = amqp10_client :attach_receiver_link (
757
+ Session , <<" receiver 2" >>, rabbitmq_amqp_address :queue (Q2 ), unsettled ),
758
+
759
+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" t" >>, <<" m" >>)),
760
+ ok = wait_for_accepts (1 ),
761
+ ok = detach_link_sync (Sender ),
762
+
763
+ {ok , Msg1 } = amqp10_client :get_msg (Receiver1 ),
764
+ ? assertMatch (#{delivery_count := 0 ,
765
+ first_acquirer := true },
766
+ amqp10_msg :headers (Msg1 )),
767
+ ok = amqp10_client :settle_msg (
768
+ Receiver1 , Msg1 ,
769
+ {modified , true , true ,
770
+ #{<<" x-opt-history-list" >> => {list , [{utf8 , <<" l1" >>}]},
771
+ <<" x-opt-history-map" >> => {map , [{{symbol , <<" k1" >>}, {byte , - 1 }}]},
772
+ <<" x-opt-history-array" >> => {array , utf8 , [{utf8 , <<" a1" >>}]}}
773
+ }),
774
+
775
+ {ok , Msg2 } = amqp10_client :get_msg (Receiver2 ),
776
+ ? assertMatch (#{delivery_count := 1 ,
777
+ first_acquirer := false },
778
+ amqp10_msg :headers (Msg2 )),
779
+ #{<<" x-opt-history-list" >> := L1 ,
780
+ <<" x-opt-history-map" >> := L2 ,
781
+ <<" x-opt-history-array" >> := {array , utf8 , L0 }
782
+ } = amqp10_msg :message_annotations (Msg2 ),
783
+ ok = amqp10_client :settle_msg (
784
+ Receiver2 , Msg2 ,
785
+ {modified , true , true ,
786
+ #{<<" x-opt-history-list" >> => {list , [{int , - 99 } | L1 ]},
787
+ <<" x-opt-history-map" >> => {map , [{{symbol , <<" k2" >>}, {symbol , <<" v2" >>}} | L2 ]},
788
+ <<" x-opt-history-array" >> => {array , utf8 , [{utf8 , <<" a2" >>} | L0 ]},
789
+ <<" x-other" >> => 99 }}),
790
+
791
+ {ok , Msg3 } = amqp10_client :get_msg (Receiver1 ),
792
+ ? assertEqual ([<<" m" >>], amqp10_msg :body (Msg3 )),
793
+ ? assertMatch (#{delivery_count := 2 ,
794
+ first_acquirer := false },
795
+ amqp10_msg :headers (Msg3 )),
796
+ ? assertMatch (#{<<" x-opt-history-array" >> := {array , utf8 , [{utf8 , <<" a2" >>}, {utf8 , <<" a1" >>}]},
797
+ <<" x-opt-history-list" >> := [{int , - 99 }, {utf8 , <<" l1" >>}],
798
+ <<" x-opt-history-map" >> := [{{symbol , <<" k2" >>}, {symbol , <<" v2" >>}},
799
+ {{symbol , <<" k1" >>}, {byte , - 1 }}],
800
+ <<" x-other" >> := 99 }, amqp10_msg :message_annotations (Msg3 )),
801
+ ok = amqp10_client :accept_msg (Receiver1 , Msg3 ),
802
+
803
+ ok = detach_link_sync (Receiver1 ),
804
+ ok = detach_link_sync (Receiver2 ),
805
+ {ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , Q1 ),
806
+ {ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , Q2 ),
807
+ ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
808
+ ok = end_session_sync (Session ),
809
+ ok = amqp10_client :close_connection (Connection ).
810
+
730
811
% % Tests that confirmations are returned correctly
731
812
% % when sending many messages async to a quorum queue.
732
813
sender_settle_mode_unsettled (Config ) ->
0 commit comments