Skip to content

Commit d16c618

Browse files
dcorbachomergify[bot]
authored andcommitted
rabbit_node_monitor: use a leader query for cluster members on node_added event
If the membership hasn't been updated locally yet, the event is never generated (cherry picked from commit 19a71d8)
1 parent be406d8 commit d16c618

File tree

7 files changed

+165
-3
lines changed

7 files changed

+165
-3
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,14 @@ rabbitmq_integration_suite(
328328
size = "medium",
329329
)
330330

331+
rabbitmq_integration_suite(
332+
name = "clustering_events_SUITE",
333+
additional_beam = [
334+
":test_event_recorder_beam",
335+
],
336+
size = "medium",
337+
)
338+
331339
rabbitmq_integration_suite(
332340
name = "quorum_queue_member_reconciliation_SUITE",
333341
size = "medium",

deps/rabbit/app.bzl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -831,6 +831,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
831831
erlc_opts = "//:test_erlc_opts",
832832
deps = ["//deps/amqp_client:erlang_app"],
833833
)
834+
erlang_bytecode(
835+
name = "clustering_events_SUITE_beam_files",
836+
testonly = True,
837+
srcs = ["test/clustering_events_SUITE.erl"],
838+
outs = ["test/clustering_events_SUITE.beam"],
839+
app_name = "rabbit",
840+
erlc_opts = "//:test_erlc_opts",
841+
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
842+
)
834843

