Skip to content

Commit 7c1b567

Browse files
Merge pull request #1761 from rabbitmq/quorum-data-cleanup
Data cleanup for quorum queues
2 parents a91a4fa + 4faedab commit 7c1b567

File tree

3 files changed

+68
-4
lines changed

3 files changed

+68
-4
lines changed

src/rabbit_amqqueue.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
emit_info_all/5, list_local/1, info_local/1,
3131
emit_info_local/4, emit_info_down/4]).
3232
-export([list_down/1, count/1, list_names/0, list_local_names/0]).
33+
-export([list_by_type/1]).
3334
-export([notify_policy_changed/1]).
3435
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
3536
-export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]).
@@ -124,6 +125,7 @@
124125
-spec list(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
125126
-spec list_names() -> [rabbit_amqqueue:name()].
126127
-spec list_down(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
128+
-spec list_by_type(atom()) -> [rabbit_types:amqqueue()].
127129
-spec info_keys() -> rabbit_types:info_keys().
128130
-spec info(rabbit_types:amqqueue()) -> rabbit_types:infos().
129131
-spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) ->
@@ -741,6 +743,15 @@ list_local_names() ->
741743
[ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(),
742744
State =/= crashed, is_local_to_node(QPid, node())].
743745

746+
%% Returns every record stored in the `rabbit_durable_queue' Mnesia table
%% whose `type' field equals `Type'; all other record fields are wildcards.
%% The lookup runs inside a synchronous Mnesia transaction with a read lock.
%% An aborted transaction is not handled here and surfaces as a badmatch.
list_by_type(Type) ->
    MatchSpec = #amqqueue{_ = '_', type = Type},
    Lookup = fun () ->
                     mnesia:match_object(rabbit_durable_queue, MatchSpec, read)
             end,
    {atomic, Queues} = mnesia:sync_transaction(Lookup),
    Queues.
754+
744755
list_local_followers() ->
745756
[ Q#amqqueue.name
746757
|| #amqqueue{state = State, type = quorum, pid = {_, Leader},

src/rabbit_quorum_queue.erl

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
-export([add_member/3]).
3535
-export([delete_member/3]).
3636
-export([requeue/3]).
37+
-export([cleanup_data_dir/0]).
3738

3839
-include_lib("rabbit_common/include/rabbit.hrl").
3940
-include_lib("stdlib/include/qlc.hrl").
@@ -240,7 +241,7 @@ recover(Queues) ->
240241
case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of
241242
ok -> ok;
242243
Err ->
243-
rabbit_log:warning("recover: Quorum queue ~w could not"
244+
rabbit_log:warning("recover: quorum queue ~w could not"
244245
" be started ~w", [Name, Err]),
245246
ok
246247
end;
@@ -251,7 +252,7 @@ recover(Queues) ->
251252
ok;
252253
Err ->
253254
%% catch all clause to avoid causing the vhost not to start
254-
rabbit_log:warning("recover: Quorum queue ~w could not be "
255+
rabbit_log:warning("recover: quorum queue ~w could not be "
255256
"restarted ~w", [Name, Err]),
256257
ok
257258
end,
@@ -281,7 +282,7 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q
281282
end,
282283
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName]),
283284
{ok, Msgs};
284-
{error, {no_more_nodes_to_try, Errs}} = Err ->
285+
{error, {no_more_servers_to_try, Errs}} ->
285286
case lists:all(fun({{error, noproc}, _}) -> true;
286287
(_) -> false
287288
end, Errs) of
@@ -291,7 +292,10 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q
291292
rabbit_core_metrics:queue_deleted(QName),
292293
{ok, Msgs};
293294
false ->
294-
Err
295+
rabbit_misc:protocol_error(
296+
internal_error,
297+
"Cannot delete quorum queue '~s', not enough nodes online to reach a quorum: ~255p",
298+
[rabbit_misc:rs(QName), Errs])
295299
end
296300
end.
297301

@@ -386,6 +390,26 @@ purge(Node) ->
386390
requeue(ConsumerTag, MsgIds, FState) ->
387391
rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, FState).
388392

393+
%% Removes leftover Ra data for servers that are registered on this node but
%% do not correspond to any quorum queue that lists this node as a member.
%%
%% The set of "known" names is taken from the quorum queue records whose
%% `quorum_nodes' include node(); every registered {Name, UId} whose Name is
%% not in that set is handed to maybe_delete_data_dir/1. Always returns ok.
cleanup_data_dir() ->
    ThisNode = node(),
    KnownNames = [RaName || #amqqueue{pid = {RaName, _}, quorum_nodes = Members}
                                <- rabbit_amqqueue:list_by_type(quorum),
                            lists:member(ThisNode, Members)],
    Orphans = [UId || {RegName, UId} <- ra_directory:list_registered(),
                      not lists:member(RegName, KnownNames)],
    lists:foreach(fun maybe_delete_data_dir/1, Orphans),
    ok.
401+
402+
%% Deletes the data directory of the Ra server identified by `UId' and
%% unregisters it, but only when the configuration stored in that directory
%% names `rabbit_fifo' as its machine module. Any other machine is left alone.
%% A config that cannot be read fails the `{ok, _}' match and crashes the caller.
maybe_delete_data_dir(UId) ->
    DataDir = ra_env:server_data_dir(UId),
    {ok, RaConfig} = ra_log:read_config(DataDir),
    delete_if_rabbit_fifo(maps:get(machine, RaConfig), DataDir, UId).

%% Clause dispatch on the machine spec read from the Ra server config.
delete_if_rabbit_fifo({module, rabbit_fifo, _}, DataDir, UId) ->
    ra_lib:recursive_delete(DataDir),
    ra_directory:unregister_name(UId);
delete_if_rabbit_fifo(_OtherMachine, _DataDir, _UId) ->
    ok.
412+
389413
cluster_state(Name) ->
390414
case whereis(Name) of
391415
undefined -> down;

test/quorum_queue_SUITE.erl

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ groups() ->
3636
{cluster_size_2, [], [add_member]}
3737
]},
3838
{clustered, [], [
39+
{cluster_size_2, [], [cleanup_data_dir]},
3940
{cluster_size_2, [], [add_member_not_running,
4041
add_member_classic,
4142
add_member_already_a_member,
@@ -1586,6 +1587,34 @@ delete_member(Config) ->
15861587
rpc:call(Server, rabbit_quorum_queue, delete_member,
15871588
[<<"/">>, QQ, Server])).
15881589

1590+
cleanup_data_dir(Config) ->
    %% Slow test. Besides exercising the data directory cleanup, it verifies
    %% that errors are handled properly when a queue is deleted while it is in
    %% a minority; a case clause on that path had previously gone unnoticed.
    [NodeA, NodeB] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Channel = rabbit_ct_client_helpers:open_channel(Config, NodeA),
    QName = ?config(queue_name, Config),
    QuorumArgs = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
    ?assertEqual({'queue.declare_ok', QName, 0, 0},
                 declare(Channel, QName, QuorumArgs)),
    timer:sleep(100),

    %% Exactly one Ra server is expected to be registered on the first node.
    [{_, UId}] = rpc:call(NodeA, ra_directory, list_registered, []),
    RaDataDir = rpc:call(NodeA, ra_env, server_data_dir, [UId]),
    ?assert(filelib:is_dir(RaDataDir)),

    %% Stop the second node before attempting the delete.
    ok = rabbit_ct_broker_helpers:stop_node(Config, NodeB),

    %% The delete must fail with a server-initiated close (code 541) and must
    %% leave the data directory in place.
    ?assertExit({{shutdown,
                  {connection_closing, {server_initiated_close, 541, _}}}, _},
                amqp_channel:call(Channel, #'queue.delete'{queue = QName})),
    ?assert(filelib:is_dir(RaDataDir)),

    %% The explicit cleanup is expected to remove the directory.
    CleanupResult = rpc:call(NodeA, rabbit_quorum_queue, cleanup_data_dir, []),
    ?assertEqual(ok, CleanupResult),
    ?assert(not filelib:is_dir(RaDataDir)).
1617+
15891618
basic_recover(Config) ->
15901619
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
15911620

0 commit comments

Comments
 (0)