Skip to content

Commit 9ba3e3d

Browse files
Merge pull request #9442 from rabbitmq/handle-shrink-errors
forget_cluster_node: handle errors while shrinking streams and/or QQs
2 parents 66e806e + 7540ccc commit 9ba3e3d

File tree

5 files changed

+207
-6
lines changed

5 files changed

+207
-6
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,6 +1170,14 @@ rabbitmq_integration_suite(
11701170
size = "large",
11711171
)
11721172

1173+
rabbitmq_integration_suite(
1174+
name = "cli_forget_cluster_node_SUITE",
1175+
size = "medium",
1176+
additional_beam = [
1177+
":test_clustering_utils_beam",
1178+
],
1179+
)
1180+
11731181
assert_suites()
11741182

11751183
filegroup(

deps/rabbit/app.bzl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2040,3 +2040,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
20402040
erlc_opts = "//:test_erlc_opts",
20412041
deps = ["//deps/amqp_client:erlang_app"],
20422042
)
2043+
erlang_bytecode(
2044+
name = "cli_forget_cluster_node_SUITE_beam_files",
2045+
testonly = True,
2046+
srcs = ["test/cli_forget_cluster_node_SUITE.erl"],
2047+
outs = ["test/cli_forget_cluster_node_SUITE.beam"],
2048+
app_name = "rabbit",
2049+
erlc_opts = "//:test_erlc_opts",
2050+
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
2051+
)

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@
4242
-export([set_retention_policy/3]).
4343
-export([restart_stream/3,
4444
add_replica/3,
45-
delete_replica/3]).
45+
delete_replica/3,
46+
delete_all_replicas/1]).
4647
-export([format_osiris_event/2]).
4748
-export([update_stream_conf/2]).
4849
-export([readers/1]).
@@ -936,6 +937,32 @@ delete_replica(VHost, Name, Node) ->
936937
E
937938
end.
938939

