Skip to content

Commit f95b3b4

Browse files
dcorbachomergify[bot]
authored andcommitted
forget_cluster_node: handle errors while shrinking quorum queues
(cherry picked from commit 7540ccc) # Conflicts: # deps/rabbit/BUILD.bazel # deps/rabbit/app.bzl
1 parent 6468d78 commit f95b3b4

File tree

5 files changed

+264
-6
lines changed

5 files changed

+264
-6
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,6 +1120,22 @@ rabbitmq_integration_suite(
11201120
],
11211121
)
11221122

1123+
<<<<<<< HEAD
1124+
=======
1125+
rabbitmq_integration_suite(
1126+
name = "routing_SUITE",
1127+
size = "large",
1128+
)
1129+
1130+
rabbitmq_integration_suite(
1131+
name = "cli_forget_cluster_node_SUITE",
1132+
size = "medium",
1133+
additional_beam = [
1134+
":test_clustering_utils_beam",
1135+
],
1136+
)
1137+
1138+
>>>>>>> 7540ccc628 (forget_cluster_node: handle errors while shrinking quorum queues)
11231139
assert_suites()
11241140

11251141
filegroup(

deps/rabbit/app.bzl

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1951,3 +1951,61 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
19511951
erlc_opts = "//:test_erlc_opts",
19521952
deps = ["//deps/amqp_client:erlang_app"],
19531953
)
1954+
<<<<<<< HEAD
1955+
=======
1956+
erlang_bytecode(
1957+
name = "rabbitmq_4_0_deprecations_SUITE_beam_files",
1958+
testonly = True,
1959+
srcs = ["test/rabbitmq_4_0_deprecations_SUITE.erl"],
1960+
outs = ["test/rabbitmq_4_0_deprecations_SUITE.beam"],
1961+
app_name = "rabbit",
1962+
erlc_opts = "//:test_erlc_opts",
1963+
deps = ["//deps/amqp_client:erlang_app"],
1964+
)
1965+
erlang_bytecode(
1966+
name = "quorum_queue_member_reconciliation_SUITE_beam_files",
1967+
testonly = True,
1968+
srcs = ["test/quorum_queue_member_reconciliation_SUITE.erl"],
1969+
outs = ["test/quorum_queue_member_reconciliation_SUITE.beam"],
1970+
app_name = "rabbit",
1971+
erlc_opts = "//:test_erlc_opts",
1972+
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
1973+
)
1974+
erlang_bytecode(
1975+
name = "message_containers_SUITE_beam_files",
1976+
testonly = True,
1977+
srcs = ["test/message_containers_SUITE.erl"],
1978+
outs = ["test/message_containers_SUITE.beam"],
1979+
app_name = "rabbit",
1980+
erlc_opts = "//:test_erlc_opts",
1981+
deps = ["//deps/amqp_client:erlang_app"],
1982+
)
1983+
erlang_bytecode(
1984+
name = "mc_SUITE_beam_files",
1985+
testonly = True,
1986+
srcs = ["test/mc_SUITE.erl"],
1987+
outs = ["test/mc_SUITE.beam"],
1988+
hdrs = ["include/mc.hrl"],
1989+
app_name = "rabbit",
1990+
erlc_opts = "//:test_erlc_opts",
1991+
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbit_common:erlang_app"],
1992+
)
1993+
erlang_bytecode(
1994+
name = "routing_SUITE_beam_files",
1995+
testonly = True,
1996+
srcs = ["test/routing_SUITE.erl"],
1997+
outs = ["test/routing_SUITE.beam"],
1998+
app_name = "rabbit",
1999+
erlc_opts = "//:test_erlc_opts",
2000+
deps = ["//deps/amqp_client:erlang_app"],
2001+
)
2002+
erlang_bytecode(
2003+
name = "cli_forget_cluster_node_SUITE_beam_files",
2004+
testonly = True,
2005+
srcs = ["test/cli_forget_cluster_node_SUITE.erl"],
2006+
outs = ["test/cli_forget_cluster_node_SUITE.beam"],
2007+
app_name = "rabbit",
2008+
erlc_opts = "//:test_erlc_opts",
2009+
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
2010+
)
2011+
>>>>>>> 7540ccc628 (forget_cluster_node: handle errors while shrinking quorum queues)

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@
4242
-export([set_retention_policy/3]).
4343
-export([restart_stream/3,
4444
add_replica/3,
45-
delete_replica/3]).
45+
delete_replica/3,
46+
delete_all_replicas/1]).
4647
-export([format_osiris_event/2]).
4748
-export([update_stream_conf/2]).
4849
-export([readers/1]).
@@ -861,6 +862,32 @@ delete_replica(VHost, Name, Node) ->
861862
E
862863
end.
863864

