Skip to content

Commit 35c99b3

Browse files
committed
Revert "Khepri: replace join_cluster by mnesia_to_khepri:sync_cluster_membership"
This reverts commit 7ec068c.
1 parent 5be9cd0 commit 35c99b3

File tree

2 files changed

+79
-3
lines changed

2 files changed

+79
-3
lines changed

deps/rabbit/src/rabbit_db_cluster.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ join_using_khepri(RemoteNode, NodeType) ->
7676
ok ->
7777
case join_using_mnesia(RemoteNode, NodeType) of
7878
ok ->
79-
rabbit_khepri:init_cluster();
79+
rabbit_khepri:join_cluster(RemoteNode);
8080
{ok, already_member} ->
81-
rabbit_khepri:init_cluster();
81+
rabbit_khepri:join_cluster(RemoteNode);
8282
Error ->
8383
Error
8484
end;

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@
7171
-export([init_cluster/0]).
7272
-export([do_join/1]).
7373
%% To add the current node to an existing cluster
74-
-export([leave_cluster/1]).
74+
-export([check_join_cluster/1,
75+
join_cluster/1,
76+
leave_cluster/1]).
7577
-export([is_clustered/0]).
7678
-export([check_cluster_consistency/0,
7779
check_cluster_consistency/2,
@@ -433,6 +435,73 @@ init_cluster() ->
433435
_ = application:ensure_all_started(khepri_mnesia_migration),
434436
mnesia_to_khepri:sync_cluster_membership(?STORE_ID).
435437

438+
%%%%%%%%
439+
%% TODO run_peer_discovery!!
440+
%%%%%%%
441+
442+
check_join_cluster(DiscoveryNode) ->
443+
{ClusterNodes, _} = discover_cluster([DiscoveryNode]),
444+
case me_in_nodes(ClusterNodes) of
445+
false ->
446+
case check_cluster_consistency(DiscoveryNode, false) of
447+
{ok, _S} ->
448+
ok;
449+
Error ->
450+
Error
451+
end;
452+
true ->
453+
%% DiscoveryNode thinks that we are part of a cluster, but
454+
%% do we think so ourselves?
455+
case are_we_clustered_with(DiscoveryNode) of
456+
true ->
457+
rabbit_log:info("Asked to join a cluster but already a member of it: ~tp", [ClusterNodes]),
458+
{ok, already_member};
459+
false ->
460+
Msg = format_inconsistent_cluster_message(DiscoveryNode, node()),
461+
rabbit_log:error(Msg),
462+
{error, {inconsistent_cluster, Msg}}
463+
end
464+
end.
465+
466+
join_cluster(DiscoveryNode) ->
467+
{ClusterNodes, _} = discover_cluster([DiscoveryNode]),
468+
case me_in_nodes(ClusterNodes) of
469+
false ->
470+
case check_cluster_consistency(DiscoveryNode, false) of
471+
{ok, _S} ->
472+
ThisNode = node(),
473+
retry_khepri_op(fun() -> add_member(ThisNode, [DiscoveryNode]) end, 60);
474+
Error ->
475+
Error
476+
end;
477+
true ->
478+
%% DiscoveryNode thinks that we are part of a cluster, but
479+
%% do we think so ourselves?
480+
case are_we_clustered_with(DiscoveryNode) of
481+
true ->
482+
rabbit_log:info("Asked to join a cluster but already a member of it: ~tp", [ClusterNodes]),
483+
{ok, already_member};
484+
false ->
485+
Msg = format_inconsistent_cluster_message(DiscoveryNode, node()),
486+
rabbit_log:error(Msg),
487+
{error, {inconsistent_cluster, Msg}}
488+
end
489+
end.
490+
491+
discover_cluster(Nodes) ->
492+
case lists:foldl(fun (_, {ok, Res}) -> {ok, Res};
493+
(Node, _) -> discover_cluster0(Node)
494+
end, {error, no_nodes_provided}, Nodes) of
495+
{ok, Res} -> Res;
496+
{error, E} -> throw({error, E});
497+
{badrpc, Reason} -> throw({badrpc_multi, Reason, Nodes})
498+
end.
499+
500+
discover_cluster0(Node) when Node == node() ->
501+
{error, cannot_cluster_node_with_itself};
502+
discover_cluster0(Node) ->
503+
rpc:call(Node, ?MODULE, cluster_status_from_khepri, []).
504+
436505
leave_cluster(Node) ->
437506
retry_khepri_op(fun() -> remove_member(Node) end, 60).
438507

@@ -533,6 +602,13 @@ format_inconsistent_cluster_message(Thinker, Dissident) ->
533602

534603
me_in_nodes(Nodes) -> lists:member(node(), Nodes).
535604

605+
are_we_clustered_with(Node) ->
606+
%% Khepri is stopped at this point, let's ask rabbit_node_monitor
607+
%% We're going to fail to join anyway, but for the user is not the same
608+
%% to return 'already a member' than 'inconsistent cluster'.
609+
{AllNodes, _DiscNodes, _RunningNodes} = rabbit_node_monitor:read_cluster_status(),
610+
lists:member(Node, AllNodes).
611+
536612
node_info() ->
537613
{rabbit_misc:otp_release(), rabbit_misc:version(), cluster_status_from_khepri()}.
538614

0 commit comments

Comments
 (0)