Skip to content

Commit 7ba0594

Browse files
Merge pull request #9324 from Ayanda-D/robust-ctl-delete-queue
Make rabbitmqctl delete_queue more robust
2 parents d50e318 + 779c9e4 commit 7ba0594

File tree

12 files changed

+290
-140
lines changed

12 files changed

+290
-140
lines changed

deps/rabbit/app.bzl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def all_beam_files(name = "all_beam_files"):
4747
"src/rabbit_access_control.erl",
4848
"src/rabbit_alarm.erl",
4949
"src/rabbit_amqqueue.erl",
50+
"src/rabbit_amqqueue_control.erl",
5051
"src/rabbit_amqqueue_process.erl",
5152
"src/rabbit_amqqueue_sup.erl",
5253
"src/rabbit_amqqueue_sup_sup.erl",
@@ -297,6 +298,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
297298
"src/rabbit_access_control.erl",
298299
"src/rabbit_alarm.erl",
299300
"src/rabbit_amqqueue.erl",
301+
"src/rabbit_amqqueue_control.erl",
300302
"src/rabbit_amqqueue_process.erl",
301303
"src/rabbit_amqqueue_sup.erl",
302304
"src/rabbit_amqqueue_sup_sup.erl",
@@ -559,6 +561,7 @@ def all_srcs(name = "all_srcs"):
559561
"src/rabbit_access_control.erl",
560562
"src/rabbit_alarm.erl",
561563
"src/rabbit_amqqueue.erl",
564+
"src/rabbit_amqqueue_control.erl",
562565
"src/rabbit_amqqueue_process.erl",
563566
"src/rabbit_amqqueue_sup.erl",
564567
"src/rabbit_amqqueue_sup_sup.erl",

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@
5353
-export([delete_crashed/1,
5454
delete_crashed/2,
5555
delete_crashed_internal/2]).
56-
56+
-export([delete_with/4, delete_with/6]).
5757
-export([pid_of/1, pid_of/2]).
58+
-export([pid_or_crashed/2]).
5859
-export([mark_local_durable_queues_stopped/1]).
5960

6061
-export([rebalance/3]).
@@ -71,6 +72,8 @@
7172
-export([prepend_extra_bcc/1]).
7273
-export([queue/1, queue_names/1]).
7374

75+
-export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]).
76+
7477
%% internal
7578
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
7679
set_ram_duration_target/2, set_maximum_since_use/2,
@@ -116,6 +119,7 @@
116119
-define(CONSUMER_INFO_KEYS,
117120
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
118121
active, activity_status, arguments]).
122+
-define(KILL_QUEUE_DELAY_INTERVAL, 100).
119123

120124
warn_file_limit() ->
121125
DurableQueues = find_recoverable_queues(),
@@ -1601,6 +1605,51 @@ delete_immediately_by_resource(Resources) ->
16011605
delete(Q, IfUnused, IfEmpty, ActingUser) ->
16021606
rabbit_queue_type:delete(Q, IfUnused, IfEmpty, ActingUser).
16031607

