Skip to content

Commit 73e6176

Browse files
Merge pull request #11541 from rabbitmq/mk-rabbit_queue-virtual-host-default-aware
Make 'queue.declare' aware of virtual host DQT at validation time (cherry picked from commit 5de87aa) Conflicts: deps/rabbit/src/rabbit_queue_type.erl deps/rabbit/src/rabbit_vhost.erl
1 parent 209f91f commit 73e6176

File tree

6 files changed

+130
-23
lines changed

6 files changed

+130
-23
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
-export([is_server_named_allowed/1]).
6868

6969
-export([check_max_age/1]).
70-
-export([get_queue_type/1, get_resource_vhost_name/1, get_resource_name/1]).
70+
-export([get_queue_type/1, get_queue_type/2, get_resource_vhost_name/1, get_resource_name/1]).
7171

7272
-export([deactivate_limit_all/2]).
7373

@@ -224,8 +224,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
224224
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
225225
declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
226226
Owner, ActingUser, Node) ->
227-
ok = check_declare_arguments(QueueName, Args),
228-
Type = get_queue_type(Args),
227+
%% note: this is a module name, not a shortcut such as <<"quorum">>
228+
DQT = rabbit_vhost:default_queue_type(VHost, rabbit_queue_type:fallback()),
229+
ok = check_declare_arguments(QueueName, Args, DQT),
230+
Type = get_queue_type(Args, DQT),
229231
case rabbit_queue_type:is_enabled(Type) of
230232
true ->
231233
Q = amqqueue:new(QueueName,
@@ -252,10 +254,25 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
252254
[rabbit_misc:rs(QueueName), Type, Node]}
253255
end.
254256

257+
-spec get_queue_type(Args :: rabbit_framing:amqp_table()) -> rabbit_queue_type:queue_type().
258+
%% This version is not virtual host metadata-aware but will use
259+
%% the node-wide default type as well as 'rabbit_queue_type:fallback/0'.
260+
get_queue_type([]) ->
261+
rabbit_queue_type:default();
255262
get_queue_type(Args) ->
263+
get_queue_type(Args, rabbit_queue_type:default()).
264+
265+
%% This version should be used together with 'rabbit_vhost:default_queue_type/{1,2}'
266+
get_queue_type([], DefaultQueueType) ->
267+
rabbit_queue_type:discover(DefaultQueueType);
268+
get_queue_type(Args, DefaultQueueType) ->
256269
case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of
257270
undefined ->
258-
rabbit_queue_type:default();
271+
rabbit_queue_type:discover(DefaultQueueType);
272+
{longstr, undefined} ->
273+
rabbit_queue_type:discover(DefaultQueueType);
274+
{longstr, <<"undefined">>} ->
275+
rabbit_queue_type:discover(DefaultQueueType);
259276
{_, V} ->
260277
rabbit_queue_type:discover(V)
261278
end.
@@ -756,15 +773,20 @@ augment_declare_args(VHost, Durable, Exclusive, AutoDelete, Args0) ->
756773
case IsPermitted andalso IsCompatible of
757774
true ->
758775
%% patch up declare arguments with x-queue-type if there
759-
%% is a vhost default set the queue is druable and not exclusive
776+
%% is a vhost default set the queue is durable and not exclusive
760777
%% and there is no queue type argument
761778
%% present
762779
rabbit_misc:set_table_value(Args0,
763780
<<"x-queue-type">>,
764781
longstr,
765782
DefaultQueueType);
766783
false ->
767-
Args0
784+
%% if the properties are incompatible with the declared
785+
%% DQT, use the fall back type
786+
rabbit_misc:set_table_value(Args0,
787+
<<"x-queue-type">>,
788+
longstr,
789+
rabbit_queue_type:short_alias_of(rabbit_queue_type:fallback()))
768790
end;
769791
_ ->
770792
Args0
@@ -804,7 +826,33 @@ assert_args_equivalence(Q, NewArgs) ->
804826
QueueTypeArgs = rabbit_queue_type:arguments(queue_arguments, Type),
805827
rabbit_misc:assert_args_equivalence(ExistingArgs, NewArgs, QueueName, QueueTypeArgs).
806828

