Skip to content

Commit f5aa1fb

Browse files
Take policy-configured max-priority into account
Part of #1590.
1 parent 08636f9 commit f5aa1fb

File tree

2 files changed

+20
-2
lines changed

2 files changed

+20
-2
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,7 @@ process_args_policy(State = #q{q = Q,
382382
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
383383
{<<"max-length">>, fun res_min/2, fun init_max_length/2},
384384
{<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2},
385+
{<<"max-priority">>, fun res_arg/2, fun init_max_priority/2},
385386
{<<"overflow">>, fun res_arg/2, fun init_overflow/2},
386387
{<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}],
387388
drop_expired_msgs(
@@ -426,6 +427,9 @@ init_max_bytes(MaxBytes, State) ->
426427
{_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
427428
State1.
428429

430+
init_max_priority(_MaxPriority, State) ->
431+
State.
432+
429433
init_overflow(undefined, State) ->
430434
State;
431435
init_overflow(Overflow, State) ->

src/rabbit_priority_queue.erl

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
{requires, pre_boot},
2929
{enables, kernel_ready}]}).
3030

31+
-import(rabbit_misc, [pget/2]).
32+
3133
-export([enable/0]).
3234

3335
-export([start/2, stop/1]).
@@ -43,6 +45,8 @@
4345
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
4446
zip_msgs_and_acks/4, handle_info/2]).
4547

48+
-export([max_priority/1, priorities/1]).
49+
4650
-record(state, {bq, bqss, max_priority}).
4751
-record(passthrough, {bq, bqs}).
4852

@@ -125,9 +129,19 @@ collapse_recovery(QNames, DupNames, Recovery) ->
125129
end, dict:new(), lists:zip(DupNames, Recovery)),
126130
[dict:fetch(Name, NameToTerms) || Name <- QNames].
127131

128-
priorities(#amqqueue{arguments = Args}) ->
129-
Ints = [long, short, signedint, byte, unsignedbyte, unsignedshort, unsignedint],
132+
max_priority(Q = #amqqueue{arguments = Args}) ->
130133
case rabbit_misc:table_lookup(Args, <<"x-max-priority">>) of
134+
{Type, RequestedMax} -> {Type, RequestedMax};
135+
undefined ->
136+
case rabbit_policy:effective_definition(Q) of
137+
undefined -> undefined;
138+
Proplist -> {unsignedbyte, pget(<<"max-priority">>, Proplist)}
139+
end
140+
end.
141+
142+
priorities(Q) ->
143+
Ints = [long, short, signedint, byte, unsignedbyte, unsignedshort, unsignedint],
144+
case max_priority(Q) of
131145
{Type, RequestedMax} ->
132146
case lists:member(Type, Ints) of
133147
false -> none;

0 commit comments

Comments
 (0)