Skip to content

Commit 31c6a07

Browse files
committed
STOMP: add support for consumer priorities
x-priority header allows to specify the consumer priority
1 parent 194d4ba commit 31c6a07

File tree

3 files changed

+52
-1
lines changed

3 files changed

+52
-1
lines changed

deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
-define(HEADER_X_STREAM_FILTER, "x-stream-filter").
3131
-define(HEADER_X_STREAM_MATCH_UNFILTERED, "x-stream-match-unfiltered").
3232
-define(HEADER_PRIORITY, "priority").
33+
-define(HEADER_X_PRIORITY, "x-priority").
3334
-define(HEADER_RECEIPT, "receipt").
3435
-define(HEADER_REDELIVERED, "redelivered").
3536
-define(HEADER_REPLY_TO, "reply-to").

deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,8 @@ do_subscribe(Destination, DestHdr, Frame,
718718
subscribe_arguments(Frame) ->
719719
subscribe_arguments([?HEADER_X_STREAM_OFFSET,
720720
?HEADER_X_STREAM_FILTER,
721-
?HEADER_X_STREAM_MATCH_UNFILTERED], Frame, []).
721+
?HEADER_X_STREAM_MATCH_UNFILTERED,
722+
?HEADER_X_PRIORITY], Frame, []).
722723

723724
subscribe_arguments([], _Frame , Acc) ->
724725
Acc;
@@ -749,6 +750,14 @@ subscribe_argument(?HEADER_X_STREAM_MATCH_UNFILTERED, Frame, Acc) ->
749750
[{list_to_binary(?HEADER_X_STREAM_MATCH_UNFILTERED), bool, MU}] ++ Acc;
750751
not_found ->
751752
Acc
753+
end;
754+
subscribe_argument(?HEADER_X_PRIORITY, Frame, Acc) ->
755+
Priority = rabbit_stomp_frame:integer_header(Frame, ?HEADER_X_PRIORITY),
756+
case Priority of
757+
{ok, P} ->
758+
[{list_to_binary(?HEADER_X_PRIORITY), byte, P}] ++ Acc;
759+
not_found ->
760+
Acc
752761
end.
753762

754763
check_subscription_access(Destination = {topic, _Topic},

deps/rabbitmq_stomp/test/system_SUITE.erl

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
-include("rabbit_stomp_headers.hrl").
1818

1919
-define(QUEUE, <<"TestQueue">>).
20+
-define(QUEUE_QQ, <<"TestQueueQQ">>).
2021
-define(DESTINATION, "/amq/queue/TestQueue").
22+
-define(DESTINATION_QQ, "/amq/queue/TestQueueQQ").
2123

2224
all() ->
2325
[{group, version_to_group_name(V)} || V <- ?SUPPORTED_VERSIONS].
@@ -28,6 +30,7 @@ groups() ->
2830
publish_unauthorized_error,
2931
subscribe_error,
3032
subscribe,
33+
subscribe_with_x_priority,
3134
unsubscribe_ack,
3235
subscribe_ack,
3336
send,
@@ -161,6 +164,44 @@ subscribe(Config) ->
161164
{ok, _Client2, _, [<<"hello">>]} = stomp_receive(Client1, "MESSAGE"),
162165
ok.
163166

167+
subscribe_with_x_priority(Config) ->
168+
Version = ?config(version, Config),
169+
StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
170+
Channel = ?config(amqp_channel, Config),
171+
ClientA = ?config(stomp_client, Config),
172+
#'queue.declare_ok'{} =
173+
amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE_QQ,
174+
durable = true,
175+
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>},
176+
{<<"x-single-active-consumer">>, bool, true}
177+
]}),
178+
179+
%% subscribe and wait for receipt
180+
rabbit_stomp_client:send(
181+
ClientA, "SUBSCRIBE", [{"destination", ?DESTINATION_QQ}, {"receipt", "foo"}]),
182+
{ok, _ClientA1, _, _} = stomp_receive(ClientA, "RECEIPT"),
183+
184+
%% subscribe with a higher priority and wait for receipt
185+
{ok, ClientB} = rabbit_stomp_client:connect(Version, StompPort),
186+
rabbit_stomp_client:send(
187+
ClientB, "SUBSCRIBE", [{"destination", ?DESTINATION_QQ},
188+
{"receipt", "foo"},
189+
{"x-priority", 10}
190+
]),
191+
{ok, ClientB1, _, _} = stomp_receive(ClientB, "RECEIPT"),
192+
193+
%% send from amqp
194+
Method = #'basic.publish'{exchange = <<"">>, routing_key = ?QUEUE_QQ},
195+
196+
amqp_channel:call(Channel, Method, #amqp_msg{props = #'P_basic'{},
197+
payload = <<"hello">>}),
198+
199+
%% ClientB should receive the message since it has a higher priority
200+
{ok, _ClientB2, _, [<<"hello">>]} = stomp_receive(ClientB1, "MESSAGE"),
201+
#'queue.delete_ok'{} =
202+
amqp_channel:call(Channel, #'queue.delete'{queue = ?QUEUE_QQ}),
203+
ok.
204+
164205
unsubscribe_ack(Config) ->
165206
Channel = ?config(amqp_channel, Config),
166207
Client = ?config(stomp_client, Config),

0 commit comments

Comments
 (0)