@@ -81,6 +81,8 @@ groups() ->
81
81
stop_classic_queue ,
82
82
stop_quorum_queue ,
83
83
stop_stream ,
84
+ consumer_priority_classic_queue ,
85
+ consumer_priority_quorum_queue ,
84
86
single_active_consumer_classic_queue ,
85
87
single_active_consumer_quorum_queue ,
86
88
detach_requeues_one_session_classic_queue ,
@@ -1841,6 +1843,95 @@ stop(QType, Config) ->
1841
1843
# 'queue.delete_ok' {} = amqp_channel :call (Ch , # 'queue.delete' {queue = QName }),
1842
1844
ok = rabbit_ct_client_helpers :close_channel (Ch ).
1843
1845
1846
+ consumer_priority_classic_queue (Config ) ->
1847
+ consumer_priority (<<" classic" >>, Config ).
1848
+
1849
+ consumer_priority_quorum_queue (Config ) ->
1850
+ consumer_priority (<<" quorum" >>, Config ).
1851
+
1852
+ consumer_priority (QType , Config ) ->
1853
+ QName = atom_to_binary (? FUNCTION_NAME ),
1854
+ {Connection , Session , LinkPair } = init (Config ),
1855
+ QProps = #{arguments => #{<<" x-queue-type" >> => {utf8 , QType }}},
1856
+ {ok , #{type := QType }} = rabbitmq_amqp_client :declare_queue (LinkPair , QName , QProps ),
1857
+
1858
+ Address = rabbitmq_amqp_address :queue (QName ),
1859
+ {ok , Sender } = amqp10_client :attach_sender_link (Session , <<" sender" >>, Address ),
1860
+ ok = wait_for_credit (Sender ),
1861
+
1862
+ % % We test what our RabbitMQ docs state:
1863
+ % % "Consumers which do not specify a value have priority 0.
1864
+ % % Larger numbers indicate higher priority, and both positive and negative numbers can be used."
1865
+ {ok , ReceiverDefaultPrio } = amqp10_client :attach_receiver_link (
1866
+ Session ,
1867
+ <<" default prio consumer" >>,
1868
+ Address ,
1869
+ unsettled ),
1870
+ {ok , ReceiverHighPrio } = amqp10_client :attach_receiver_link (
1871
+ Session ,
1872
+ <<" high prio consumer" >>,
1873
+ Address ,
1874
+ unsettled ,
1875
+ none ,
1876
+ #{},
1877
+ #{<<" rabbitmq:priority" >> => {int , 2_000_000_000 }}),
1878
+ {ok , ReceiverLowPrio } = amqp10_client :attach_receiver_link (
1879
+ Session ,
1880
+ <<" low prio consumer" >>,
1881
+ Address ,
1882
+ unsettled ,
1883
+ none ,
1884
+ #{},
1885
+ #{<<" rabbitmq:priority" >> => {int , - 2_000_000_000 }}),
1886
+ ok = amqp10_client :flow_link_credit (ReceiverDefaultPrio , 1 , never ),
1887
+ ok = amqp10_client :flow_link_credit (ReceiverHighPrio , 2 , never ),
1888
+ ok = amqp10_client :flow_link_credit (ReceiverLowPrio , 1 , never ),
1889
+
1890
+ NumMsgs = 5 ,
1891
+ [begin
1892
+ Bin = integer_to_binary (N ),
1893
+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (Bin , Bin ))
1894
+ end || N <- lists :seq (1 , NumMsgs )],
1895
+ ok = wait_for_accepts (NumMsgs ),
1896
+
1897
+ receive {amqp10_msg , Rec1 , Msg1 } ->
1898
+ ? assertEqual (<<" 1" >>, amqp10_msg :body_bin (Msg1 )),
1899
+ ? assertEqual (ReceiverHighPrio , Rec1 ),
1900
+ ok = amqp10_client :accept_msg (Rec1 , Msg1 )
1901
+ after 5000 -> ct :fail ({missing_msg , ? LINE })
1902
+ end ,
1903
+ receive {amqp10_msg , Rec2 , Msg2 } ->
1904
+ ? assertEqual (<<" 2" >>, amqp10_msg :body_bin (Msg2 )),
1905
+ ? assertEqual (ReceiverHighPrio , Rec2 ),
1906
+ ok = amqp10_client :accept_msg (Rec2 , Msg2 )
1907
+ after 5000 -> ct :fail ({missing_msg , ? LINE })
1908
+ end ,
1909
+ receive {amqp10_msg , Rec3 , Msg3 } ->
1910
+ ? assertEqual (<<" 3" >>, amqp10_msg :body_bin (Msg3 )),
1911
+ ? assertEqual (ReceiverDefaultPrio , Rec3 ),
1912
+ ok = amqp10_client :accept_msg (Rec3 , Msg3 )
1913
+ after 5000 -> ct :fail ({missing_msg , ? LINE })
1914
+ end ,
1915
+ receive {amqp10_msg , Rec4 , Msg4 } ->
1916
+ ? assertEqual (<<" 4" >>, amqp10_msg :body_bin (Msg4 )),
1917
+ ? assertEqual (ReceiverLowPrio , Rec4 ),
1918
+ ok = amqp10_client :accept_msg (Rec4 , Msg4 )
1919
+ after 5000 -> ct :fail ({missing_msg , ? LINE })
1920
+ end ,
1921
+ receive {amqp10_msg , _ , _ } = Unexpected ->
1922
+ ct :fail ({unexpected_msg , Unexpected , ? LINE })
1923
+ after 5 -> ok
1924
+ end ,
1925
+
1926
+ ok = amqp10_client :detach_link (Sender ),
1927
+ ok = amqp10_client :detach_link (ReceiverDefaultPrio ),
1928
+ ok = amqp10_client :detach_link (ReceiverHighPrio ),
1929
+ ok = amqp10_client :detach_link (ReceiverLowPrio ),
1930
+ {ok , #{message_count := 1 }} = rabbitmq_amqp_client :delete_queue (LinkPair , QName ),
1931
+ ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
1932
+ ok = end_session_sync (Session ),
1933
+ ok = amqp10_client :close_connection (Connection ).
1934
+
1844
1935
single_active_consumer_classic_queue (Config ) ->
1845
1936
single_active_consumer (<<" classic" >>, Config ).
1846
1937
@@ -4899,7 +4990,6 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) ->
4899
4990
ok = end_session_sync (Session ),
4900
4991
ok = amqp10_client :close_connection (Connection ).
4901
4992
4902
-
4903
4993
% % internal
4904
4994
% %
4905
4995
0 commit comments