807-
check_declare_arguments(QueueName, Args) ->
829+
-spec maybe_inject_default_queue_type_shortcut_into_args(
830+
rabbit_framing:amqp_table(), rabbit_queue_type:queue_type()) -> rabbit_framing:amqp_table().
831+
maybe_inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType) ->
832+
case rabbit_misc:table_lookup(Args0, <<"x-queue-type">>) of
833+
undefined ->
834+
inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType);
835+
{longstr, undefined} ->
836+
%% Important: use a shortcut such as 'quorum' or 'stream' that for the given queue type module
837+
inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType);
838+
{longstr, <<"undefined">>} ->
839+
%% Important: use a shortcut such as 'quorum' or 'stream' that for the given queue type module
840+
inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType);
841+
_ValueIsAlreadySet ->
842+
Args0
843+
end.
844+
845+
-spec inject_default_queue_type_shortcut_into_args(
846+
rabbit_framing:amqp_table(), rabbit_queue_type:queue_type()) -> rabbit_framing:amqp_table().
847+
inject_default_queue_type_shortcut_into_args(Args0, QueueType) ->
848+
Shortcut = rabbit_queue_type:short_alias_of(QueueType),
849+
NewVal = rabbit_data_coercion:to_binary(Shortcut),
850+
rabbit_misc:set_table_value(Args0, <<"x-queue-type">>, longstr, NewVal).
851+
852+
check_declare_arguments(QueueName, Args0, DefaultQueueType) ->
853+
%% If the x-queue-type was not provided by the client, inject the
854+
%% (virtual host, global or fallback) default before performing validation. MK.
855+
Args = maybe_inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType),
808856
check_arguments_type_and_value(QueueName, Args, [{<<"x-queue-type">>, fun check_queue_type/2}]),
809857
Type = get_queue_type(Args),
810858
QueueTypeArgs = rabbit_queue_type:arguments(queue_arguments, Type),

deps/rabbit/src/rabbit_priority_queue.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ priorities(Q) when ?is_amqqueue(Q) ->
130130
case lists:member(Type, Ints) of
131131
false -> none;
132132
true ->
133-
Max = min(RequestedMax, ?MAX_SUPPORTED_PRIORITY),
133+
%% make sure the value is no greater than ?MAX_SUPPORTED_PRIORITY but
134+
%% also is not negative
135+
Max = max(1, min(RequestedMax, ?MAX_SUPPORTED_PRIORITY)),
134136
lists:reverse(lists:seq(0, Max))
135137
end;
136138
_ -> none

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
init/0,
1818
close/1,
1919
discover/1,
20+
short_alias_of/1,
2021
feature_flag_name/1,
2122
default/0,
23+
fallback/0,
2224
is_enabled/1,
2325
is_compatible/4,
2426
declare/2,
@@ -66,7 +68,7 @@
6668
-type queue_state() :: term().
6769
-type msg_tag() :: term().
6870
-type arguments() :: queue_arguments | consumer_arguments.
69-
-type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue.
71+
-type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue | module().
7072

7173
-export_type([queue_type/0]).
7274

@@ -239,20 +241,49 @@
239241
-callback notify_decorators(amqqueue:amqqueue()) ->
240242
ok.
241243

244+
-spec discover(binary() | atom()) -> queue_type().
245+
discover(<<"undefined">>) ->
246+
fallback();
247+
discover(undefined) ->
248+
fallback();
242249
%% TODO: should this use a registry that's populated on boot?
243250
discover(<<"quorum">>) ->
244251
rabbit_quorum_queue;
252+
discover(rabbit_quorum_queue) ->
253+
rabbit_quorum_queue;
245254
discover(<<"classic">>) ->
246255
rabbit_classic_queue;
256+
discover(rabbit_classic_queue) ->
257+
rabbit_classic_queue;
258+
discover(rabbit_stream_queue) ->
259+
rabbit_stream_queue;
247260
discover(<<"stream">>) ->
248261
rabbit_stream_queue;
249262
discover(Other) when is_atom(Other) ->
250263
discover(rabbit_data_coercion:to_binary(Other));
251264
discover(Other) when is_binary(Other) ->
252265
T = rabbit_registry:binary_to_type(Other),
266+
rabbit_log:debug("Queue type discovery: will look up a module for type '~tp'", [T]),
253267
{ok, Mod} = rabbit_registry:lookup_module(queue, T),
254268
Mod.
255269

270+
-spec short_alias_of(queue_type()) -> binary().
271+
%% The opposite of discover/1: returns a short alias given a module name
272+
short_alias_of(<<"rabbit_quorum_queue">>) ->
273+
<<"quorum">>;
274+
short_alias_of(rabbit_quorum_queue) ->
275+
<<"quorum">>;
276+
short_alias_of(<<"rabbit_classic_queue">>) ->
277+
<<"classic">>;
278+
short_alias_of(rabbit_classic_queue) ->
279+
<<"classic">>;
280+
short_alias_of(<<"rabbit_stream_queue">>) ->
281+
<<"stream">>;
282+
short_alias_of(rabbit_stream_queue) ->
283+
<<"stream">>;
284+
short_alias_of(_Other) ->
285+
undefined.
286+
256287
feature_flag_name(<<"quorum">>) ->
257288
quorum_queue;
258289
feature_flag_name(<<"classic">>) ->
@@ -262,10 +293,19 @@ feature_flag_name(<<"stream">>) ->
262293
feature_flag_name(_) ->
263294
undefined.
264295

296+
%% If the client does not specify the type, the virtual host does not have any
297+
%% metadata default, and rabbit.default_queue_type is not set in the application env,
298+
%% use this type as the last resort.
299+
-spec fallback() -> queue_type().
300+
fallback() ->
301+
rabbit_classic_queue.
302+
303+
-spec default() -> queue_type().
265304
default() ->
266-
rabbit_misc:get_env(rabbit,
267-
default_queue_type,
268-
rabbit_classic_queue).
305+
V = rabbit_misc:get_env(rabbit,
306+
default_queue_type,
307+
fallback()),
308+
rabbit_data_coercion:to_atom(V).
269309

