Skip to content

Commit 01000ea

Browse files
committed
rabbit_khepri: Adapt to new clustering API
1 parent b0a9324 commit 01000ea

File tree

2 files changed

+62
-91
lines changed

2 files changed

+62
-91
lines changed

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,15 @@ expand_khepri_cluster(FeatureName, AllMnesiaNodes) ->
433433
%% therefore, we skip this cluster to consider the following one,
434434
%% [Node2].
435435
KhepriCluster = find_largest_khepri_cluster(FeatureName),
436-
add_nodes_to_khepri_cluster(FeatureName, KhepriCluster, AllMnesiaNodes).
436+
NodesToAdd = AllMnesiaNodes -- KhepriCluster,
437+
?LOG_DEBUG(
438+
"Feature flags `~s`: selected Khepri cluster: ~p",
439+
[FeatureName, KhepriCluster]),
440+
?LOG_DEBUG(
441+
"Feature flags `~s`: Mnesia nodes to add to the Khepri cluster "
442+
"above: ~p",
443+
[FeatureName, NodesToAdd]),
444+
add_nodes_to_khepri_cluster(FeatureName, KhepriCluster, NodesToAdd).
437445

438446
add_nodes_to_khepri_cluster(FeatureName, KhepriCluster, [Node | Rest]) ->
439447
add_node_to_khepri_cluster(FeatureName, KhepriCluster, Node),

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 53 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@
5454
is_enabled/1,
5555
nodes_if_khepri_enabled/0,
5656
try_mnesia_or_khepri/2]).
57-
-export([do_add_member/1,
58-
priv_reset/0]).
57+
-export([do_join/1]).
5958