940+
delete_all_replicas(Node) ->
941+
rabbit_log:info("Asked to remove all stream replicas from node ~ts", [Node]),
942+
Streams = rabbit_amqqueue:list_by_type(stream),
943+
Errors =
944+
lists:foldl(fun(Q, Acc) ->
945+
QName = amqqueue:get_name(Q),
946+
rabbit_log:info("~ts: removing replica on node ~w",
947+
[rabbit_misc:rs(QName), Node]),
948+
#{name := StreamId} = amqqueue:get_type_state(Q),
949+
{ok, Reply, _} = rabbit_stream_coordinator:delete_replica(StreamId, Node),
950+
case Reply of
951+
ok ->
952+
Acc;
953+
Err ->
954+
rabbit_log:warning("~ts: failed to remove replica on node ~w, error: ~w",
955+
[rabbit_misc:rs(QName), Node, Err]),
956+
[{QName, Err} | Acc]
957+
end
958+
end, [], Streams),
959+
case Errors of
960+
[] ->
961+
ok;
962+
_ ->
963+
{error, Errors}
964+
end.
965+
939966
make_stream_conf(Q) ->
940967
QName = amqqueue:get_name(Q),
941968
Name = stream_name(QName),
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(cli_forget_cluster_node_SUITE).
9+
10+
-include_lib("common_test/include/ct.hrl").
11+
-include_lib("amqp_client/include/amqp_client.hrl").
12+
-include_lib("eunit/include/eunit.hrl").
13+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
14+
15+
-compile(export_all).
16+
17+
-import(clustering_utils, [
18+
assert_cluster_status/2,
19+
assert_clustered/1,
20+
assert_not_clustered/1
21+
]).
22+
23+
all() ->
24+
[
25+
{group, cluster_size_3}
26+
].
27+
28+
groups() ->
29+
[
30+
{cluster_size_3, [], [
31+
forget_cluster_node_with_quorum_queues,
32+
forget_cluster_node_with_last_quorum_member
33+
]}
34+
].
35+
36+
suite() ->
37+
[
38+
%% If a testcase hangs, no need to wait for 30 minutes.
39+
{timetrap, {minutes, 5}}
40+
].
41+
42+
%% -------------------------------------------------------------------
43+
%% Testsuite setup/teardown.
44+
%% -------------------------------------------------------------------
45+
46+
init_per_suite(Config) ->
47+
rabbit_ct_helpers:log_environment(),
48+
Config1 = rabbit_ct_helpers:merge_app_env(
49+
Config, {rabbit, [
50+
{mnesia_table_loading_retry_limit, 2},
51+
{mnesia_table_loading_retry_timeout,1000}
52+
]}),
53+
rabbit_ct_helpers:run_setup_steps(Config1).
54+
55+
end_per_suite(Config) ->
56+
rabbit_ct_helpers:run_teardown_steps(Config).
57+
58+
init_per_group(cluster_size_3, Config) ->
59+
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3},
60+
{rmq_nodes_clustered, true}]).
61+
62+
end_per_group(_, Config) ->
63+
Config.
64+
65+
init_per_testcase(Testcase, Config) ->
66+
rabbit_ct_helpers:testcase_started(Config, Testcase),
67+
ClusterSize = ?config(rmq_nodes_count, Config),
68+
TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
69+
Config1 = rabbit_ct_helpers:set_config(Config, [
70+
{rmq_nodename_suffix, Testcase},
71+
{tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}},
72+
{keep_pid_file_on_exit, true}
73+
]),
74+
rabbit_ct_helpers:run_steps(Config1,
75+
rabbit_ct_broker_helpers:setup_steps() ++
76+
rabbit_ct_client_helpers:setup_steps()).
77+
78+
end_per_testcase(Testcase, Config) ->
79+
Config1 = rabbit_ct_helpers:run_steps(Config,
80+
rabbit_ct_client_helpers:teardown_steps() ++
81+
rabbit_ct_broker_helpers:teardown_steps()),
82+
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
83+
84+
%% -------------------------------------------------------------------
85+
%% Test cases
86+
%% -------------------------------------------------------------------
87+
forget_cluster_node_with_quorum_queues(Config) ->
88+
[Rabbit, Hare, Bunny] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
89+
90+
assert_clustered([Rabbit, Hare, Bunny]),
91+
92+
Ch = rabbit_ct_client_helpers:open_channel(Config, Rabbit),
93+
QQ1 = <<"quorum-queue-1">>,
94+
QQ2 = <<"quorum-queue-2">>,
95+
declare(Ch, QQ1, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
96+
declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
97+
98+
?awaitMatch(Members when length(Members) == 3, get_quorum_members(Rabbit, QQ1), 30000),
99+
?awaitMatch(Members when length(Members) == 3, get_quorum_members(Rabbit, QQ2), 30000),
100+
101+
?assertEqual(ok, rabbit_control_helper:command(stop_app, Bunny)),
102+
?assertEqual(ok, forget_cluster_node(Rabbit, Bunny)),
103+
assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Rabbit, Hare]},
104+
[Rabbit, Hare]),
105+
?awaitMatch(Members when length(Members) == 2, get_quorum_members(Rabbit, QQ1), 30000),
106+
?awaitMatch(Members when length(Members) == 2, get_quorum_members(Rabbit, QQ2), 30000).
107+
108+
forget_cluster_node_with_last_quorum_member(Config) ->
109+
[Rabbit, Hare, Bunny] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
110+
111+
assert_clustered([Rabbit, Hare, Bunny]),
112+
113+
Ch = rabbit_ct_client_helpers:open_channel(Config, Bunny),
114+
QQ1 = <<"quorum-queue-1">>,
115+
QQ2 = <<"quorum-queue-2">>,
116+
declare(Ch, QQ1, [{<<"x-queue-type">>, longstr, <<"quorum">>},
117+
{<<"x-quorum-initial-group-size">>, long, 1}]),
118+
declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>},
119+
{<<"x-quorum-initial-group-size">>, long, 1}]),
120+
121+
?awaitMatch(Members when length(Members) == 1, get_quorum_members(Rabbit, QQ1), 30000),
122+
?awaitMatch(Members when length(Members) == 1, get_quorum_members(Rabbit, QQ2), 30000),
123+
124+
?assertEqual(ok, rabbit_control_helper:command(stop_app, Bunny)),
125+
?assertMatch({error, 69, _}, forget_cluster_node(Rabbit, Bunny)),
126+
assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Rabbit, Hare]},
127+
[Rabbit, Hare]),
128+
?awaitMatch(Members when length(Members) == 1, get_quorum_members(Rabbit, QQ1), 30000),
129+
?awaitMatch(Members when length(Members) == 1, get_quorum_members(Rabbit, QQ2), 30000).
130+
131+
forget_cluster_node(Node, Removee) ->
132+
rabbit_control_helper:command(forget_cluster_node, Node, [atom_to_list(Removee)],
133+
[]).
134+
135+
get_quorum_members(Server, Q) ->
136+
Info = rpc:call(Server, rabbit_quorum_queue, infos, [rabbit_misc:r(<<"/">>, queue, Q)]),
137+
proplists:get_value(members, Info).
138+
139+
declare(Ch, Q, Args) ->
140+
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
141+
durable = true,
142+
auto_delete = false,
143+
arguments = Args}).

deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/forget_cluster_node_command.ex

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,27 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ForgetClusterNodeCommand do
8282
error
8383

8484
:ok ->
85-
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :shrink_all, [atom_name]) do
86-
{:error, _} ->
87-
{:error,
88-
"RabbitMQ failed to shrink some of the quorum queues on node #{node_to_remove}"}
85+
qq_shrink_result =
86+
:rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :shrink_all, [atom_name])
8987

90-
_ ->
88+
is_ok_fun = fn
89+
{_, {:ok, _}} -> true
90+
{_, {:error, _, _}} -> false
91+
end
92+
93+
case Enum.empty?(qq_shrink_result) do
94+
true ->
9195
:ok
96+
97+
false ->
98+
case Enum.any?(qq_shrink_result, is_ok_fun) do
99+
false ->
100+
{:error,
101+
"RabbitMQ failed to shrink some of the quorum queues on node #{node_to_remove}"}
102+
103+
true ->
104+
:ok
105+
end
92106
end
93107

94108
other ->

0 commit comments

Comments
 (0)