Skip to content

Commit e6587c6

Browse files
committed
Support consumer priority in AMQP
Arguments * `rabbitmq:stream-offset-spec`, * `rabbitmq:stream-filter`, * `rabbitmq:stream-match-unfiltered` are set in the `filter` field of the `Source`. This makes sense for these consumer arguments because: > A filter acts as a function on a message which returns a boolean result > indicating whether the message can pass through that filter or not. Consumer priority is not really such a predicate. Therefore, it makes more sense to set consumer priority in the `properties` field of the `Attach` frame. We call the key `rabbitmq:priority` which maps to consumer argument `x-priority`. While AMQP 0.9.1 consumers are allowed to set any integer data type for the priority level, this commit decides to enforce an `int` value (range -(2^31) to 2^31 - 1 inclusive). Consumer priority levels outside of this range are not needed in practice.
1 parent f20f5be commit e6587c6

File tree

4 files changed

+123
-17
lines changed

4 files changed

+123
-17
lines changed

deps/amqp10_client/src/amqp10_client_types.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ utf8(B) when is_binary(B) -> {utf8, B}.
8282
uint(N) -> {uint, N}.
8383

8484
make_properties(#{properties := Props})
85-
when is_map(Props) andalso
86-
map_size(Props) > 0 ->
85+
when map_size(Props) > 0 ->
8786
{map, maps:fold(fun(K, V, L) ->
8887
[{{symbol, K}, V} | L]
8988
end, [], Props)};

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,7 +1086,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
10861086
mode => Mode,
10871087
consumer_tag => handle_to_ctag(HandleInt),
10881088
exclusive_consume => false,
1089-
args => source_filters_to_consumer_args(Source),
1089+
args => consumer_arguments(Attach),
10901090
ok_msg => undefined,
10911091
acting_user => Username},
10921092
case rabbit_queue_type:consume(Q, Spec, QStates0) of
@@ -2852,19 +2852,36 @@ encode_frames(T, Msg, MaxPayloadSize, Transfers) ->
28522852
lists:reverse([[T, Msg] | Transfers])
28532853
end.
28542854

2855-
source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) ->
2856-
source_filters_to_consumer_args(
2855+
consumer_arguments(#'v1_0.attach'{
2856+
source = #'v1_0.source'{filter = Filter},
2857+
properties = Properties}) ->
2858+
properties_to_consumer_args(Properties) ++
2859+
filter_to_consumer_args(Filter).
2860+
2861+
properties_to_consumer_args({map, KVList}) ->
2862+
Key = {symbol, <<"rabbitmq:priority">>},
2863+
case proplists:lookup(Key, KVList) of
2864+
{Key, Val = {int, _Prio}} ->
2865+
[mc_amqpl:to_091(<<"x-priority">>, Val)];
2866+
_ ->
2867+
[]
2868+
end;
2869+
properties_to_consumer_args(_) ->
2870+
[].
2871+
2872+
filter_to_consumer_args({map, KVList}) ->
2873+
filter_to_consumer_args(
28572874
[<<"rabbitmq:stream-offset-spec">>,
28582875
<<"rabbitmq:stream-filter">>,
28592876
<<"rabbitmq:stream-match-unfiltered">>],
28602877
KVList,
28612878
[]);
2862-
source_filters_to_consumer_args(_Source) ->
2879+
filter_to_consumer_args(_) ->
28632880
[].
28642881

