@@ -108,7 +108,9 @@ all_tests() ->
108
108
cancel_sync_queue ,
109
109
basic_recover ,
110
110
idempotent_recover ,
111
- vhost_with_quorum_queue_is_deleted
111
+ vhost_with_quorum_queue_is_deleted ,
112
+ consume_redelivery_count ,
113
+ subscribe_redelivery_count
112
114
].
113
115
114
116
% % -------------------------------------------------------------------
@@ -1633,6 +1635,99 @@ basic_recover(Config) ->
1633
1635
amqp_channel :cast (Ch , # 'basic.recover' {requeue = true }),
1634
1636
wait_for_messages_ready (Servers , RaName , 1 ),
1635
1637
wait_for_messages_pending_ack (Servers , RaName , 0 ).
1638
+
1639
+ subscribe_redelivery_count (Config ) ->
1640
+ [Server | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1641
+
1642
+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server ),
1643
+ QQ = ? config (queue_name , Config ),
1644
+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1645
+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1646
+
1647
+ RaName = ra_name (QQ ),
1648
+ publish (Ch , QQ ),
1649
+ wait_for_messages_ready (Servers , RaName , 1 ),
1650
+ wait_for_messages_pending_ack (Servers , RaName , 0 ),
1651
+ subscribe (Ch , QQ , false ),
1652
+
1653
+ DTag = <<" x-redelivery-count" >>,
1654
+ receive
1655
+ {# 'basic.deliver' {delivery_tag = DeliveryTag ,
1656
+ redelivered = false },
1657
+ # amqp_msg {props = # 'P_basic' {headers = H0 }}} ->
1658
+ ? assertMatch ({DTag , _ , 0 }, rabbit_basic :header (DTag , H0 )),
1659
+ amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag ,
1660
+ multiple = false ,
1661
+ requeue = true })
1662
+ end ,
1663
+
1664
+ receive
1665
+ {# 'basic.deliver' {delivery_tag = DeliveryTag1 ,
1666
+ redelivered = true },
1667
+ # amqp_msg {props = # 'P_basic' {headers = H1 }}} ->
1668
+ ? assertMatch ({DTag , _ , 1 }, rabbit_basic :header (DTag , H1 )),
1669
+ amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag1 ,
1670
+ multiple = false ,
1671
+ requeue = true })
1672
+ end ,
1673
+
1674
+ receive
1675
+ {# 'basic.deliver' {delivery_tag = DeliveryTag2 ,
1676
+ redelivered = true },
1677
+ # amqp_msg {props = # 'P_basic' {headers = H2 }}} ->
1678
+ ? assertMatch ({DTag , _ , 2 }, rabbit_basic :header (DTag , H2 )),
1679
+ amqp_channel :cast (Ch , # 'basic.ack' {delivery_tag = DeliveryTag2 ,
1680
+ multiple = false }),
1681
+ wait_for_messages_ready (Servers , RaName , 0 ),
1682
+ wait_for_messages_pending_ack (Servers , RaName , 0 )
1683
+ end .
1684
+
1685
+ consume_redelivery_count (Config ) ->
1686
+ [Server | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1687
+
1688
+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server ),
1689
+ QQ = ? config (queue_name , Config ),
1690
+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1691
+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1692
+
1693
+ RaName = ra_name (QQ ),
1694
+ publish (Ch , QQ ),
1695
+ wait_for_messages_ready (Servers , RaName , 1 ),
1696
+ wait_for_messages_pending_ack (Servers , RaName , 0 ),
1697
+
1698
+ DTag = <<" x-redelivery-count" >>,
1699
+
1700
+ {# 'basic.get_ok' {delivery_tag = DeliveryTag ,
1701
+ redelivered = false },
1702
+ # amqp_msg {props = # 'P_basic' {headers = H0 }}} =
1703
+ amqp_channel :call (Ch , # 'basic.get' {queue = QQ ,
1704
+ no_ack = false }),
1705
+ ? assertMatch ({DTag , _ , 0 }, rabbit_basic :header (DTag , H0 )),
1706
+ amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag ,
1707
+ multiple = false ,
1708
+ requeue = true }),
1709
+
1710
+ {# 'basic.get_ok' {delivery_tag = DeliveryTag1 ,
1711
+ redelivered = true },
1712
+ # amqp_msg {props = # 'P_basic' {headers = H1 }}} =
1713
+ amqp_channel :call (Ch , # 'basic.get' {queue = QQ ,
1714
+ no_ack = false }),
1715
+ ? assertMatch ({DTag , _ , 1 }, rabbit_basic :header (DTag , H1 )),
1716
+ amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag1 ,
1717
+ multiple = false ,
1718
+ requeue = true }),
1719
+
1720
+ {# 'basic.get_ok' {delivery_tag = DeliveryTag2 ,
1721
+ redelivered = true },
1722
+ # amqp_msg {props = # 'P_basic' {headers = H2 }}} =
1723
+ amqp_channel :call (Ch , # 'basic.get' {queue = QQ ,
1724
+ no_ack = false }),
1725
+ ? assertMatch ({DTag , _ , 2 }, rabbit_basic :header (DTag , H2 )),
1726
+ amqp_channel :cast (Ch , # 'basic.nack' {delivery_tag = DeliveryTag2 ,
1727
+ multiple = false ,
1728
+ requeue = true }).
1729
+
1730
+
1636
1731
% %----------------------------------------------------------------------------
1637
1732
1638
1733
declare (Ch , Q ) ->
0 commit comments