1608+
-spec delete_with(amqqueue:amqqueue() | name(), boolean(), boolean(), rabbit_types:username()) ->
1609+
rabbit_types:ok(integer()) | rabbit_misc:channel_or_connection_exit().
1610+
delete_with(QueueName, IfUnused, IfEmpty, ActingUser) ->
1611+
delete_with(QueueName, undefined, IfUnused, IfEmpty, ActingUser, false).
1612+
1613+
-spec delete_with(amqqueue:amqqueue() | name(), pid() | undefined, boolean(), boolean(), rabbit_types:username(), boolean()) ->
1614+
rabbit_types:ok(integer()) | rabbit_misc:channel_or_connection_exit().
1615+
delete_with(AMQQueue, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) when ?is_amqqueue(AMQQueue) ->
1616+
QueueName = amqqueue:get_name(AMQQueue),
1617+
delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive);
1618+
delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) when is_record(QueueName, resource) ->
1619+
case with(
1620+
QueueName,
1621+
fun (Q) ->
1622+
if CheckExclusive ->
1623+
check_exclusive_access(Q, ConnPid);
1624+
true ->
1625+
ok
1626+
end,
1627+
rabbit_queue_type:delete(Q, IfUnused, IfEmpty, Username)
1628+
end,
1629+
fun (not_found) ->
1630+
{ok, 0};
1631+
({absent, Q, crashed}) ->
1632+
_ = delete_crashed(Q, Username),
1633+
{ok, 0};
1634+
({absent, Q, stopped}) ->
1635+
_ = delete_crashed(Q, Username),
1636+
{ok, 0};
1637+
({absent, Q, Reason}) ->
1638+
absent(Q, Reason)
1639+
end) of
1640+
{error, in_use} ->
1641+
rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(QueueName)]);
1642+
{error, not_empty} ->
1643+
rabbit_misc:precondition_failed("~ts not empty", [rabbit_misc:rs(QueueName)]);
1644+
{error, {exit, _, _}} ->
1645+
%% rabbit_amqqueue:delete()/delegate:invoke might return {error, {exit, _, _}}
1646+
{ok, 0};
1647+
{ok, Count} ->
1648+
{ok, Count};
1649+
{protocol_error, Type, Reason, ReasonArgs} ->
1650+
rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
1651+
end.
1652+
16041653
%% delete_crashed* INCLUDED FOR BACKWARDS COMPATBILITY REASONS
16051654
delete_crashed(Q) when ?amqqueue_is_classic(Q) ->
16061655
rabbit_classic_queue:delete_crashed(Q).
@@ -2061,3 +2110,61 @@ is_queue_args_combination_permitted(Durable, Exclusive) ->
20612110
true ->
20622111
rabbit_deprecated_features:is_permitted(transient_nonexcl_queues)
20632112
end.
2113+
2114+
-spec kill_queue_hard(node(), name()) -> ok.
2115+
kill_queue_hard(Node, QRes = #resource{kind = queue}) ->
2116+
kill_queue_hard(Node, QRes, boom).
2117+
2118+
-spec kill_queue_hard(node(), name(), atom()) -> ok.
2119+
kill_queue_hard(Node, QRes = #resource{kind = queue}, Reason) ->
2120+
case kill_queue(Node, QRes, Reason) of
2121+
crashed -> ok;
2122+
stopped -> ok;
2123+
NewPid when is_pid(NewPid) ->
2124+
timer:sleep(?KILL_QUEUE_DELAY_INTERVAL),
2125+
kill_queue_hard(Node, QRes, Reason);
2126+
Error -> Error
2127+
end.
2128+
2129+
-spec kill_queue(node(), name()) -> pid() | crashed | stopped | rabbit_types:error(term()).
2130+
kill_queue(Node, QRes = #resource{kind = queue}) ->
2131+
kill_queue(Node, QRes, boom).
2132+
2133+
-spec kill_queue(node(), name(), atom()) -> pid() | crashed | stopped | rabbit_types:error(term()).
2134+
kill_queue(Node, QRes = #resource{kind = queue}, Reason = shutdown) ->
2135+
Pid1 = pid_or_crashed(Node, QRes),
2136+
exit(Pid1, Reason),
2137+
rabbit_amqqueue_control:await_state(Node, QRes, stopped),
2138+
stopped;
2139+
kill_queue(Node, QRes = #resource{kind = queue}, Reason) ->
2140+
case pid_or_crashed(Node, QRes) of
2141+
Pid1 when is_pid(Pid1) ->
2142+
exit(Pid1, Reason),
2143+
rabbit_amqqueue_control:await_new_pid(Node, QRes, Pid1);
2144+
crashed ->
2145+
crashed;
2146+
Error ->
2147+
Error
2148+
end.
2149+
2150+
-spec pid_or_crashed(node(), name()) -> pid() | crashed | rabbit_types:error(term()).
2151+
pid_or_crashed(Node, QRes = #resource{virtual_host = VHost, kind = queue}) ->
2152+
case rpc:call(Node, rabbit_amqqueue, lookup, [QRes]) of
2153+
{ok, Q} ->
2154+
QPid = amqqueue:get_pid(Q),
2155+
State = amqqueue:get_state(Q),
2156+
case State of
2157+
crashed ->
2158+
case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of
2159+
{error, {queue_supervisor_not_found, _}} -> {error, no_sup};
2160+
{ok, SPid} ->
2161+
case rabbit_misc:remote_sup_child(Node, SPid) of
2162+
{ok, _} -> QPid; %% restarting
2163+
{error, no_child} -> crashed %% given up
2164+
end
2165+
end;
2166+
_ -> QPid
2167+
end;
2168+
Error = {error, _} -> Error;
2169+
Reason -> {error, Reason}
2170+
end.
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(rabbit_amqqueue_control).
9+
10+
-export([await_new_pid/3, await_state/3, await_state/4]).
11+
12+
-define(DEFAULT_AWAIT_STATE_TIMEOUT, 30000).
13+
-define(AWAIT_NEW_PID_DELAY_INTERVAL, 10).
14+
-define(AWAIT_STATE_DELAY_INTERVAL, 100).
15+
-define(AWAIT_STATE_DELAY_TIME_DELTA, 100).
16+
17+
-include_lib("rabbit_common/include/resource.hrl").
18+
19+
-spec await_new_pid(node(), rabbit_amqqueue:name(), pid()) -> pid().
20+
await_new_pid(Node, QRes = #resource{kind = queue}, OldPid) ->
21+
case rabbit_amqqueue:pid_or_crashed(Node, QRes) of
22+
OldPid -> timer:sleep(?AWAIT_NEW_PID_DELAY_INTERVAL),
23+
await_new_pid(Node, QRes, OldPid);
24+
New -> New
25+
end.
26+
27+
-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom()) -> 'ok'.
28+
await_state(Node, QName, State) when is_binary(QName) ->
29+
QRes = rabbit_misc:r(<<"/">>, queue, QName),
30+
await_state(Node, QRes, State);
31+
await_state(Node, QRes = #resource{kind = queue}, State) ->
32+
await_state(Node, QRes, State, ?DEFAULT_AWAIT_STATE_TIMEOUT).
33+
34+
-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom(), integer()) -> 'ok'.
35+
await_state(Node, QName, State, Time) when is_binary(QName) ->
36+
QRes = rabbit_misc:r(<<"/">>, queue, QName),
37+
await_state(Node, QRes, State, Time);
38+
await_state(Node, QRes = #resource{kind = queue}, State, Time) ->
39+
case state(Node, QRes) of
40+
State ->
41+
ok;
42+
Other ->
43+
case Time of
44+
0 -> exit({timeout_awaiting_state, State, Other});
45+
_ -> timer:sleep(?AWAIT_STATE_DELAY_INTERVAL),
46+
await_state(Node, QRes, State, Time - ?AWAIT_STATE_DELAY_TIME_DELTA)
47+
end
48+
end.
49+
50+
state(Node, QRes = #resource{virtual_host = VHost, kind = queue}) ->
51+
Infos = rpc:call(Node, rabbit_amqqueue, info_all, [VHost, [name, state]]),
52+
fetch_state(QRes, Infos).
53+
54+
fetch_state(_QRes, []) -> undefined;
55+
fetch_state(QRes, [[{name, QRes}, {state, State}] | _]) -> State;
56+
fetch_state(QRes, [[{name, _}, {state, _State}] | Rem]) ->
57+
fetch_state(QRes, Rem).

deps/rabbit/src/rabbit_channel.erl

Lines changed: 14 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -930,15 +930,6 @@ handle_exception(Reason, State = #ch{cfg = #conf{protocol = Protocol,
930930
{stop, normal, State1}
931931
end.
932932

933-
-spec precondition_failed(string()) -> no_return().
934-
935-
precondition_failed(Format) -> precondition_failed(Format, []).
936-
937-
-spec precondition_failed(string(), [any()]) -> no_return().
938-
939-
precondition_failed(Format, Params) ->
940-
rabbit_misc:protocol_error(precondition_failed, Format, Params).
941-
942933
return_queue_declare_ok(#resource{name = ActualName},
943934
NoWait, MessageCount, ConsumerCount,
944935
#ch{cfg = Cfg} = State) ->
@@ -995,15 +986,15 @@ check_user_id_header(#'P_basic'{user_id = Claimed},
995986
tags = Tags}}}) ->
996987
case lists:member(impersonator, Tags) of
997988
true -> ok;
998-
false -> precondition_failed(
989+
false -> rabbit_misc:precondition_failed(
999990
"user_id property set to '~ts' but authenticated user was "
1000991
"'~ts'", [Claimed, Actual])
1001992
end.
1002993

1003994
check_expiration_header(Props) ->
1004995
case rabbit_basic:parse_expiration(Props) of
1005996
{ok, _} -> ok;
1006-
{error, E} -> precondition_failed("invalid expiration '~ts': ~tp",
997+
{error, E} -> rabbit_misc:precondition_failed("invalid expiration '~ts': ~tp",
1007998
[Props#'P_basic'.expiration, E])
1008999
end.
10091000

@@ -1074,15 +1065,15 @@ check_msg_size(Content, MaxMessageSize, GCThreshold) ->
10741065
_ ->
10751066
"message size ~B is larger than configured max size ~B"
10761067
end,
1077-
precondition_failed(ErrorMessage,
1068+
rabbit_misc:precondition_failed(ErrorMessage,
10781069
[Size, MaxMessageSize]);
10791070
_ -> ok
10801071
end.
10811072

10821073
check_vhost_queue_limit(#resource{name = QueueName}, VHost) ->
10831074
case rabbit_vhost_limit:is_over_queue_limit(VHost) of
10841075
false -> ok;
1085-
{true, Limit} -> precondition_failed("cannot declare queue '~ts': "
1076+
{true, Limit} -> rabbit_misc:precondition_failed("cannot declare queue '~ts': "
10861077
"queue limit in vhost '~ts' (~tp) is reached",
10871078
[QueueName, VHost, Limit])
10881079

@@ -1704,7 +1695,7 @@ handle_method(#'queue.purge'{nowait = NoWait} = Method,
17041695
end;
17051696

17061697
handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
1707-
precondition_failed("cannot switch from confirm to tx mode");
1698+
rabbit_misc:precondition_failed("cannot switch from confirm to tx mode");
17081699

17091700
handle_method(#'tx.select'{}, _, State = #ch{tx = none}) ->
17101701
{reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}};
@@ -1713,7 +1704,7 @@ handle_method(#'tx.select'{}, _, State) ->
17131704
{reply, #'tx.select_ok'{}, State};
17141705

17151706
handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
1716-
precondition_failed("channel is not transactional");
1707+
rabbit_misc:precondition_failed("channel is not transactional");
17171708

17181709
handle_method(#'tx.commit'{}, _, State = #ch{tx = {Deliveries, Acks},
17191710
limiter = Limiter}) ->
@@ -1731,7 +1722,7 @@ handle_method(#'tx.commit'{}, _, State = #ch{tx = {Deliveries, Acks},
17311722
{noreply, maybe_complete_tx(State3#ch{tx = committing})};
17321723

17331724
handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
1734-
precondition_failed("channel is not transactional");
1725+
rabbit_misc:precondition_failed("channel is not transactional");
17351726

17361727
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
17371728
tx = {_Msgs, Acks}}) ->
@@ -1741,7 +1732,7 @@ handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
17411732
tx = new_tx()}};
17421733

17431734
handle_method(#'confirm.select'{}, _, #ch{tx = {_, _}}) ->
1744-
precondition_failed("cannot switch from tx to confirm mode");
1735+
rabbit_misc:precondition_failed("cannot switch from tx to confirm mode");
17451736

17461737
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
17471738
return_ok(State#ch{confirm_enabled = true},
@@ -1762,7 +1753,7 @@ handle_method(#'basic.credit'{consumer_tag = CTag,
17621753
{ok, {Q, _CParams}} ->
17631754
{ok, QStates, Actions} = rabbit_queue_type:credit(Q, CTag, Credit, Drain, QStates0),
17641755
{noreply, handle_queue_actions(Actions, State#ch{queue_states = QStates})};
1765-
error -> precondition_failed(
1756+
error -> rabbit_misc:precondition_failed(
17661757
"unknown consumer tag '~ts'", [CTag])
17671758
end;
17681759

@@ -2050,7 +2041,7 @@ collect_acks(AcknowledgedAcc, RemainingAcc, UAMQ, DeliveryTag, Multiple) ->
20502041
UAMQTail, DeliveryTag, Multiple)
20512042
end;
20522043
{empty, _} ->
2053-
precondition_failed("unknown delivery tag ~w", [DeliveryTag])
2044+
rabbit_misc:precondition_failed("unknown delivery tag ~w", [DeliveryTag])
20542045
end.
20552046

20562047
%% Settles (acknowledges) messages at the queue replica process level.
@@ -2540,7 +2531,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
25402531
undefined ->
25412532
ok;
25422533
{error, {invalid_type, Type}} ->
2543-
precondition_failed(
2534+
rabbit_misc:precondition_failed(
25442535
"invalid type '~ts' for arg '~ts' in ~ts",
25452536
[Type, DlxKey, rabbit_misc:rs(QueueName)]);
25462537
DLX ->
@@ -2605,35 +2596,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
26052596
QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath),
26062597

26072598
check_configure_permitted(QueueName, User, AuthzContext),
2608-
case rabbit_amqqueue:with(
2609-
QueueName,
2610-
fun (Q) ->
2611-
rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
2612-
rabbit_queue_type:delete(Q, IfUnused, IfEmpty, Username)
2613-
end,
2614-
fun (not_found) ->
2615-
{ok, 0};
2616-
({absent, Q, crashed}) ->
2617-
_ = rabbit_classic_queue:delete_crashed(Q, Username),
2618-
{ok, 0};
2619-
({absent, Q, stopped}) ->
2620-
_ = rabbit_classic_queue:delete_crashed(Q, Username),
2621-
{ok, 0};
2622-
({absent, Q, Reason}) ->
2623-
rabbit_amqqueue:absent(Q, Reason)
2624-
end) of
2625-
{error, in_use} ->
2626-
precondition_failed("~ts in use", [rabbit_misc:rs(QueueName)]);
2627-
{error, not_empty} ->
2628-
precondition_failed("~ts not empty", [rabbit_misc:rs(QueueName)]);
2629-
{error, {exit, _, _}} ->
2630-
%% rabbit_amqqueue:delete()/delegate:invoke might return {error, {exit, _, _}}
2631-
{ok, 0};
2632-
{ok, Count} ->
2633-
{ok, Count};
2634-
{protocol_error, Type, Reason, ReasonArgs} ->
2635-
rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
2636-
end;
2599+
rabbit_amqqueue:delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, true);
26372600
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
26382601
if_unused = IfUnused},
26392602
_ConnPid, AuthzContext, _CollectorPid, VHostPath,
@@ -2647,7 +2610,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
26472610
{error, not_found} ->
26482611
ok;
26492612
{error, in_use} ->
2650-
precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]);
2613+
rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]);
26512614
ok ->
26522615
ok
26532616
end;
@@ -2689,7 +2652,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
26892652
case rabbit_misc:r_arg(VHostPath, exchange, Args, AeKey) of
26902653
undefined -> ok;
26912654
{error, {invalid_type, Type}} ->
2692-
precondition_failed(
2655+
rabbit_misc:precondition_failed(
26932656
"invalid type '~ts' for arg '~ts' in ~ts",
26942657
[Type, AeKey, rabbit_misc:rs(ExchangeName)]);
26952658
AName -> check_read_permitted(ExchangeName, User, AuthzContext),

0 commit comments

Comments
 (0)