Skip to content

Commit 94942f6

Browse files
Merge pull request #11835 from rabbitmq/mergify/bp/v4.0.x/pr-11664
rabbit_node_monitor: use a leader query for cluster members on node_added event (backport #11664)
2 parents 5708803 + 053b4a4 commit 94942f6

File tree

7 files changed

+171
-10
lines changed

7 files changed

+171
-10
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,14 @@ rabbitmq_integration_suite(
328328
size = "medium",
329329
)
330330

331+
rabbitmq_integration_suite(
332+
name = "clustering_events_SUITE",
333+
additional_beam = [
334+
":test_event_recorder_beam",
335+
],
336+
size = "medium",
337+
)
338+
331339
rabbitmq_integration_suite(
332340
name = "quorum_queue_member_reconciliation_SUITE",
333341
size = "medium",

deps/rabbit/app.bzl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -831,6 +831,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
831831
erlc_opts = "//:test_erlc_opts",
832832
deps = ["//deps/amqp_client:erlang_app"],
833833
)
834+
erlang_bytecode(
835+
name = "clustering_events_SUITE_beam_files",
836+
testonly = True,
837+
srcs = ["test/clustering_events_SUITE.erl"],
838+
outs = ["test/clustering_events_SUITE.beam"],
839+
app_name = "rabbit",
840+
erlc_opts = "//:test_erlc_opts",
841+
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
842+
)
834843

835844
erlang_bytecode(
836845
name = "clustering_management_SUITE_beam_files",

deps/rabbit/src/rabbit_db_cluster.erl

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
-export([change_node_type/1]).
1818
-export([is_clustered/0,
1919
members/0,
20+
consistent_members/0,
2021
disc_members/0,
2122
node_type/0,
2223
check_compatibility/1,
@@ -150,6 +151,8 @@ join(RemoteNode, NodeType)
150151
false -> ok = rabbit_mnesia:reset_gracefully()
151152
end,
152153

154+
ok = rabbit_node_monitor:notify_left_cluster(node()),
155+
153156
%% Now that the files are all gone after the reset above, restart
154157
%% the Ra systems. They will recreate their folder in the process.
155158
case RestartRabbit of
@@ -224,6 +227,14 @@ join_using_khepri(_ClusterNodes, ram = NodeType) ->
224227
%% @doc Removes `Node' from the cluster.
225228

226229
forget_member(Node, RemoveWhenOffline) ->
230+
case forget_member0(Node, RemoveWhenOffline) of
231+
ok ->
232+
rabbit_node_monitor:notify_left_cluster(Node);
233+
Error ->
234+
Error
235+
end.
236+
237+
forget_member0(Node, RemoveWhenOffline) ->
227238
case rabbit:is_running(Node) of
228239
false ->
229240
?LOG_DEBUG(
@@ -306,6 +317,19 @@ members_using_khepri() ->
306317
%% so we need to allow callers to be more defensive in this case.
307318
rabbit_khepri:locally_known_nodes().
308319

320+
-spec consistent_members() -> Members when
321+
Members :: [node()].
322+
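%% @doc Returns the list of cluster members using a consistent query: with Khepri enabled the list comes from `rabbit_khepri:nodes()' (a leader query) rather than the locally known nodes; otherwise the Mnesia member list is returned.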
323+
324+
consistent_members() ->
325+
case rabbit_khepri:get_feature_state() of
326+
enabled -> consistent_members_using_khepri();
327+
_ -> members_using_mnesia()
328+
end.
329+
330+
consistent_members_using_khepri() ->
331+
rabbit_khepri:nodes().
332+
309333
-spec disc_members() -> Members when
310334
Members :: [node()].
311335
%% @private

deps/rabbit/src/rabbit_mnesia.erl

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,6 @@ change_cluster_node_type(Type) ->
293293
-spec forget_cluster_node(node(), boolean()) -> 'ok'.
294294

295295
forget_cluster_node(Node, RemoveWhenOffline) ->
296-
forget_cluster_node(Node, RemoveWhenOffline, true).
297-
298-
forget_cluster_node(Node, RemoveWhenOffline, EmitNodeDeletedEvent) ->
299296
case lists:member(Node, cluster_nodes(all)) of
300297
true -> ok;
301298
false -> e(not_a_cluster_node)
@@ -307,9 +304,6 @@ forget_cluster_node(Node, RemoveWhenOffline, EmitNodeDeletedEvent) ->
307304
{false, true} -> rabbit_log:info(
308305
"Removing node ~tp from cluster", [Node]),
309306
case remove_node_if_mnesia_running(Node) of
310-
ok when EmitNodeDeletedEvent ->
311-
rabbit_event:notify(node_deleted, [{node, Node}]),
312-
ok;
313307
ok -> ok;
314308
{error, _} = Err -> throw(Err)
315309
end
@@ -333,7 +327,7 @@ remove_node_offline_node(Node) ->
333327
%% We skip the 'node_deleted' event because the
334328
%% application is stopped and thus, rabbit_event is not
335329
%% enabled.
336-
forget_cluster_node(Node, false, false),
330+
forget_cluster_node(Node, false),
337331
force_load_next_boot()
338332
after
339333
stop_mnesia()
@@ -892,8 +886,8 @@ remove_node_if_mnesia_running(Node) ->
892886
%% change being propagated to all nodes
893887
case mnesia:del_table_copy(schema, Node) of
894888
{atomic, ok} ->
895-
rabbit_amqqueue:forget_all_durable(Node),
896889
rabbit_node_monitor:notify_left_cluster(Node),
890+
rabbit_amqqueue:forget_all_durable(Node),
897891
ok;
898892
{aborted, Reason} ->
899893
{error, {failed_to_remove_node, Node, Reason}}

deps/rabbit/src/rabbit_node_monitor.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ notify_node_up() ->
172172

173173
notify_joined_cluster() ->
174174
NewMember = node(),
175-
Nodes = alive_rabbit_nodes() -- [NewMember],
175+
Nodes = alive_rabbit_nodes(rabbit_nodes:list_consistent_members()) -- [NewMember],
176176
gen_server:abcast(Nodes, ?SERVER,
177177
{joined_cluster, node(), rabbit_db_cluster:node_type()}),
178178

@@ -620,6 +620,7 @@ handle_cast({left_cluster, Node}, State) ->
620620
{del_node(Node, AllNodes), del_node(Node, DiscNodes),
621621
del_node(Node, RunningNodes)})
622622
end,
623+
rabbit_event:notify(node_deleted, [{node, Node}]),
623624
{noreply, State};
624625

625626
handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) ->

deps/rabbit/src/rabbit_nodes.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
is_running/2, is_process_running/2,
1616
cluster_name/0, set_cluster_name/1, set_cluster_name/2, ensure_epmd/0,
1717
all_running/0,
18-
is_member/1, list_members/0,
18+
is_member/1, list_members/0, list_consistent_members/0,
1919
filter_members/1,
2020
is_reachable/1, list_reachable/0, list_unreachable/0,
2121
filter_reachable/1, filter_unreachable/1,
@@ -182,6 +182,14 @@ is_member(Node) when is_atom(Node) ->
182182
list_members() ->
183183
rabbit_db_cluster:members().
184184

185+
-spec list_consistent_members() -> Nodes when
186+
Nodes :: [node()].
187+
%% @doc Returns the list of nodes in the cluster as reported by the leader.
188+
%%
189+
190+
list_consistent_members() ->
191+
rabbit_db_cluster:consistent_members().
192+
185193
-spec filter_members(Nodes) -> Nodes when
186194
Nodes :: [node()].
187195
%% @doc Filters the given list of nodes to only select those belonging to the
deps/rabbit/test/clustering_events_SUITE.erl

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(clustering_events_SUITE).
9+
10+
-include_lib("common_test/include/ct.hrl").
11+
-include_lib("eunit/include/eunit.hrl").
12+
-include_lib("amqp_client/include/amqp_client.hrl").
13+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
14+
15+
-import(rabbit_ct_helpers, [eventually/3]).
16+
-import(event_recorder,
17+
[assert_event_type/2,
18+
assert_event_prop/2]).
19+
20+
-compile(export_all).
21+
22+
all() ->
23+
[
24+
{group, tests}
25+
].
26+
27+
groups() ->
28+
[
29+
{tests, [], [
30+
node_added_event,
31+
node_deleted_event
32+
]}
33+
].
34+
35+
%% -------------------------------------------------------------------
36+
%% Per Suite
37+
%% -------------------------------------------------------------------
38+
39+
init_per_suite(Config) ->
40+
rabbit_ct_helpers:log_environment(),
41+
rabbit_ct_helpers:run_setup_steps(Config).
42+
43+
end_per_suite(Config) ->
44+
rabbit_ct_helpers:run_teardown_steps(Config).
45+
46+
%%
47+
%% Per Group
48+
%%
49+
50+
init_per_group(_, Config) ->
51+
Config.
52+
53+
end_per_group(_, Config) ->
54+
Config.
55+
56+
%%
57+
%% Per Test Case
58+
%%
59+
init_per_testcase(node_added_event = TestCase, Config) ->
60+
Config1 = configure_cluster_essentials(Config, TestCase, false),
61+
Config2 = rabbit_ct_helpers:run_steps(Config1,
62+
rabbit_ct_broker_helpers:setup_steps() ++
63+
rabbit_ct_client_helpers:setup_steps()),
64+
rabbit_ct_helpers:testcase_started(Config2, TestCase);
65+
init_per_testcase(node_deleted_event = TestCase, Config) ->
66+
Config1 = configure_cluster_essentials(Config, TestCase, true),
67+
Config2 = rabbit_ct_helpers:run_steps(Config1,
68+
rabbit_ct_broker_helpers:setup_steps() ++
69+
rabbit_ct_client_helpers:setup_steps()),
70+
rabbit_ct_helpers:testcase_started(Config2, TestCase).
71+
72+
end_per_testcase(TestCase, Config) ->
73+
Config1 = rabbit_ct_helpers:run_steps(Config,
74+
rabbit_ct_client_helpers:teardown_steps() ++
75+
rabbit_ct_broker_helpers:teardown_steps()),
76+
rabbit_ct_helpers:testcase_finished(Config1, TestCase).
77+
78+
%%
79+
%% Helpers
80+
%%
81+
configure_cluster_essentials(Config, Group, Clustered) ->
82+
rabbit_ct_helpers:set_config(Config, [
83+
{rmq_nodename_suffix, Group},
84+
{rmq_nodes_count, 3},
85+
{rmq_nodes_clustered, Clustered}
86+
]).
87+
88+
node_added_event(Config) ->
89+
[Server1, Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
90+
ok = event_recorder:start(Config),
91+
join_cluster(Server2, Server1),
92+
E = event_recorder:get_events(Config),
93+
ok = event_recorder:stop(Config),
94+
?assert(lists:any(fun(#event{type = node_added}) ->
95+
true;
96+
(_) ->
97+
false
98+
end, E)).
99+
100+
node_deleted_event(Config) ->
101+
[Server1, Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
102+
ok = event_recorder:start(Config),
103+
ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
104+
ok = rabbit_control_helper:command(forget_cluster_node, Server1, [atom_to_list(Server2)],
105+
[]),
106+
E = event_recorder:get_events(Config),
107+
ok = event_recorder:stop(Config),
108+
?assert(lists:any(fun(#event{type = node_deleted}) ->
109+
true;
110+
(_) ->
111+
false
112+
end, E)).
113+
114+
join_cluster(Node, Cluster) ->
115+
ok = rabbit_control_helper:command(stop_app, Node),
116+
ok = rabbit_control_helper:command(join_cluster, Node, [atom_to_list(Cluster)], []),
117+
rabbit_control_helper:command(start_app, Node).

0 commit comments

Comments
 (0)