Skip to content

Commit 7898ecf

Browse files
committed
rabbit_khepri: Improve compatibility with rabbit_mnesia's behavior
1 parent 66bd349 commit 7898ecf

File tree

1 file changed

+44
-26
lines changed

1 file changed

+44
-26
lines changed

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ add_member(JoiningNode, [_ | _] = Cluster) ->
121121
?LOG_DEBUG(
122122
"Khepri clustering: Node ~p is already a member of cluster ~p",
123123
[JoiningNode, Cluster]),
124-
ok
124+
{ok, already_member}
125125
end.
126126

127127
pick_node_in_cluster(Cluster) when is_list(Cluster) ->
@@ -146,36 +146,49 @@ do_join(RemoteNode) when RemoteNode =/= node() ->
146146
ok = setup(),
147147
khepri:info(?RA_CLUSTER_NAME),
148148

149-
%% We don't verify the cluster membership before adding this node to the
150-
%% remote cluster because such a check would not be atomic: the membership
151-
%% could well change between the check and the actual join.
152-
153-
?LOG_DEBUG(
154-
"Adding this node (~p) to Khepri cluster \"~s\" through "
155-
"node ~p",
156-
[ThisNode, ?RA_CLUSTER_NAME, RemoteNode],
157-
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
158-
159149
%% Ensure the remote node is reachable before we join its cluster.
160150
pong = net_adm:ping(RemoteNode),
161151

162-
%% If the remote node to add is running RabbitMQ, we need to put
163-
%% it in maintenance mode at least. We remember that state to
164-
%% revive the node only if it was fully running before this code.
165-
IsRunning = rabbit:is_running(ThisNode),
166-
AlreadyBeingDrained =
167-
rabbit_maintenance:is_being_drained_consistent_read(ThisNode),
168-
NeedToRevive = IsRunning andalso not AlreadyBeingDrained,
169-
maybe_drain_node(IsRunning),
152+
%% We verify the cluster membership before adding `ThisNode' to
153+
%% `RemoteNode''s cluster. We do it mostly to keep the same behavior as
154+
%% what we do with Mnesia. Beyond that, the benefit is limited because the
155+
%% check and the actual join are not atomic.
156+
157+
ClusteredNodes = rabbit_misc:rpc_call(
158+
RemoteNode, rabbit_khepri, locally_known_nodes, []),
159+
case lists:member(ThisNode, ClusteredNodes) of
160+
false ->
161+
?LOG_DEBUG(
162+
"Adding this node (~p) to Khepri cluster \"~s\" through "
163+
"node ~p",
164+
[ThisNode, ?RA_CLUSTER_NAME, RemoteNode],
165+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
166+
167+
%% If this node (the one joining) is running RabbitMQ, we need to put it
168+
%% in maintenance mode at least. We remember that state to revive
169+
%% the node only if it was fully running before this code.
170+
IsRunning = rabbit:is_running(ThisNode),
171+
AlreadyBeingDrained =
172+
rabbit_maintenance:is_being_drained_consistent_read(ThisNode),
173+
NeedToRevive = IsRunning andalso not AlreadyBeingDrained,
174+
maybe_drain_node(IsRunning),
170175

171-
%% Joining a cluster includes a reset of the local Khepri store.
172-
Ret = khepri_cluster:join(?RA_CLUSTER_NAME, RemoteNode),
176+
%% Joining a cluster includes a reset of the local Khepri store.
177+
Ret = khepri_cluster:join(?RA_CLUSTER_NAME, RemoteNode),
173178

174-
%% Revive the remote node if it was running and not under
175-
%% maintenance before we changed the cluster membership.
176-
maybe_revive_node(NeedToRevive),
179+
%% Revive this node if it was running and not under
180+
%% maintenance before we changed the cluster membership.
181+
maybe_revive_node(NeedToRevive),
177182

178-
Ret.
183+
Ret;
184+
true ->
185+
?LOG_DEBUG(
186+
"This node (~p) is already part of the Khepri cluster \"~s\" "
187+
"like node ~p",
188+
[ThisNode, ?RA_CLUSTER_NAME, RemoteNode],
189+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
190+
{ok, already_member}
191+
end.
179192

180193
maybe_drain_node(true) ->
181194
ok = rabbit_maintenance:drain();
@@ -238,7 +251,12 @@ remove_member(NodeToRemove) when NodeToRemove =/= node() ->
238251
"Removing remote node ~s from Khepri cluster \"~s\"",
239252
[NodeToRemove, ?RA_CLUSTER_NAME],
240253
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
241-
ok = ensure_ra_system_started(),
254+
255+
%% We need the Khepri store to run on the node to remove, to be
256+
%% able to reset it.
257+
ok = rabbit_misc:rpc_call(
258+
NodeToRemove, ?MODULE, setup, []),
259+
242260
Ret = rabbit_misc:rpc_call(
243261
NodeToRemove, khepri_cluster, reset, [?RA_CLUSTER_NAME]),
244262
case Ret of

0 commit comments

Comments
 (0)