270310
%% is a specific queue type implementation enabled
271311
-spec is_enabled(module()) -> boolean().

deps/rabbit/src/rabbit_vhost.erl

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0, msg_store_dir_base/0, config_file_path/1, ensure_config_file/1]).
2323
-export([delete_storage/1]).
2424
-export([vhost_down/1]).
25-
-export([put_vhost/5,
26-
put_vhost/6]).
25+
-export([put_vhost/5, put_vhost/6]).
26+
-export([default_queue_type/1, default_queue_type/2]).
2727

2828
%%
2929
%% API
@@ -503,6 +503,22 @@ default_name() ->
503503
undefined -> <<"/">>
504504
end.
505505

506+
-spec default_queue_type(VirtualHost :: vhost:name()) -> rabbit_queue_type:queue_type().
507+
default_queue_type(VirtualHost) ->
508+
default_queue_type(VirtualHost, rabbit_queue_type:fallback()).
509+
-spec default_queue_type(VirtualHost :: vhost:name(), Fallback :: rabbit_queue_type:queue_type()) -> rabbit_queue_type:queue_type().
510+
default_queue_type(VirtualHost, FallbackQueueType) ->
511+
case exists(VirtualHost) of
512+
false -> FallbackQueueType;
513+
true ->
514+
Record = lookup(VirtualHost),
515+
case vhost:get_default_queue_type(Record) of
516+
undefined -> FallbackQueueType;
517+
<<"undefined">> -> FallbackQueueType;
518+
Type -> Type
519+
end
520+
end.
521+
506522
-spec lookup(vhost:name()) -> vhost:vhost() | rabbit_types:ok_or_error(any()).
507523
lookup(VHostName) ->
508524
case rabbit_db_vhost:get(VHostName) of

deps/rabbitmq_management/src/rabbit_mgmt_wm_definitions.erl

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ all_definitions(ReqData, Context) ->
6363
{rabbitmq_version, rabbit_data_coercion:to_binary(Vsn)},
6464
{product_name, rabbit_data_coercion:to_binary(ProductName)},
6565
{product_version, rabbit_data_coercion:to_binary(ProductVersion)}] ++
66-
filter(
66+
retain_whitelisted(
6767
[{users, rabbit_mgmt_wm_users:users(all)},
6868
{vhosts, rabbit_mgmt_wm_vhosts:basic()},
6969
{permissions, rabbit_mgmt_wm_permissions:permissions()},
@@ -112,7 +112,7 @@ vhost_definitions(ReqData, VHost, Context) ->
112112
|| P <- rabbit_runtime_parameters:list(VHost)],
113113
rabbit_mgmt_util:reply(
114114
[{rabbit_version, rabbit_data_coercion:to_binary(Vsn)}] ++
115-
filter(
115+
retain_whitelisted(
116116
[{parameters, Parameters},
117117
{policies, [strip_vhost(P) || P <- rabbit_mgmt_wm_policies:basic(ReqData)]},
118118
{queues, Qs},
@@ -266,14 +266,14 @@ rw_state() ->
266266
{bindings, [source, vhost, destination, destination_type, routing_key,
267267
arguments]}].
268268

269-
filter(Items) ->
270-
[filter_items(N, V, proplists:get_value(N, rw_state())) || {N, V} <- Items].
269+
retain_whitelisted(Items) ->
270+
[retain_whitelisted_items(N, V, proplists:get_value(N, rw_state())) || {N, V} <- Items].
271271

272-
filter_items(Name, List, Allowed) ->
273-
{Name, [filter_item(I, Allowed) || I <- List]}.
272+
retain_whitelisted_items(Name, List, Allowed) ->
273+
{Name, [only_whitelisted_for_item(I, Allowed) || I <- List]}.
274274

275-
filter_item(Item, Allowed) ->
276-
[{K, Fact} || {K, Fact} <- Item, lists:member(K, Allowed)].
275+
only_whitelisted_for_item(Item, Allowed) ->
276+
[{K, Fact} || {K, Fact} <- Item, lists:member(K, Allowed), Fact =/= undefined].
277277

278278
strip_vhost(Item) ->
279279
lists:keydelete(vhost, 1, Item).

deps/rabbitmq_management/test/stats_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
-module(stats_SUITE).
99

1010
-include_lib("proper/include/proper.hrl").
11+
-include_lib("eunit/include/eunit.hrl").
1112
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
1213

1314
-compile(export_all).
@@ -175,4 +176,4 @@ format_range_constant(_Config) ->
175176
SamplesFun),
176177
5 = proplists:get_value(publish, Got),
177178
PD = proplists:get_value(publish_details, Got),
178-
0.0 = proplists:get_value(rate, PD).
179+
?assertEqual(0.0, proplists:get_value(rate, PD)).

0 commit comments

Comments
 (0)