2865-
source_filters_to_consumer_args([], _KVList, Acc) ->
2882+
filter_to_consumer_args([], _KVList, Acc) ->
28662883
Acc;
2867-
source_filters_to_consumer_args([<<"rabbitmq:stream-offset-spec">> = H | T], KVList, Acc) ->
2884+
filter_to_consumer_args([<<"rabbitmq:stream-offset-spec">> = H | T], KVList, Acc) ->
28682885
Key = {symbol, H},
28692886
Arg = case keyfind_unpack_described(Key, KVList) of
28702887
{_, {timestamp, Ts}} ->
@@ -2876,8 +2893,8 @@ source_filters_to_consumer_args([<<"rabbitmq:stream-offset-spec">> = H | T], KVL
28762893
_ ->
28772894
[]
28782895
end,
2879-
source_filters_to_consumer_args(T, KVList, Arg ++ Acc);
2880-
source_filters_to_consumer_args([<<"rabbitmq:stream-filter">> = H | T], KVList, Acc) ->
2896+
filter_to_consumer_args(T, KVList, Arg ++ Acc);
2897+
filter_to_consumer_args([<<"rabbitmq:stream-filter">> = H | T], KVList, Acc) ->
28812898
Key = {symbol, H},
28822899
Arg = case keyfind_unpack_described(Key, KVList) of
28832900
{_, {list, Filters0}} when is_list(Filters0) ->
@@ -2892,18 +2909,18 @@ source_filters_to_consumer_args([<<"rabbitmq:stream-filter">> = H | T], KVList,
28922909
_ ->
28932910
[]
28942911
end,
2895-
source_filters_to_consumer_args(T, KVList, Arg ++ Acc);
2896-
source_filters_to_consumer_args([<<"rabbitmq:stream-match-unfiltered">> = H | T], KVList, Acc) ->
2912+
filter_to_consumer_args(T, KVList, Arg ++ Acc);
2913+
filter_to_consumer_args([<<"rabbitmq:stream-match-unfiltered">> = H | T], KVList, Acc) ->
28972914
Key = {symbol, H},
28982915
Arg = case keyfind_unpack_described(Key, KVList) of
28992916
{_, MU} when is_boolean(MU) ->
29002917
[{<<"x-stream-match-unfiltered">>, bool, MU}];
29012918
_ ->
29022919
[]
29032920
end,
2904-
source_filters_to_consumer_args(T, KVList, Arg ++ Acc);
2905-
source_filters_to_consumer_args([_ | T], KVList, Acc) ->
2906-
source_filters_to_consumer_args(T, KVList, Acc).
2921+
filter_to_consumer_args(T, KVList, Arg ++ Acc);
2922+
filter_to_consumer_args([_ | T], KVList, Acc) ->
2923+
filter_to_consumer_args(T, KVList, Acc).
29072924

29082925
keyfind_unpack_described(Key, KvList) ->
29092926
%% filterset values _should_ be described values

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@
110110
%% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
111111
credit_mode :: credit_mode(), % part of snapshot data
112112
lifetime = once :: once | auto,
113-
priority = 0 :: non_neg_integer()}).
113+
priority = 0 :: integer()}).
114114

