Skip to content

Commit 9bbfcdb

Browse files
committed
Bugfix: check unsupported policies on queue.declare
The amqqueue record does not exist when `rabbit_amqqueue:is_policy_applicable/2` is called. It is available in `rabbit_policy`, so let's send it through.
1 parent 35ef91d commit 9bbfcdb

File tree

3 files changed

+45
-7
lines changed

3 files changed

+45
-7
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,8 @@ policy_changed(Q1, Q2) ->
362362
%% mirroring-related has changed - the policy may have changed anyway.
363363
notify_policy_changed(Q2).
364364

365+
is_policy_applicable(Q, Policy) when ?is_amqqueue(Q) ->
366+
rabbit_queue_type:is_policy_applicable(Q, Policy);
365367
is_policy_applicable(QName, Policy) ->
366368
case lookup(QName) of
367369
{ok, Q} ->

deps/rabbit/src/rabbit_policy.erl

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,14 @@ merge_operator_definitions(Policy, OpPolicy) ->
9393
lists:umerge(Keys, OpKeys)).
9494

9595
set(Q0) when ?is_amqqueue(Q0) ->
96-
Name = amqqueue:get_name(Q0),
97-
Policy = match(Name),
98-
OpPolicy = match_op(Name),
96+
%% On queue.declare the queue record doesn't exist yet, so the later lookup in
97+
%% `rabbit_amqqueue:is_policy_applicable` fails. Thus, let's send the whole record
98+
%% through the match functions. `match_all` still needs to support resources only,
99+
%% as that is used by some plugins. Maybe `match` too, difficult to figure out
100+
%% what is public API and what is not, so let's support both `amqqueue` and `resource`
101+
%% records as arguments.
102+
Policy = match(Q0),
103+
OpPolicy = match_op(Q0),
99104
Q1 = amqqueue:set_policy(Q0, Policy),
100105
Q2 = amqqueue:set_operator_policy(Q1, OpPolicy),
101106
Q2;
@@ -147,9 +152,15 @@ list_formatted_op(VHost, Ref, AggregatorPid) ->
147152
rabbit_control_misc:emitting_map(AggregatorPid, Ref,
148153
fun(P) -> P end, list_formatted_op(VHost)).
149154

155+
match(Q) when ?is_amqqueue(Q) ->
156+
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
157+
match(Q, list(VHost));
150158
match(Name = #resource{virtual_host = VHost}) ->
151159
match(Name, list(VHost)).
152160

161+
match_op(Q) when ?is_amqqueue(Q) ->
162+
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
163+
match(Q, list_op(VHost));
153164
match_op(Name = #resource{virtual_host = VHost}) ->
154165
match(Name, list_op(VHost)).
155166

@@ -172,15 +183,21 @@ get(Name, EntityName = #resource{virtual_host = VHost}) ->
172183
match(EntityName, list(VHost)),
173184
match(EntityName, list_op(VHost))).
174185

175-
match(Name, Policies) ->
176-
case match_all(Name, Policies) of
186+
match(NameOrQueue, Policies) ->
187+
case match_all(NameOrQueue, Policies) of
177188
[] -> undefined;
178189
[Policy | _] -> Policy
179190
end.
180191

181-
match_all(Name, Policies) ->
182-
lists:sort(fun priority_comparator/2, [P || P <- Policies, matches(Name, P)]).
192+
match_all(NameOrQueue, Policies) ->
193+
lists:sort(fun priority_comparator/2, [P || P <- Policies, matches(NameOrQueue, P)]).
183194

195+
matches(Q, Policy) when ?is_amqqueue(Q) ->
196+
#resource{name = Name, kind = Kind, virtual_host = VHost} = amqqueue:get_name(Q),
197+
matches_type(Kind, pget('apply-to', Policy)) andalso
198+
is_applicable(Q, pget(definition, Policy)) andalso
199+
match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso
200+
VHost =:= pget(vhost, Policy);
184201
matches(#resource{name = Name, kind = Kind, virtual_host = VHost} = Resource, Policy) ->
185202
matches_type(Kind, pget('apply-to', Policy)) andalso
186203
is_applicable(Resource, pget(definition, Policy)) andalso
@@ -527,6 +544,8 @@ matches_type(_, _) -> false.
527544

528545
priority_comparator(A, B) -> pget(priority, A) >= pget(priority, B).
529546

547+
is_applicable(Q, Policy) when ?is_amqqueue(Q) ->
548+
rabbit_amqqueue:is_policy_applicable(Q, rabbit_data_coercion:to_list(Policy));
530549
is_applicable(#resource{kind = queue} = Resource, Policy) ->
531550
rabbit_amqqueue:is_policy_applicable(Resource, rabbit_data_coercion:to_list(Policy));
532551
is_applicable(_, _) ->

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ all_tests() ->
124124
purge,
125125
consumer_metrics,
126126
invalid_policy,
127+
pre_existing_invalid_policy,
127128
delete_if_empty,
128129
delete_if_unused,
129130
queue_ttl,
@@ -963,6 +964,22 @@ invalid_policy(Config) ->
963964
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ha">>),
964965
ok.
965966

967+
pre_existing_invalid_policy(Config) ->
968+
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
969+
970+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
971+
QQ = ?config(queue_name, Config),
972+
ok = rabbit_ct_broker_helpers:set_policy(
973+
Config, 0, <<"ha">>, <<"invalid_policy.*">>, <<"queues">>,
974+
[{<<"ha-mode">>, <<"all">>}]),
975+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
976+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
977+
Info = rpc:call(Server, rabbit_quorum_queue, infos,
978+
[rabbit_misc:r(<<"/">>, queue, QQ)]),
979+
?assertEqual('', proplists:get_value(policy, Info)),
980+
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ha">>),
981+
ok.
982+
966983
dead_letter_to_quorum_queue(Config) ->
967984
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
968985

0 commit comments

Comments
 (0)