865+
delete_all_replicas(Node) ->
866+
rabbit_log:info("Asked to remove all stream replicas from node ~ts", [Node]),
867+
Streams = rabbit_amqqueue:list_by_type(stream),
868+
Errors =
869+
lists:foldl(fun(Q, Acc) ->
870+
QName = amqqueue:get_name(Q),
871+
rabbit_log:info("~ts: removing replica on node ~w",
872+
[rabbit_misc:rs(QName), Node]),
873+
#{name := StreamId} = amqqueue:get_type_state(Q),
874+
{ok, Reply, _} = rabbit_stream_coordinator:delete_replica(StreamId, Node),
875+
case Reply of
876+
ok ->
877+
Acc;
878+
Err ->
879+
rabbit_log:warning("~ts: failed to remove replica on node ~w, error: ~w",
880+
[rabbit_misc:rs(QName), Node, Err]),
881+
[{QName, Err} | Acc]
882+
end
883+
end, [], Streams),
884+
case Errors of
885+
[] ->
886+
ok;
887+
_ ->
888+
{error, Errors}
889+
end.
890+
864891
make_stream_conf(Q) ->
865892
QName = amqqueue:get_name(Q),
866893
Name = stream_name(QName),
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(cli_forget_cluster_node_SUITE).
9+
10+
-include_lib("common_test/include/ct.hrl").
11+
-include_lib("amqp_client/include/amqp_client.hrl").
12+
-include_lib("eunit/include/eunit.hrl").
13+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
14+
15+
-compile(export_all).
16+
17+
-import(clustering_utils, [
18+
assert_cluster_status/2,
19+
assert_clustered/1,
20+
assert_not_clustered/1
21+
]).
22+
23+
all() ->
24+
[
25+
{group, cluster_size_3}
26+
].
27+
28+
groups() ->
29+
[
30+
{cluster_size_3, [], [
31+
forget_cluster_node_with_quorum_queues,
32+
forget_cluster_node_with_last_quorum_member
33+
]}
34+
].
35+
36+
suite() ->
37+
[
38+
%% If a testcase hangs, no need to wait for 30 minutes.
39+
{timetrap, {minutes, 5}}
40+
].
41+
42+
%% -------------------------------------------------------------------
43+
%% Testsuite setup/teardown.
44+
%% -------------------------------------------------------------------
45+
46+
init_per_suite(Config) ->
47+
rabbit_ct_helpers:log_environment(),
48+
Config1 = rabbit_ct_helpers:merge_app_env(
49+
Config, {rabbit, [
50+
{mnesia_table_loading_retry_limit, 2},
51+
{mnesia_table_loading_retry_timeout,1000}
52+
]}),
53+
rabbit_ct_helpers:run_setup_steps(Config1).
54+
55+
end_per_suite(Config) ->
56+
rabbit_ct_helpers:run_teardown_steps(Config).
57+
58+
init_per_group(cluster_size_3, Config) ->
59+
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3},
60+
{rmq_nodes_clustered, true}]).
61+
62+
end_per_group(_, Config) ->
63+
Config.
64+
65+
init_per_testcase(Testcase, Config) ->
66+
rabbit_ct_helpers:testcase_started(Config, Testcase),
67+
ClusterSize = ?config(rmq_nodes_count, Config),
68+
TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
69+
Config1 = rabbit_ct_helpers:set_config(Config, [
70+
{rmq_nodename_suffix, Testcase},
71+
{tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}},
72+
{keep_pid_file_on_exit, true}
73+
]),
74+
rabbit_ct_helpers:run_steps(Config1,
75+
rabbit_ct_broker_helpers:setup_steps() ++
76+
rabbit_ct_client_helpers:setup_steps()).
77+
78+
end_per_testcase(Testcase, Config) ->
79+
Config1 = rabbit_ct_helpers:run_steps(Config,
80+
rabbit_ct_client_helpers:teardown_steps() ++
81+
rabbit_ct_broker_helpers:teardown_steps()),
82+
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
83+
84+
%% -------------------------------------------------------------------
85+
%% Test cases
86+
%% -------------------------------------------------------------------
87+
forget_cluster_node_with_quorum_queues(Config) ->
88+
[Rabbit, Hare, Bunny] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
89+
90+
assert_clustered([Rabbit, Hare, Bunny]),
91+
92+
Ch = rabbit_ct_client_helpers:open_channel(Config, Rabbit),
93+
QQ1 = <<"quorum-queue-1">>,
94+
QQ2 = <<"quorum-queue-2">>,
95+
declare(Ch, QQ1, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
96+
declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
97+
98+
?awaitMatch(Members when length(Members) == 3, get_quorum_members(Rabbit, QQ1), 30000),
99+
?awaitMatch(Members when length(Members) == 3, get_quorum_members(Rabbit, QQ2), 30000),
100+
101+
?assertEqual(ok, rabbit_control_helper:command(stop_app, Bunny)),
102+
?assertEqual(ok, forget_cluster_node(Rabbit, Bunny)),
103+
assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Rabbit, Hare]},
104+
[Rabbit, Hare]),
105+
?awaitMatch(Members when length(Members) == 2, get_quorum_members(Rabbit, QQ1), 30000),
106+
?awaitMatch(Members when length(Members) == 2, get_quorum_members(Rabbit, QQ2), 30000).
107+
108+
forget_cluster_node_with_last_quorum_member(Config) ->
109+
[Rabbit, Hare, Bunny] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
110+
111+
assert_clustered([Rabbit, Hare, Bunny]),
112+
113+
Ch = rabbit_ct_client_helpers:open_channel(Config, Bunny),
114+
QQ1 = <<"quorum-queue-1">>,
115+
QQ2 = <<"quorum-queue-2">>,
116+
declare(Ch, QQ1, [{<<"x-queue-type">>, longstr, <<"quorum">>},
117+
{<<"x-quorum-initial-group-size">>, long, 1}]),
118+
declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>},
119+
{<<"x-quorum-initial-group-size">>, long, 1}]),
120+
121+
?awaitMatch(Members when length(Members) == 1, get_quorum_members(Rabbit, QQ1), 30000),
122+
?awaitMatch(Members when length(Members) == 1, get_quorum_members(Rabbit, QQ2), 30000),
123+
124+
?assertEqual(ok, rabbit_control_helper:command(stop_app, Bunny)),
125+
?assertMatch({error, 69, _}, forget_cluster_node(Rabbit, Bunny)),
126+
assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Rabbit, Hare]},
127+
[Rabbit, Hare]),
128+
?awaitMatch(Members when length(Members) == 1, get_quorum_members(Rabbit, QQ1), 30000),
129+
?awaitMatch(Members when length(Members) == 1, get_quorum_members(Rabbit, QQ2), 30000).
130+
131+
forget_cluster_node(Node, Removee) ->
132+
rabbit_control_helper:command(forget_cluster_node, Node, [atom_to_list(Removee)],
133+
[]).
134+
135+
get_quorum_members(Server, Q) ->
136+
Info = rpc:call(Server, rabbit_quorum_queue, infos, [rabbit_misc:r(<<"/">>, queue, Q)]),
137+
proplists:get_value(members, Info).
138+
139+
declare(Ch, Q, Args) ->
140+
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
141+
durable = true,
142+
auto_delete = false,
143+
arguments = Args}).

deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/forget_cluster_node_command.ex

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,27 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ForgetClusterNodeCommand do
6060
error
6161

6262
:ok ->
63-
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :shrink_all, [atom_name]) do
64-
{:error, _} ->
65-
{:error,
66-
"RabbitMQ failed to shrink some of the quorum queues on node #{node_to_remove}"}
63+
qq_shrink_result =
64+
:rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :shrink_all, [atom_name])
6765

68-
_ ->
66+
is_ok_fun = fn
67+
{_, {:ok, _}} -> true
68+
{_, {:error, _, _}} -> false
69+
end
70+
71+
case Enum.empty?(qq_shrink_result) do
72+
true ->
6973
:ok
74+
75+
false ->
76+
case Enum.any?(qq_shrink_result, is_ok_fun) do
77+
false ->
78+
{:error,
79+
"RabbitMQ failed to shrink some of the quorum queues on node #{node_to_remove}"}
80+
81+
true ->
82+
:ok
83+
end
7084
end
7185

7286
other ->

0 commit comments

Comments
 (0)