835844
erlang_bytecode(
836845
name = "clustering_management_SUITE_beam_files",

deps/rabbit/src/rabbit_db_cluster.erl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
-export([change_node_type/1]).
1818
-export([is_clustered/0,
1919
members/0,
20+
consistent_members/0,
2021
disc_members/0,
2122
node_type/0,
2223
check_compatibility/1,
@@ -306,6 +307,19 @@ members_using_khepri() ->
306307
%% so we need to allow callers to be more defensive in this case.
307308
rabbit_khepri:locally_known_nodes().
308309

310+
-spec consistent_members() -> Members when
311+
Members :: [node()].
312+
%% @doc Returns the list of cluster members.
313+
314+
consistent_members() ->
315+
case rabbit_khepri:get_feature_state() of
316+
enabled -> consistent_members_using_khepri();
317+
_ -> members_using_mnesia()
318+
end.
319+
320+
consistent_members_using_khepri() ->
321+
rabbit_khepri:nodes().
322+
309323
-spec disc_members() -> Members when
310324
Members :: [node()].
311325
%% @private

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,13 @@ post_add_member(JoiningNode, JoinedNode, Error) ->
425425
%% @private
426426

427427
leave_cluster(Node) ->
428-
retry_khepri_op(fun() -> remove_member(Node) end, 60).
428+
case retry_khepri_op(fun() -> remove_member(Node) end, 60) of
429+
ok ->
430+
rabbit_event:notify(node_deleted, [{node, Node}]),
431+
ok;
432+
Any ->
433+
Any
434+
end.
429435

430436
%% @private
431437

deps/rabbit/src/rabbit_node_monitor.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ notify_node_up() ->
172172

173173
notify_joined_cluster() ->
174174
NewMember = node(),
175-
Nodes = alive_rabbit_nodes() -- [NewMember],
175+
Nodes = alive_rabbit_nodes(rabbit_nodes:list_consistent_members()) -- [NewMember],
176176
gen_server:abcast(Nodes, ?SERVER,
177177
{joined_cluster, node(), rabbit_db_cluster:node_type()}),
178178

deps/rabbit/src/rabbit_nodes.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
is_running/2, is_process_running/2,
1616
cluster_name/0, set_cluster_name/1, set_cluster_name/2, ensure_epmd/0,
1717
all_running/0,
18-
is_member/1, list_members/0,
18+
is_member/1, list_members/0, list_consistent_members/0,
1919
filter_members/1,
2020
is_reachable/1, list_reachable/0, list_unreachable/0,
2121
filter_reachable/1, filter_unreachable/1,
@@ -182,6 +182,14 @@ is_member(Node) when is_atom(Node) ->
182182
list_members() ->
183183
rabbit_db_cluster:members().
184184

185+
-spec list_consistent_members() -> Nodes when
186+
Nodes :: [node()].
187+
%% @doc Returns the list of nodes in the cluster as reported by the leader.
188+
%%
189+
190+
list_consistent_members() ->
191+
rabbit_db_cluster:consistent_members().
192+
185193
-spec filter_members(Nodes) -> Nodes when
186194
Nodes :: [node()].
187195
%% @doc Filters the given list of nodes to only select those belonging to the
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(clustering_events_SUITE).
9+
10+
-include_lib("common_test/include/ct.hrl").
11+
-include_lib("eunit/include/eunit.hrl").
12+
-include_lib("amqp_client/include/amqp_client.hrl").
13+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
14+
15+
-import(rabbit_ct_helpers, [eventually/3]).
16+
-import(event_recorder,
17+
[assert_event_type/2,
18+
assert_event_prop/2]).
19+
20+
-compile(export_all).
21+
22+
all() ->
23+
[
24+
{group, tests}
25+
].
26+
27+
groups() ->
28+
[
29+
{tests, [], [
30+
node_added_event,
31+
node_deleted_event
32+
]}
33+
].
34+
35+
%% -------------------------------------------------------------------
36+
%% Per Suite
37+
%% -------------------------------------------------------------------
38+
39+
init_per_suite(Config) ->
40+
rabbit_ct_helpers:log_environment(),
41+
rabbit_ct_helpers:run_setup_steps(Config).
42+
43+
end_per_suite(Config) ->
44+
rabbit_ct_helpers:run_teardown_steps(Config).
45+
46+
%%
47+
%% Per Group
48+
%%
49+
50+
init_per_group(_, Config) ->
51+
Config.
52+
53+
end_per_group(_, Config) ->
54+
Config.
55+
56+
%%
57+
%% Per Test Case
58+
%%
59+
init_per_testcase(node_added_event = TestCase, Config) ->
60+
Config1 = configure_cluster_essentials(Config, TestCase, false),
61+
Config2 = rabbit_ct_helpers:run_steps(Config1,
62+
rabbit_ct_broker_helpers:setup_steps() ++
63+
rabbit_ct_client_helpers:setup_steps()),
64+
rabbit_ct_helpers:testcase_started(Config2, TestCase);
65+
init_per_testcase(node_deleted_event = TestCase, Config) ->
66+
Config1 = configure_cluster_essentials(Config, TestCase, true),
67+
Config2 = rabbit_ct_helpers:run_steps(Config1,
68+
rabbit_ct_broker_helpers:setup_steps() ++
69+
rabbit_ct_client_helpers:setup_steps()),
70+
rabbit_ct_helpers:testcase_started(Config2, TestCase).
71+
72+
end_per_testcase(TestCase, Config) ->
73+
Config1 = rabbit_ct_helpers:run_steps(Config,
74+
rabbit_ct_client_helpers:teardown_steps() ++
75+
rabbit_ct_broker_helpers:teardown_steps()),
76+
rabbit_ct_helpers:testcase_finished(Config1, TestCase).
77+
78+
%%
79+
%% Helpers
80+
%%
81+
configure_cluster_essentials(Config, Group, Clustered) ->
82+
rabbit_ct_helpers:set_config(Config, [
83+
{rmq_nodename_suffix, Group},
84+
{rmq_nodes_count, 3},
85+
{rmq_nodes_clustered, Clustered}
86+
]).
87+
88+
node_added_event(Config) ->
89+
[Server1, Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
90+
ok = event_recorder:start(Config),
91+
join_cluster(Server2, Server1),
92+
E = event_recorder:get_events(Config),
93+
ok = event_recorder:stop(Config),
94+
?assert(lists:any(fun(#event{type = node_added}) ->
95+
true;
96+
(_) ->
97+
false
98+
end, E)).
99+
100+
node_deleted_event(Config) ->
101+
[Server1, Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
102+
ok = event_recorder:start(Config),
103+
ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
104+
ok = rabbit_control_helper:command(forget_cluster_node, Server1, [atom_to_list(Server2)],
105+
[]),
106+
E = event_recorder:get_events(Config),
107+
ok = event_recorder:stop(Config),
108+
?assert(lists:any(fun(#event{type = node_deleted}) ->
109+
true;
110+
(_) ->
111+
false
112+
end, E)).
113+
114+
join_cluster(Node, Cluster) ->
115+
ok = rabbit_control_helper:command(stop_app, Node),
116+
ok = rabbit_control_helper:command(join_cluster, Node, [atom_to_list(Cluster)], []),
117+
rabbit_control_helper:command(start_app, Node).

0 commit comments

Comments
 (0)