Skip to content

Commit 250a8f5

Browse files
ikavgomichaelklishin
authored andcommitted
RMQ-1263: Check if queue protected from deleted inside rabbit_amqqueue:with_delete
Delayed exchange automatically manages associated Delayed Queue. We don't want users to delete it accidentally. If queue is indeed protected its removal can be forced by calling with ?INTERNAL_USER as ActingUser. (cherry picked from commit d61c3a899dcc719506c3d7249768ec0887e34d23)
1 parent 3eeb8f9 commit 250a8f5

File tree

2 files changed

+71
-10
lines changed

2 files changed

+71
-10
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@
6161
is_exclusive/1,
6262
is_classic/1,
6363
is_quorum/1,
64+
is_internal/1,
65+
internal_owner/1,
66+
make_internal/1,
67+
make_internal/2,
6468
pattern_match_all/0,
6569
pattern_match_on_name/1,
6670
pattern_match_on_type/1,
@@ -78,6 +82,8 @@
7882
-define(is_backwards_compat_classic(T),
7983
(T =:= classic orelse T =:= ?amqqueue_v1_type)).
8084

85+
-type amqqueue_options() :: map() | ets:match_pattern().
86+
8187
-record(amqqueue, {
8288
%% immutable
8389
name :: rabbit_amqqueue:name() | ets:match_pattern(),
@@ -108,7 +114,7 @@
108114
slave_pids_pending_shutdown = [], %% reserved
109115
%% secondary index
110116
vhost :: rabbit_types:vhost() | undefined | ets:match_pattern(),
111-
options = #{} :: map() | ets:match_pattern(),
117+
options = #{} :: amqqueue_options(),
112118
type = ?amqqueue_v1_type :: module() | ets:match_pattern(),
113119
type_state = #{} :: map() | ets:match_pattern()
114120
}).
@@ -351,6 +357,19 @@ get_arguments(#amqqueue{arguments = Args}) ->
351357
set_arguments(#amqqueue{} = Queue, Args) ->
352358
Queue#amqqueue{arguments = Args}.
353359

360+
% options
361+
362+
-spec get_options(amqqueue()) -> amqqueue_options().
363+
364+
get_options(#amqqueue{options = Options}) ->
365+
Options.
366+
367+
-spec set_options(amqqueue(), amqqueue_options()) -> amqqueue().
368+
369+
set_options(#amqqueue{} = Queue, Options) ->
370+
Queue#amqqueue{options = Options}.
371+
372+
354373
% decorators
355374

356375
-spec get_decorators(amqqueue()) -> [atom()] | none | undefined.
@@ -395,15 +414,6 @@ get_name(#amqqueue{name = Name}) -> Name.
395414
set_name(#amqqueue{} = Queue, Name) ->
396415
Queue#amqqueue{name = Name}.
397416

398-
-spec get_options(amqqueue()) -> map().
399-
400-
get_options(#amqqueue{options = Options}) -> Options.
401-
402-
-spec set_options(amqqueue(), map()) -> amqqueue().
403-
404-
set_options(#amqqueue{} = Queue, Options) ->
405-
Queue#amqqueue{options = Options}.
406-
407417
% pid
408418

409419
-spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none.
@@ -497,6 +507,27 @@ is_classic(Queue) ->
497507
is_quorum(Queue) ->
498508
get_type(Queue) =:= rabbit_quorum_queue.
499509

510+
-spec is_internal(amqqueue()) -> boolean().
511+
512+
is_internal(#amqqueue{options = #{internal := true}}) -> true;
513+
is_internal(#amqqueue{}) -> false.
514+
515+
-spec internal_owner(amqqueue()) -> #resource{}.
516+
517+
internal_owner(#amqqueue{options = #{internal := true,
518+
internal_owner := IOwner}}) ->
519+
IOwner;
520+
internal_owner(#amqqueue{}) ->
521+
undefined.
522+
523+
make_internal(Q = #amqqueue{options = Options}) when is_map(Options) ->
524+
Q#amqqueue{options = maps:merge(Options, #{internal => true,
525+
internal_owner => undefined})}.
526+
make_internal(Q = #amqqueue{options = Options}, Owner)
527+
when is_map(Options) andalso is_record(Owner, resource) ->
528+
Q#amqqueue{options = maps:merge(Options, #{internal => true,
529+
interna_owner => Owner})}.
530+
500531
fields() ->
501532
fields(?record_version).
502533

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,35 @@ check_exclusive_access(Q, _ReaderPid, _MatchType) ->
811811
"match that of the original declaration.",
812812
[rabbit_misc:rs(QueueName)]).
813813

814+
-spec check_internal(amqqueue:amqqueue(), rabbit_types:username()) ->
815+
'ok' | rabbit_types:channel_exit().
816+
check_internal(Q, Username) ->
817+
case amqqueue:is_internal(Q) of
818+
true ->
819+
case Username of
820+
%% note cli delete command uses "cli_user"
821+
?INTERNAL_USER ->
822+
ok;
823+
_ ->
824+
QueueName = amqqueue:get_name(Q),
825+
case amqqueue:internal_owner(Q) of
826+
undefined ->
827+
rabbit_misc:protocol_error(
828+
resource_locked,
829+
"Cannot delete protected ~ts.",
830+
[rabbit_misc:rs(QueueName)]);
831+
IOwner ->
832+
rabbit_misc:protocol_error(
833+
resource_locked,
834+
"Cannot delete protected ~ts. It was "
835+
"declared as an protected and can be deleted only by deleting the owner entity: ~ts",
836+
[rabbit_misc:rs(QueueName), rabbit_misc:rs(IOwner)])
837+
end
838+
end;
839+
false ->
840+
ok
841+
end.
842+
814843
-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) ->
815844
A | rabbit_types:channel_exit().
816845
with_exclusive_access_or_die(Name, ReaderPid, F) ->
@@ -1681,6 +1710,7 @@ delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) whe
16811710
case with(
16821711
QueueName,
16831712
fun (Q) ->
1713+
ok = check_internal(Q, Username),
16841714
if CheckExclusive ->
16851715
check_exclusive_access(Q, ConnPid);
16861716
true ->

0 commit comments

Comments
 (0)