Skip to content

Commit d26d763

Browse files
Merge pull request #9365 from rabbitmq/mergify/bp/v3.12.x/pr-9324
Make rabbitmqctl delete_queue more robust (backport #9324)
2 parents ce08775 + 0178380 commit d26d763

File tree

12 files changed

+291
-141
lines changed

12 files changed

+291
-141
lines changed

deps/rabbit/app.bzl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def all_beam_files(name = "all_beam_files"):
4242
"src/rabbit_access_control.erl",
4343
"src/rabbit_alarm.erl",
4444
"src/rabbit_amqqueue.erl",
45+
"src/rabbit_amqqueue_control.erl",
4546
"src/rabbit_amqqueue_process.erl",
4647
"src/rabbit_amqqueue_sup.erl",
4748
"src/rabbit_amqqueue_sup_sup.erl",
@@ -284,6 +285,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
284285
"src/rabbit_access_control.erl",
285286
"src/rabbit_alarm.erl",
286287
"src/rabbit_amqqueue.erl",
288+
"src/rabbit_amqqueue_control.erl",
287289
"src/rabbit_amqqueue_process.erl",
288290
"src/rabbit_amqqueue_sup.erl",
289291
"src/rabbit_amqqueue_sup_sup.erl",
@@ -536,6 +538,7 @@ def all_srcs(name = "all_srcs"):
536538
"src/rabbit_access_control.erl",
537539
"src/rabbit_alarm.erl",
538540
"src/rabbit_amqqueue.erl",
541+
"src/rabbit_amqqueue_control.erl",
539542
"src/rabbit_amqqueue_process.erl",
540543
"src/rabbit_amqqueue_sup.erl",
541544
"src/rabbit_amqqueue_sup_sup.erl",

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@
5353
-export([delete_crashed/1,
5454
delete_crashed/2,
5555
delete_crashed_internal/2]).
56-
56+
-export([delete_with/4, delete_with/6]).
5757
-export([pid_of/1, pid_of/2]).
58+
-export([pid_or_crashed/2]).
5859
-export([mark_local_durable_queues_stopped/1]).
5960

6061
-export([rebalance/3]).
@@ -70,6 +71,8 @@
7071

7172
-export([prepend_extra_bcc/1]).
7273

74+
-export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]).
75+
7376
%% internal
7477
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
7578
set_ram_duration_target/2, set_maximum_since_use/2,
@@ -108,6 +111,7 @@
108111
-define(CONSUMER_INFO_KEYS,
109112
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
110113
active, activity_status, arguments]).
114+
-define(KILL_QUEUE_DELAY_INTERVAL, 100).
111115

112116
warn_file_limit() ->
113117
DurableQueues = find_recoverable_queues(),
@@ -1581,6 +1585,51 @@ delete_immediately_by_resource(Resources) ->
15811585
delete(Q, IfUnused, IfEmpty, ActingUser) ->
15821586
rabbit_queue_type:delete(Q, IfUnused, IfEmpty, ActingUser).
15831587