115115
-record(consumer,
116116
{cfg = #consumer_cfg{},

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ groups() ->
8181
stop_classic_queue,
8282
stop_quorum_queue,
8383
stop_stream,
84+
consumer_priority_classic_queue,
85+
consumer_priority_quorum_queue,
8486
single_active_consumer_classic_queue,
8587
single_active_consumer_quorum_queue,
8688
detach_requeues_one_session_classic_queue,
@@ -1841,6 +1843,95 @@ stop(QType, Config) ->
18411843
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
18421844
ok = rabbit_ct_client_helpers:close_channel(Ch).
18431845

1846+
consumer_priority_classic_queue(Config) ->
1847+
consumer_priority(<<"classic">>, Config).
1848+
1849+
consumer_priority_quorum_queue(Config) ->
1850+
consumer_priority(<<"quorum">>, Config).
1851+
1852+
consumer_priority(QType, Config) ->
1853+
QName = atom_to_binary(?FUNCTION_NAME),
1854+
{Connection, Session, LinkPair} = init(Config),
1855+
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}},
1856+
{ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps),
1857+
1858+
Address = rabbitmq_amqp_address:queue(QName),
1859+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
1860+
ok = wait_for_credit(Sender),
1861+
1862+
%% We test what our RabbitMQ docs state:
1863+
%% "Consumers which do not specify a value have priority 0.
1864+
%% Larger numbers indicate higher priority, and both positive and negative numbers can be used."
1865+
{ok, ReceiverDefaultPrio} = amqp10_client:attach_receiver_link(
1866+
Session,
1867+
<<"default prio consumer">>,
1868+
Address,
1869+
unsettled),
1870+
{ok, ReceiverHighPrio} = amqp10_client:attach_receiver_link(
1871+
Session,
1872+
<<"high prio consumer">>,
1873+
Address,
1874+
unsettled,
1875+
none,
1876+
#{},
1877+
#{<<"rabbitmq:priority">> => {int, 2_000_000_000}}),
1878+
{ok, ReceiverLowPrio} = amqp10_client:attach_receiver_link(
1879+
Session,
1880+
<<"low prio consumer">>,
1881+
Address,
1882+
unsettled,
1883+
none,
1884+
#{},
1885+
#{<<"rabbitmq:priority">> => {int, -2_000_000_000}}),
1886+
ok = amqp10_client:flow_link_credit(ReceiverDefaultPrio, 1, never),
1887+
ok = amqp10_client:flow_link_credit(ReceiverHighPrio, 2, never),
1888+
ok = amqp10_client:flow_link_credit(ReceiverLowPrio, 1, never),
1889+
1890+
NumMsgs = 5,
1891+
[begin
1892+
Bin = integer_to_binary(N),
1893+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(Bin, Bin))
1894+
end || N <- lists:seq(1, NumMsgs)],
1895+
ok = wait_for_accepts(NumMsgs),
1896+
1897+
receive {amqp10_msg, Rec1, Msg1} ->
1898+
?assertEqual(<<"1">>, amqp10_msg:body_bin(Msg1)),
1899+
?assertEqual(ReceiverHighPrio, Rec1),
1900+
ok = amqp10_client:accept_msg(Rec1, Msg1)
1901+
after 5000 -> ct:fail({missing_msg, ?LINE})
1902+
end,
1903+
receive {amqp10_msg, Rec2, Msg2} ->
1904+
?assertEqual(<<"2">>, amqp10_msg:body_bin(Msg2)),
1905+
?assertEqual(ReceiverHighPrio, Rec2),
1906+
ok = amqp10_client:accept_msg(Rec2, Msg2)
1907+
after 5000 -> ct:fail({missing_msg, ?LINE})
1908+
end,
1909+
receive {amqp10_msg, Rec3, Msg3} ->
1910+
?assertEqual(<<"3">>, amqp10_msg:body_bin(Msg3)),
1911+
?assertEqual(ReceiverDefaultPrio, Rec3),
1912+
ok = amqp10_client:accept_msg(Rec3, Msg3)
1913+
after 5000 -> ct:fail({missing_msg, ?LINE})
1914+
end,
1915+
receive {amqp10_msg, Rec4, Msg4} ->
1916+
?assertEqual(<<"4">>, amqp10_msg:body_bin(Msg4)),
1917+
?assertEqual(ReceiverLowPrio, Rec4),
1918+
ok = amqp10_client:accept_msg(Rec4, Msg4)
1919+
after 5000 -> ct:fail({missing_msg, ?LINE})
1920+
end,
1921+
receive {amqp10_msg, _, _} = Unexpected ->
1922+
ct:fail({unexpected_msg, Unexpected, ?LINE})
1923+
after 5 -> ok
1924+
end,
1925+
1926+
ok = amqp10_client:detach_link(Sender),
1927+
ok = amqp10_client:detach_link(ReceiverDefaultPrio),
1928+
ok = amqp10_client:detach_link(ReceiverHighPrio),
1929+
ok = amqp10_client:detach_link(ReceiverLowPrio),
1930+
{ok, #{message_count := 1}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
1931+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
1932+
ok = end_session_sync(Session),
1933+
ok = amqp10_client:close_connection(Connection).
1934+
18441935
single_active_consumer_classic_queue(Config) ->
18451936
single_active_consumer(<<"classic">>, Config).
18461937

@@ -4899,7 +4990,6 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) ->
48994990
ok = end_session_sync(Session),
49004991
ok = amqp10_client:close_connection(Connection).
49014992

4902-
49034993
%% internal
49044994
%%
49054995

0 commit comments

Comments
 (0)