6059
-ifdef(TEST).
6160
-export([force_metadata_store/1,
@@ -85,7 +84,9 @@ setup() ->
8584
setup(_) ->
8685
?LOG_DEBUG("Starting Khepri-based " ?RA_FRIENDLY_NAME),
8786
ok = ensure_ra_system_started(),
88-
case khepri:start(?RA_SYSTEM, ?RA_CLUSTER_NAME, ?RA_FRIENDLY_NAME) of
87+
RaServerConfig = #{cluster_name => ?RA_CLUSTER_NAME,
88+
friendly_name => ?RA_FRIENDLY_NAME},
89+
case khepri:start(?RA_SYSTEM, RaServerConfig) of
8990
{ok, ?STORE_ID} ->
9091
?LOG_DEBUG(
9192
"Khepri-based " ?RA_FRIENDLY_NAME " ready",
@@ -125,12 +126,13 @@ wait_for_leader(Fun, Timeout) ->
125126
Error
126127
end.
127128

128-
add_member(JoiningNode, JoinedNode) when JoinedNode =:= node() ->
129-
Ret = do_add_member(JoiningNode),
129+
add_member(JoiningNode, JoinedNode)
130+
when JoiningNode =:= node() andalso is_atom(JoinedNode) ->
131+
Ret = do_join(JoinedNode),
130132
post_add_member(JoiningNode, JoinedNode, Ret);
131133
add_member(JoiningNode, JoinedNode) when is_atom(JoinedNode) ->
132134
Ret = rabbit_misc:rpc_call(
133-
JoinedNode, rabbit_khepri, do_add_member, [JoiningNode]),
135+
JoiningNode, rabbit_khepri, do_join, [JoinedNode]),
134136
post_add_member(JoiningNode, JoinedNode, Ret);
135137
add_member(JoiningNode, [_ | _] = Cluster) ->
136138
case lists:member(JoiningNode, Cluster) of
@@ -158,79 +160,59 @@ pick_node_in_cluster(Cluster) when is_list(Cluster) ->
158160
false -> hd(Cluster)
159161
end.
160162

161-
do_add_member(NewNode) when NewNode =/= node() ->
163+
do_join(RemoteNode) when RemoteNode =/= node() ->
164+
ThisNode = node(),
165+
162166
?LOG_DEBUG(
163-
"Khepri clustering: Trying to add node ~p to cluster \"~s\" through "
164-
"node ~p",
165-
[NewNode, ?RA_CLUSTER_NAME, node()],
167+
"Khepri clustering: Trying to add this node (~p) to cluster \"~s\" "
168+
"through node ~p",
169+
[ThisNode, ?RA_CLUSTER_NAME, RemoteNode],
166170
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
167171

168-
%% Check if the node is already part of the cluster. We query the local Ra
169-
%% server only, in case the cluster can't elect a leader right now.
170-
CurrentNodes = locally_known_nodes(),
171-
case lists:member(NewNode, CurrentNodes) of
172-
false ->
173-
%% Ensure the remote node is reachable before we add it.
174-
pong = net_adm:ping(NewNode),
172+
%% Ensure the local Khepri store is running before we can reset it. It
173+
%% could be stopped if RabbitMQ is not running for instance.
174+
ok = setup(),
175+
khepri:info(?RA_CLUSTER_NAME),
175176

176-
?LOG_DEBUG(
177-
"Khepri clustering: Resetting Khepri on remote node ~p",
178-
[NewNode],
179-
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
177+
%% We don't verify the cluster membership before adding this node to the
178+
%% remote cluster because such a check would not be atomic: the membership
179+
%% could well change between the check and the actual join.
180180

181-
%% If the remote node to add is running RabbitMQ, we need to put
182-
%% it in maintenance mode at least. We remember that state to
183-
%% revive the node only if it was fully running before this code.
184-
RemoteIsRunning = rabbit:is_running(NewNode),
185-
RemoteAlreadyBeingDrained =
186-
rabbit_maintenance:is_being_drained_consistent_read(NewNode),
187-
NeedToReviveRemote =
188-
RemoteIsRunning andalso not RemoteAlreadyBeingDrained,
189-
maybe_drain_node(NewNode, RemoteIsRunning),
190-
191-
Ret1 = rabbit_misc:rpc_call(
192-
NewNode, rabbit_khepri, priv_reset, []),
193-
case Ret1 of
194-
ok ->
195-
?LOG_DEBUG(
196-
"Adding remote node ~s to Khepri cluster \"~s\"",
197-
[NewNode, ?RA_CLUSTER_NAME],
198-
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
199-
ok = ensure_ra_system_started(),
200-
Ret2 = khepri_cluster:add_member(
201-
?RA_SYSTEM, ?RA_CLUSTER_NAME, ?RA_FRIENDLY_NAME,
202-
NewNode),
181+
?LOG_DEBUG(
182+
"Adding this node (~p) to Khepri cluster \"~s\" through "
183+
"node ~p",
184+
[ThisNode, ?RA_CLUSTER_NAME, RemoteNode],
185+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
203186

204-
%% Revive the remote node if it was running and not under
205-
%% maintenance before we changed the cluster membership.
206-
maybe_revive_node(NewNode, NeedToReviveRemote),
187+
%% Ensure the remote node is reachable before we join its cluster.
188+
pong = net_adm:ping(RemoteNode),
207189

208-
Ret2;
209-
Error ->
210-
?LOG_ERROR(
211-
"Khepri clustering: Failed to reset Khepri on node "
212-
"~p: ~p",
213-
[NewNode, Error],
214-
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
190+
%% If this node (the one joining) is running RabbitMQ, we need to put
191+
%% it in maintenance mode at least. We remember that state to
192+
%% revive the node only if it was fully running before this code.
193+
IsRunning = rabbit:is_running(ThisNode),
194+
AlreadyBeingDrained =
195+
rabbit_maintenance:is_being_drained_consistent_read(ThisNode),
196+
NeedToRevive = IsRunning andalso not AlreadyBeingDrained,
197+
maybe_drain_node(IsRunning),
215198

216-
%% Revive the remote node if it was running and not under
217-
%% maintenance before we changed the cluster membership.
218-
maybe_revive_node(NewNode, NeedToReviveRemote),
199+
%% Joining a cluster includes a reset of the local Khepri store.
200+
Ret = khepri_cluster:join(?RA_CLUSTER_NAME, RemoteNode),
219201

220-
Error
221-
end;
222-
true ->
223-
{error, {already_member, CurrentNodes}}
224-
end.
202+
%% Revive this node if it was running and not under
203+
%% maintenance before we changed the cluster membership.
204+
maybe_revive_node(NeedToRevive),
205+
206+
Ret.
225207

226-
maybe_drain_node(Node, true) ->
227-
ok = rabbit_misc:rpc_call(Node, rabbit_maintenance, drain, []);
228-
maybe_drain_node(_Node, false) ->
208+
maybe_drain_node(true) ->
209+
ok = rabbit_maintenance:drain();
210+
maybe_drain_node(false) ->
229211
ok.
230212

231-
maybe_revive_node(Node, true) ->
232-
ok = rabbit_misc:rpc_call(Node, rabbit_maintenance, revive, []);
233-
maybe_revive_node(_Node, false) ->
213+
maybe_revive_node(true) ->
214+
ok = rabbit_maintenance:revive();
215+
maybe_revive_node(false) ->
234216
ok.
235217

236218
post_add_member(JoiningNode, JoinedNode, ok) ->
@@ -250,9 +232,7 @@ post_add_member(
250232
{ok, already_member};
251233
post_add_member(
252234
JoiningNode, JoinedNode,
253-
{badrpc, {'EXIT', {undef, [{rabbit_khepri, Function, _, _}]}}} = Error)
254-
when Function =:= do_add_member orelse
255-
Function =:= priv_reset ->
235+
{badrpc, {'EXIT', {undef, [{rabbit_khepri, do_join, _, _}]}}} = Error) ->
256236
?LOG_INFO(
257237
"Khepri clustering: Can't add node ~p to cluster \"~s\"; "
258238
"Khepri unavailable on node ~p: ~p",
@@ -287,18 +267,16 @@ remove_member(NodeToRemove) when NodeToRemove =/= node() ->
287267
[NodeToRemove, ?RA_CLUSTER_NAME],
288268
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
289269
ok = ensure_ra_system_started(),
290-
Ret = khepri_cluster:remove_member(?RA_CLUSTER_NAME, NodeToRemove),
291-
%% FIXME: Stop the Ra server? Apparently it's still running after
292-
%% calling this function and answers queries (like the list of
293-
%% locally known members).
270+
Ret = rabbit_misc:rpc_call(
271+
NodeToRemove, khepri_cluster, reset, [?RA_CLUSTER_NAME]),
294272
case Ret of
295273
ok ->
296274
?LOG_DEBUG(
297275
"Node ~s removed from Khepri cluster \"~s\"",
298276
[NodeToRemove, ?RA_CLUSTER_NAME],
299277
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
300278
ok;
301-
{error, _} = Error ->
279+
Error ->
302280
?LOG_ERROR(
303281
"Failed to remove remote node ~s from Khepri "
304282
"cluster \"~s\": ~p",
@@ -315,21 +293,6 @@ remove_member(NodeToRemove) when NodeToRemove =/= node() ->
315293
ok
316294
end.
317295

318-
priv_reset() ->
319-
%% To reset the content of Khepri, we need RabbitMQ to be under
320-
%% maintenance or stopped. The reason is we can't let RabbitMQ do any
321-
%% reads from or writes to Khepri because the content will be garbage.
322-
%%
323-
%% Stopping RabbitMQ would be done by the `stop_app' CLI command.
324-
%%
325-
%% If RabbitMQ is running, the node is put under maintenance by the
326-
%% `add_member/1' function above.
327-
?assert(
328-
not rabbit:is_running() orelse
329-
rabbit_maintenance:is_being_drained_consistent_read(node())),
330-
ok = ensure_ra_system_started(),
331-
ok = khepri:reset(?RA_SYSTEM, ?RA_CLUSTER_NAME).
332-
333296
ensure_ra_system_started() ->
334297
{ok, _} = application:ensure_all_started(khepri),
335298
ok = rabbit_ra_systems:ensure_ra_system_started(?RA_SYSTEM).

0 commit comments

Comments
 (0)