1588+
-spec delete_with(amqqueue:amqqueue() | name(), boolean(), boolean(), rabbit_types:username()) ->
1589+
rabbit_types:ok(integer()) | rabbit_misc:channel_or_connection_exit().
1590+
delete_with(QueueName, IfUnused, IfEmpty, ActingUser) ->
1591+
delete_with(QueueName, undefined, IfUnused, IfEmpty, ActingUser, false).
1592+
1593+
-spec delete_with(amqqueue:amqqueue() | name(), pid() | undefined, boolean(), boolean(), rabbit_types:username(), boolean()) ->
1594+
rabbit_types:ok(integer()) | rabbit_misc:channel_or_connection_exit().
1595+
delete_with(AMQQueue, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) when ?is_amqqueue(AMQQueue) ->
1596+
QueueName = amqqueue:get_name(AMQQueue),
1597+
delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive);
1598+
delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) when is_record(QueueName, resource) ->
1599+
case with(
1600+
QueueName,
1601+
fun (Q) ->
1602+
if CheckExclusive ->
1603+
check_exclusive_access(Q, ConnPid);
1604+
true ->
1605+
ok
1606+
end,
1607+
rabbit_queue_type:delete(Q, IfUnused, IfEmpty, Username)
1608+
end,
1609+
fun (not_found) ->
1610+
{ok, 0};
1611+
({absent, Q, crashed}) ->
1612+
_ = delete_crashed(Q, Username),
1613+
{ok, 0};
1614+
({absent, Q, stopped}) ->
1615+
_ = delete_crashed(Q, Username),
1616+
{ok, 0};
1617+
({absent, Q, Reason}) ->
1618+
absent(Q, Reason)
1619+
end) of
1620+
{error, in_use} ->
1621+
rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(QueueName)]);
1622+
{error, not_empty} ->
1623+
rabbit_misc:precondition_failed("~ts not empty", [rabbit_misc:rs(QueueName)]);
1624+
{error, {exit, _, _}} ->
1625+
%% rabbit_amqqueue:delete()/delegate:invoke might return {error, {exit, _, _}}
1626+
{ok, 0};
1627+
{ok, Count} ->
1628+
{ok, Count};
1629+
{protocol_error, Type, Reason, ReasonArgs} ->
1630+
rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
1631+
end.
1632+
15841633
%% delete_crashed* INCLUDED FOR BACKWARDS COMPATBILITY REASONS
15851634
delete_crashed(Q) when ?amqqueue_is_classic(Q) ->
15861635
rabbit_classic_queue:delete_crashed(Q).
@@ -2039,3 +2088,61 @@ get_bcc_queue(Q, BCCName) ->
20392088
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
20402089
BCCQueueName = rabbit_misc:r(VHost, queue, BCCName),
20412090
rabbit_amqqueue:lookup(BCCQueueName).
2091+
2092+
-spec kill_queue_hard(node(), name()) -> ok.
2093+
kill_queue_hard(Node, QRes = #resource{kind = queue}) ->
2094+
kill_queue_hard(Node, QRes, boom).
2095+
2096+
-spec kill_queue_hard(node(), name(), atom()) -> ok.
2097+
kill_queue_hard(Node, QRes = #resource{kind = queue}, Reason) ->
2098+
case kill_queue(Node, QRes, Reason) of
2099+
crashed -> ok;
2100+
stopped -> ok;
2101+
NewPid when is_pid(NewPid) ->
2102+
timer:sleep(?KILL_QUEUE_DELAY_INTERVAL),
2103+
kill_queue_hard(Node, QRes, Reason);
2104+
Error -> Error
2105+
end.
2106+
2107+
-spec kill_queue(node(), name()) -> pid() | crashed | stopped | rabbit_types:error(term()).
2108+
kill_queue(Node, QRes = #resource{kind = queue}) ->
2109+
kill_queue(Node, QRes, boom).
2110+
2111+
-spec kill_queue(node(), name(), atom()) -> pid() | crashed | stopped | rabbit_types:error(term()).
2112+
kill_queue(Node, QRes = #resource{kind = queue}, Reason = shutdown) ->
2113+
Pid1 = pid_or_crashed(Node, QRes),
2114+
exit(Pid1, Reason),
2115+
rabbit_amqqueue_control:await_state(Node, QRes, stopped),
2116+
stopped;
2117+
kill_queue(Node, QRes = #resource{kind = queue}, Reason) ->
2118+
case pid_or_crashed(Node, QRes) of
2119+
Pid1 when is_pid(Pid1) ->
2120+
exit(Pid1, Reason),
2121+
rabbit_amqqueue_control:await_new_pid(Node, QRes, Pid1);
2122+
crashed ->
2123+
crashed;
2124+
Error ->
2125+
Error
2126+
end.
2127+
2128+
-spec pid_or_crashed(node(), name()) -> pid() | crashed | rabbit_types:error(term()).
2129+
pid_or_crashed(Node, QRes = #resource{virtual_host = VHost, kind = queue}) ->
2130+
case rpc:call(Node, rabbit_amqqueue, lookup, [QRes]) of
2131+
{ok, Q} ->
2132+
QPid = amqqueue:get_pid(Q),
2133+
State = amqqueue:get_state(Q),
2134+
case State of
2135+
crashed ->
2136+
case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of
2137+
{error, {queue_supervisor_not_found, _}} -> {error, no_sup};
2138+
{ok, SPid} ->
2139+
case rabbit_misc:remote_sup_child(Node, SPid) of
2140+
{ok, _} -> QPid; %% restarting
2141+
{error, no_child} -> crashed %% given up
2142+
end
2143+
end;
2144+
_ -> QPid
2145+
end;
2146+
Error = {error, _} -> Error;
2147+
Reason -> {error, Reason}
2148+
end.
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(rabbit_amqqueue_control).
9+
10+
-export([await_new_pid/3, await_state/3, await_state/4]).
11+
12+
-define(DEFAULT_AWAIT_STATE_TIMEOUT, 30000).
13+
-define(AWAIT_NEW_PID_DELAY_INTERVAL, 10).
14+
-define(AWAIT_STATE_DELAY_INTERVAL, 100).
15+
-define(AWAIT_STATE_DELAY_TIME_DELTA, 100).
16+
17+
-include_lib("rabbit_common/include/resource.hrl").
18+
19+
-spec await_new_pid(node(), rabbit_amqqueue:name(), pid()) -> pid().
20+
await_new_pid(Node, QRes = #resource{kind = queue}, OldPid) ->
21+
case rabbit_amqqueue:pid_or_crashed(Node, QRes) of
22+
OldPid -> timer:sleep(?AWAIT_NEW_PID_DELAY_INTERVAL),
23+
await_new_pid(Node, QRes, OldPid);
24+
New -> New
25+
end.
26+
27+
-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom()) -> 'ok'.
28+
await_state(Node, QName, State) when is_binary(QName) ->
29+
QRes = rabbit_misc:r(<<"/">>, queue, QName),
30+
await_state(Node, QRes, State);
31+
await_state(Node, QRes = #resource{kind = queue}, State) ->
32+
await_state(Node, QRes, State, ?DEFAULT_AWAIT_STATE_TIMEOUT).
33+
34+
-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom(), integer()) -> 'ok'.
35+
await_state(Node, QName, State, Time) when is_binary(QName) ->
36+
QRes = rabbit_misc:r(<<"/">>, queue, QName),
37+
await_state(Node, QRes, State, Time);
38+
await_state(Node, QRes = #resource{kind = queue}, State, Time) ->
39+
case state(Node, QRes) of
40+
State ->
41+
ok;
42+
Other ->
43+
case Time of
44+
0 -> exit({timeout_awaiting_state, State, Other});
45+
_ -> timer:sleep(?AWAIT_STATE_DELAY_INTERVAL),
46+
await_state(Node, QRes, State, Time - ?AWAIT_STATE_DELAY_TIME_DELTA)
47+
end
48+
end.
49+
50+
state(Node, QRes = #resource{virtual_host = VHost, kind = queue}) ->
51+
Infos = rpc:call(Node, rabbit_amqqueue, info_all, [VHost, [name, state]]),
52+
fetch_state(QRes, Infos).
53+
54+
fetch_state(_QRes, []) -> undefined;
55+
fetch_state(QRes, [[{name, QRes}, {state, State}] | _]) -> State;
56+
fetch_state(QRes, [[{name, _}, {state, _State}] | Rem]) ->
57+
fetch_state(QRes, Rem).

0 commit comments

Comments
 (0)