Skip to content

Commit 15f208e

Browse files
committed
rabbit_db_cluster: Move generic clustering steps from rabbit_mnesia
[Why] When a single node or a cluster is initialized, we go through a few steps which are not Mnesia-specific, or even Mnesia-related. For instance, we verify that both ends are compatible w.r.t. feature flags. When we introduce Khepri, we will have to go through the same generic steps. Therefore, it makes sense to drive those steps from `rabbit_db_cluster` and only call into `rabbit_mnesia` when needed. [How] The generic code is moved from `rabbit_mnesia` to `rabbit_db_cluster`.
1 parent eb6327a commit 15f208e

File tree

4 files changed

+112
-47
lines changed

4 files changed

+112
-47
lines changed

deps/rabbit/src/rabbit_db.erl

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,12 @@ init_using_mnesia() ->
9191
%% @doc Resets the database and the node.
9292

9393
reset() ->
94-
reset_using_mnesia().
94+
run(
95+
#{mnesia => fun reset_using_mnesia/0}).
9596

9697
reset_using_mnesia() ->
97-
?LOG_DEBUG(
98-
"DB: resetting node",
98+
?LOG_INFO(
99+
"DB: resetting node (using Mnesia)",
99100
#{domain => ?RMQLOG_DOMAIN_DB}),
100101
rabbit_mnesia:reset().
101102

@@ -104,11 +105,12 @@ reset_using_mnesia() ->
104105
%% @doc Resets the database and the node.
105106

106107
force_reset() ->
107-
force_reset_using_mnesia().
108+
run(
109+
#{mnesia => fun force_reset_using_mnesia/0}).
108110

109111
force_reset_using_mnesia() ->
110112
?LOG_DEBUG(
111-
"DB: resetting node forcefully",
113+
"DB: resetting node forcefully (using Mnesia)",
112114
#{domain => ?RMQLOG_DOMAIN_DB}),
113115
rabbit_mnesia:force_reset().
114116

@@ -120,11 +122,12 @@ force_reset_using_mnesia() ->
120122
%% state, like if critical members are MIA.
121123

122124
force_load_on_next_boot() ->
123-
force_load_on_next_boot_using_mnesia().
125+
run(
126+
#{mnesia => fun force_load_on_next_boot_using_mnesia/0}).
124127

125128
force_load_on_next_boot_using_mnesia() ->
126129
?LOG_DEBUG(
127-
"DB: resetting node forcefully",
130+
"DB: force load on next boot (using Mnesia)",
128131
#{domain => ?RMQLOG_DOMAIN_DB}),
129132
rabbit_mnesia:force_load_next_boot().
130133

deps/rabbit/src/rabbit_db_cluster.erl

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,28 @@ ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin) ->
4545
{error, Reason} -> throw({error, {incompatible_feature_flags, Reason}})
4646
end.
4747

48+
-spec can_join(RemoteNode) -> Ret when
49+
RemoteNode :: node(),
50+
Ret :: Ok | Error,
51+
Ok :: {ok, [node()]} | {ok, already_member},
52+
Error :: {error, {inconsistent_cluster, string()}}.
53+
54+
can_join(RemoteNode) ->
55+
?LOG_INFO(
56+
"DB: checking if `~ts` can join cluster using remote node `~ts`",
57+
[node(), RemoteNode],
58+
#{domain => ?RMQLOG_DOMAIN_DB}),
59+
case rabbit_feature_flags:check_node_compatibility(RemoteNode) of
60+
ok ->
61+
rabbit_db:run(
62+
#{mnesia => fun() -> can_join_using_mnesia(RemoteNode) end});
63+
Error ->
64+
Error
65+
end.
66+
67+
can_join_using_mnesia(RemoteNode) ->
68+
rabbit_mnesia:can_join_cluster(RemoteNode).
69+
4870
-spec join(RemoteNode, NodeType) -> Ret when
4971
RemoteNode :: node(),
5072
NodeType :: rabbit_db_cluster:node_type(),
@@ -55,21 +77,41 @@ ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin) ->
5577

5678
join(RemoteNode, NodeType)
5779
when is_atom(RemoteNode) andalso ?IS_NODE_TYPE(NodeType) ->
58-
?LOG_DEBUG(
59-
"DB: joining cluster using remote node `~ts`", [RemoteNode],
60-
#{domain => ?RMQLOG_DOMAIN_DB}),
61-
join_using_mnesia(RemoteNode, NodeType).
80+
case can_join(RemoteNode) of
81+
{ok, ClusterNodes} when is_list(ClusterNodes) ->
82+
rabbit_db:reset(),
83+
84+
?LOG_INFO(
85+
"DB: joining cluster using remote nodes:~n~tp", [ClusterNodes],
86+
#{domain => ?RMQLOG_DOMAIN_DB}),
87+
Ret = rabbit_db:run(
88+
#{mnesia =>
89+
fun() -> join_using_mnesia(ClusterNodes, NodeType) end}),
90+
case Ret of
91+
ok ->
92+
rabbit_node_monitor:notify_joined_cluster(),
93+
ok;
94+
{error, _} = Error ->
95+
Error
96+
end;
97+
{ok, already_member} ->
98+
{ok, already_member};
99+
{error, _} = Error ->
100+
Error
101+
end.
62102

63-
join_using_mnesia(RemoteNode, NodeType) ->
64-
rabbit_mnesia:join_cluster(RemoteNode, NodeType).
103+
join_using_mnesia(ClusterNodes, NodeType) when is_list(ClusterNodes) ->
104+
rabbit_mnesia:join_cluster(ClusterNodes, NodeType).
65105

66106
-spec forget_member(Node, RemoveWhenOffline) -> ok when
67107
Node :: node(),
68108
RemoveWhenOffline :: boolean().
69109
%% @doc Removes `Node' from the cluster.
70110

71111
forget_member(Node, RemoveWhenOffline) ->
72-
forget_member_using_mnesia(Node, RemoveWhenOffline).
112+
rabbit_db:run(
113+
#{mnesia =>
114+
fun() -> forget_member_using_mnesia(Node, RemoveWhenOffline) end}).
73115

74116
forget_member_using_mnesia(Node, RemoveWhenOffline) ->
75117
rabbit_mnesia:forget_cluster_node(Node, RemoveWhenOffline).
@@ -85,7 +127,8 @@ forget_member_using_mnesia(Node, RemoveWhenOffline) ->
85127
%% Node types may not all be valid with all databases.
86128

87129
change_node_type(NodeType) ->
88-
change_node_type_using_mnesia(NodeType).
130+
rabbit_db:run(
131+
#{mnesia => fun() -> change_node_type_using_mnesia(NodeType) end}).
89132

90133
change_node_type_using_mnesia(NodeType) ->
91134
rabbit_mnesia:change_cluster_node_type(NodeType).
@@ -99,7 +142,8 @@ change_node_type_using_mnesia(NodeType) ->
99142
%% @doc Indicates if this node is clustered with other nodes or not.
100143

101144
is_clustered() ->
102-
is_clustered_using_mnesia().
145+
rabbit_db:run(
146+
#{mnesia => fun is_clustered_using_mnesia/0}).
103147

104148
is_clustered_using_mnesia() ->
105149
rabbit_mnesia:is_clustered().
@@ -109,7 +153,8 @@ is_clustered_using_mnesia() ->
109153
%% @doc Returns the list of cluster members.
110154

111155
members() ->
112-
members_using_mnesia().
156+
rabbit_db:run(
157+
#{mnesia => fun members_using_mnesia/0}).
113158

114159
members_using_mnesia() ->
115160
case rabbit_mnesia:is_running() andalso rabbit_table:is_present() of
@@ -137,7 +182,8 @@ members_using_mnesia() ->
137182
%% @private
138183

139184
disc_members() ->
140-
disc_members_using_mnesia().
185+
rabbit_db:run(
186+
#{mnesia => fun disc_members_using_mnesia/0}).
141187

142188
disc_members_using_mnesia() ->
143189
rabbit_mnesia:cluster_nodes(disc).
@@ -149,7 +195,8 @@ disc_members_using_mnesia() ->
149195
%% Node types may not all be relevant with all databases.
150196

151197
node_type() ->
152-
node_type_using_mnesia().
198+
rabbit_db:run(
199+
#{mnesia => fun node_type_using_mnesia/0}).
153200

154201
node_type_using_mnesia() ->
155202
rabbit_mnesia:node_type().
@@ -177,7 +224,8 @@ check_compatibility_using_mnesia(RemoteNode) ->
177224
%% @doc Ensures the cluster is consistent.
178225

179226
check_consistency() ->
180-
check_consistency_using_mnesia().
227+
rabbit_db:run(
228+
#{mnesia => fun check_consistency_using_mnesia/0}).
181229

182230
check_consistency_using_mnesia() ->
183231
rabbit_mnesia:check_cluster_consistency().
@@ -190,7 +238,8 @@ check_consistency_using_mnesia() ->
190238
%% command.
191239

192240
cli_cluster_status() ->
193-
cli_cluster_status_using_mnesia().
241+
rabbit_db:run(
242+
#{mnesia => fun cli_cluster_status_using_mnesia/0}).
194243

195244
cli_cluster_status_using_mnesia() ->
196245
rabbit_mnesia:status().

deps/rabbit/src/rabbit_mnesia.erl

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
-export([%% Main interface
1515
init/0,
16+
can_join_cluster/1,
1617
join_cluster/2,
1718
reset/0,
1819
force_reset/0,
@@ -153,10 +154,10 @@ create_cluster_callback(RemoteNode, NodeType) ->
153154
%% all in the same cluster, we simply pick the first online node and
154155
%% we cluster to its cluster.
155156

156-
-spec join_cluster(node(), rabbit_db_cluster:node_type())
157-
-> ok | {ok, already_member} | {error, {inconsistent_cluster, string()}}.
157+
-spec can_join_cluster(node())
158+
-> {ok, [node()]} | {ok, already_member} | {error, {inconsistent_cluster, string()}}.
158159

159-
join_cluster(DiscoveryNode, NodeType) ->
160+
can_join_cluster(DiscoveryNode) ->
160161
ensure_mnesia_not_running(),
161162
ensure_mnesia_dir(),
162163
case is_only_clustered_disc_node() of
@@ -167,28 +168,8 @@ join_cluster(DiscoveryNode, NodeType) ->
167168
case rabbit_nodes:me_in_nodes(ClusterNodes) of
168169
false ->
169170
case check_cluster_consistency(DiscoveryNode, false) of
170-
{ok, _} ->
171-
%% reset the node. this simplifies things and it
172-
%% will be needed in this case - we're joining a new
173-
%% cluster with new nodes which are not in synch
174-
%% with the current node. It also lifts the burden
175-
%% of resetting the node from the user.
176-
reset_gracefully(),
177-
178-
NodeType1 = case is_node_type_permitted(NodeType) of
179-
false -> disc;
180-
true -> NodeType
181-
end,
182-
183-
%% Join the cluster
184-
rabbit_log:info("Clustering with ~tp as ~tp node",
185-
[ClusterNodes, NodeType1]),
186-
ok = init_db_with_mnesia(ClusterNodes, NodeType1,
187-
true, true, _Retry = true),
188-
rabbit_node_monitor:notify_joined_cluster(),
189-
ok;
190-
{error, Reason} ->
191-
{error, Reason}
171+
{ok, _} -> {ok, ClusterNodes};
172+
Error -> Error
192173
end;
193174
true ->
194175
%% DiscoveryNode thinks that we are part of a cluster, but
@@ -204,6 +185,38 @@ join_cluster(DiscoveryNode, NodeType) ->
204185
end
205186
end.
206187

188+
-spec join_cluster
189+
([node()], rabbit_db_cluster:node_type()) ->
190+
ok | {error, any()};
191+
(node(), rabbit_db_cluster:node_type()) ->
192+
ok | {ok, already_member} | {error, {inconsistent_cluster, string()}}.
193+
194+
join_cluster(ClusterNodes, NodeType) when is_list(ClusterNodes) ->
195+
%% Join the cluster.
196+
NodeType1 = case is_node_type_permitted(NodeType) of
197+
false -> disc;
198+
true -> NodeType
199+
end,
200+
rabbit_log:info("Clustering with ~tp as ~tp node",
201+
[ClusterNodes, NodeType1]),
202+
ok = init_db_with_mnesia(ClusterNodes, NodeType1,
203+
true, true, _Retry = true),
204+
rabbit_node_monitor:notify_joined_cluster(),
205+
ok;
206+
join_cluster(DiscoveryNode, NodeType) when is_atom(DiscoveryNode) ->
207+
%% Code to remain compatible with `change_cluster_node_type/1' and older
208+
%% CLI.
209+
case can_join_cluster(DiscoveryNode) of
210+
{ok, ClusterNodes} when is_list(ClusterNodes) ->
211+
ok = reset_gracefully(),
212+
ok = join_cluster(ClusterNodes, NodeType),
213+
ok;
214+
{ok, already_member} ->
215+
{ok, already_member};
216+
Error ->
217+
Error
218+
end.
219+
207220
%% return node to its virgin state, where it is not member of any
208221
%% cluster, has no cluster configuration, no local database, and no
209222
%% persisted messages

deps/rabbitmq_cli/test/ctl/join_cluster_command_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ defmodule JoinClusterCommandTest do
9393
stop_rabbitmq_app()
9494

9595
assert match?(
96-
{:badrpc_multi, _, [_]},
96+
{:error, {:aborted_feature_flags_compat_check, {:error, {:erpc, :noconnection}}}},
9797
@command.run([:jake@thedog], context[:opts])
9898
)
9999

0 commit comments

Comments
 (0)