Skip to content

Commit cac5143

Browse files
committed
Khepri: replace join_cluster by mnesia_to_khepri:sync_cluster_membership
1 parent 9062aa7 commit cac5143

File tree

3 files changed

+26
-33
lines changed

3 files changed

+26
-33
lines changed

deps/rabbit/src/rabbit_db_cluster.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ join_using_khepri(RemoteNode, NodeType) ->
7676
ok ->
7777
case join_using_mnesia(RemoteNode, NodeType) of
7878
ok ->
79-
rabbit_khepri:join_cluster(RemoteNode);
79+
rabbit_khepri:init_cluster();
8080
{ok, already_member} ->
81-
rabbit_khepri:join_cluster(RemoteNode);
81+
rabbit_khepri:init_cluster();
8282
Error ->
8383
Error
8484
end;

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@
7272
-export([do_join/1]).
7373
%% To add the current node to an existing cluster
7474
-export([check_join_cluster/1,
75-
join_cluster/1,
7675
leave_cluster/1]).
7776
-export([is_clustered/0]).
7877
-export([check_cluster_consistency/0,
@@ -435,10 +434,26 @@ cli_cluster_status() ->
435434
init_cluster() ->
436435
%% Ensure the local Khepri store is running before we can join it. It
437436
%% could be stopped if RabbitMQ is not running for instance.
438-
ok = setup(),
439-
khepri:info(?RA_CLUSTER_NAME),
440-
_ = application:ensure_all_started(khepri_mnesia_migration),
441-
mnesia_to_khepri:sync_cluster_membership(?STORE_ID).
437+
rabbit_log:debug("Khepri clustering: starting Mnesia..."),
438+
IsRunning = rabbit_mnesia:is_running(),
439+
try
440+
case IsRunning of
441+
true -> ok;
442+
false -> rabbit_mnesia:start_mnesia(false)
443+
end,
444+
rabbit_log:debug("Khepri clustering: starting Khepri..."),
445+
ok = setup(),
446+
khepri:info(?RA_CLUSTER_NAME),
447+
rabbit_log:debug("Khepri clustering: starting khepri_mnesia_migration..."),
448+
_ = application:ensure_all_started(khepri_mnesia_migration),
449+
rabbit_log:debug("Khepri clustering: syncing cluster membership"),
450+
mnesia_to_khepri:sync_cluster_membership(?STORE_ID)
451+
after
452+
case IsRunning of
453+
true -> ok;
454+
false -> rabbit_mnesia:stop_mnesia()
455+
end
456+
end.
442457

443458
%%%%%%%%
444459
%% TODO run_peer_discovery!!
@@ -468,31 +483,6 @@ check_join_cluster(DiscoveryNode) ->
468483
end
469484
end.
470485

471-
join_cluster(DiscoveryNode) ->
472-
{ClusterNodes, _} = discover_cluster([DiscoveryNode]),
473-
case me_in_nodes(ClusterNodes) of
474-
false ->
475-
case check_cluster_consistency(DiscoveryNode, false) of
476-
{ok, _S} ->
477-
ThisNode = node(),
478-
retry_khepri_op(fun() -> add_member(ThisNode, [DiscoveryNode]) end, 60);
479-
Error ->
480-
Error
481-
end;
482-
true ->
483-
%% DiscoveryNode thinks that we are part of a cluster, but
484-
%% do we think so ourselves?
485-
case are_we_clustered_with(DiscoveryNode) of
486-
true ->
487-
rabbit_log:info("Asked to join a cluster but already a member of it: ~tp", [ClusterNodes]),
488-
{ok, already_member};
489-
false ->
490-
Msg = format_inconsistent_cluster_message(DiscoveryNode, node()),
491-
rabbit_log:error(Msg),
492-
{error, {inconsistent_cluster, Msg}}
493-
end
494-
end.
495-
496486
discover_cluster(Nodes) ->
497487
case lists:foldl(fun (_, {ok, Res}) -> {ok, Res};
498488
(Node, _) -> discover_cluster0(Node)

deps/rabbit/src/rabbit_mnesia.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@
4141
on_node_down/1,
4242

4343
%% Helpers for diagnostics commands
44-
schema_info/1
44+
schema_info/1,
45+
46+
start_mnesia/1,
47+
stop_mnesia/0
4548
]).
4649

4750
%% Mnesia queries

0 commit comments

Comments
 (0)