Skip to content

rabbit_node_monitor: use a leader query for cluster members on node_added event (backport #11664) #11835

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,14 @@ rabbitmq_integration_suite(
size = "medium",
)

rabbitmq_integration_suite(
name = "clustering_events_SUITE",
additional_beam = [
":test_event_recorder_beam",
],
size = "medium",
)

rabbitmq_integration_suite(
name = "quorum_queue_member_reconciliation_SUITE",
size = "medium",
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "clustering_events_SUITE_beam_files",
testonly = True,
srcs = ["test/clustering_events_SUITE.erl"],
outs = ["test/clustering_events_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)

erlang_bytecode(
name = "clustering_management_SUITE_beam_files",
Expand Down
24 changes: 24 additions & 0 deletions deps/rabbit/src/rabbit_db_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
-export([change_node_type/1]).
-export([is_clustered/0,
members/0,
consistent_members/0,
disc_members/0,
node_type/0,
check_compatibility/1,
Expand Down Expand Up @@ -150,6 +151,8 @@ join(RemoteNode, NodeType)
false -> ok = rabbit_mnesia:reset_gracefully()
end,

ok = rabbit_node_monitor:notify_left_cluster(node()),

%% Now that the files are all gone after the reset above, restart
%% the Ra systems. They will recreate their folder in the process.
case RestartRabbit of
Expand Down Expand Up @@ -224,6 +227,14 @@ join_using_khepri(_ClusterNodes, ram = NodeType) ->
%% @doc Removes `Node' from the cluster.

forget_member(Node, RemoveWhenOffline) ->
case forget_member0(Node, RemoveWhenOffline) of
ok ->
rabbit_node_monitor:notify_left_cluster(Node);
Error ->
Error
end.

forget_member0(Node, RemoveWhenOffline) ->
case rabbit:is_running(Node) of
false ->
?LOG_DEBUG(
Expand Down Expand Up @@ -306,6 +317,19 @@ members_using_khepri() ->
%% so we need to allow callers to be more defensive in this case.
rabbit_khepri:locally_known_nodes().

-spec consistent_members() -> Members when
Members :: [node()].
%% @doc Returns the list of cluster members.

consistent_members() ->
case rabbit_khepri:get_feature_state() of
enabled -> consistent_members_using_khepri();
_ -> members_using_mnesia()
end.

consistent_members_using_khepri() ->
rabbit_khepri:nodes().

-spec disc_members() -> Members when
Members :: [node()].
%% @private
Expand Down
10 changes: 2 additions & 8 deletions deps/rabbit/src/rabbit_mnesia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,6 @@ change_cluster_node_type(Type) ->
-spec forget_cluster_node(node(), boolean()) -> 'ok'.

forget_cluster_node(Node, RemoveWhenOffline) ->
forget_cluster_node(Node, RemoveWhenOffline, true).

forget_cluster_node(Node, RemoveWhenOffline, EmitNodeDeletedEvent) ->
case lists:member(Node, cluster_nodes(all)) of
true -> ok;
false -> e(not_a_cluster_node)
Expand All @@ -307,9 +304,6 @@ forget_cluster_node(Node, RemoveWhenOffline, EmitNodeDeletedEvent) ->
{false, true} -> rabbit_log:info(
"Removing node ~tp from cluster", [Node]),
case remove_node_if_mnesia_running(Node) of
ok when EmitNodeDeletedEvent ->
rabbit_event:notify(node_deleted, [{node, Node}]),
ok;
ok -> ok;
{error, _} = Err -> throw(Err)
end
Expand All @@ -333,7 +327,7 @@ remove_node_offline_node(Node) ->
%% We skip the 'node_deleted' event because the
%% application is stopped and thus, rabbit_event is not
%% enabled.
forget_cluster_node(Node, false, false),
forget_cluster_node(Node, false),
force_load_next_boot()
after
stop_mnesia()
Expand Down Expand Up @@ -892,8 +886,8 @@ remove_node_if_mnesia_running(Node) ->
%% change being propagated to all nodes
case mnesia:del_table_copy(schema, Node) of
{atomic, ok} ->
rabbit_amqqueue:forget_all_durable(Node),
rabbit_node_monitor:notify_left_cluster(Node),
rabbit_amqqueue:forget_all_durable(Node),
ok;
{aborted, Reason} ->
{error, {failed_to_remove_node, Node, Reason}}
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_node_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ notify_node_up() ->

notify_joined_cluster() ->
NewMember = node(),
Nodes = alive_rabbit_nodes() -- [NewMember],
Nodes = alive_rabbit_nodes(rabbit_nodes:list_consistent_members()) -- [NewMember],
gen_server:abcast(Nodes, ?SERVER,
{joined_cluster, node(), rabbit_db_cluster:node_type()}),

Expand Down Expand Up @@ -620,6 +620,7 @@ handle_cast({left_cluster, Node}, State) ->
{del_node(Node, AllNodes), del_node(Node, DiscNodes),
del_node(Node, RunningNodes)})
end,
rabbit_event:notify(node_deleted, [{node, Node}]),
{noreply, State};

handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) ->
Expand Down
10 changes: 9 additions & 1 deletion deps/rabbit/src/rabbit_nodes.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
is_running/2, is_process_running/2,
cluster_name/0, set_cluster_name/1, set_cluster_name/2, ensure_epmd/0,
all_running/0,
is_member/1, list_members/0,
is_member/1, list_members/0, list_consistent_members/0,
filter_members/1,
is_reachable/1, list_reachable/0, list_unreachable/0,
filter_reachable/1, filter_unreachable/1,
Expand Down Expand Up @@ -182,6 +182,14 @@ is_member(Node) when is_atom(Node) ->
list_members() ->
rabbit_db_cluster:members().

-spec list_consistent_members() -> Nodes when
Nodes :: [node()].
%% @doc Returns the list of nodes in the cluster as reported by the leader.
%%

list_consistent_members() ->
rabbit_db_cluster:consistent_members().

-spec filter_members(Nodes) -> Nodes when
Nodes :: [node()].
%% @doc Filters the given list of nodes to only select those belonging to the
Expand Down
117 changes: 117 additions & 0 deletions deps/rabbit/test/clustering_events_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(clustering_events_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-import(rabbit_ct_helpers, [eventually/3]).
-import(event_recorder,
[assert_event_type/2,
assert_event_prop/2]).

-compile(export_all).

all() ->
[
{group, tests}
].

groups() ->
[
{tests, [], [
node_added_event,
node_deleted_event
]}
].

%% -------------------------------------------------------------------
%% Per Suite
%% -------------------------------------------------------------------

init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).

%%
%% Per Group
%%

init_per_group(_, Config) ->
Config.

end_per_group(_, Config) ->
Config.

%%
%% Per Test Case
%%
init_per_testcase(node_added_event = TestCase, Config) ->
Config1 = configure_cluster_essentials(Config, TestCase, false),
Config2 = rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
rabbit_ct_helpers:testcase_started(Config2, TestCase);
init_per_testcase(node_deleted_event = TestCase, Config) ->
Config1 = configure_cluster_essentials(Config, TestCase, true),
Config2 = rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
rabbit_ct_helpers:testcase_started(Config2, TestCase).

end_per_testcase(TestCase, Config) ->
Config1 = rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config1, TestCase).

%%
%% Helpers
%%
configure_cluster_essentials(Config, Group, Clustered) ->
rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Group},
{rmq_nodes_count, 3},
{rmq_nodes_clustered, Clustered}
]).

node_added_event(Config) ->
[Server1, Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ok = event_recorder:start(Config),
join_cluster(Server2, Server1),
E = event_recorder:get_events(Config),
ok = event_recorder:stop(Config),
?assert(lists:any(fun(#event{type = node_added}) ->
true;
(_) ->
false
end, E)).

node_deleted_event(Config) ->
[Server1, Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ok = event_recorder:start(Config),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
ok = rabbit_control_helper:command(forget_cluster_node, Server1, [atom_to_list(Server2)],
[]),
E = event_recorder:get_events(Config),
ok = event_recorder:stop(Config),
?assert(lists:any(fun(#event{type = node_deleted}) ->
true;
(_) ->
false
end, E)).

join_cluster(Node, Cluster) ->
ok = rabbit_control_helper:command(stop_app, Node),
ok = rabbit_control_helper:command(join_cluster, Node, [atom_to_list(Cluster)], []),
rabbit_control_helper:command(start_app, Node).
Loading