
Commit a4eb89f

rabbit_khepri: Improve compatibility with rabbit_mnesia's behavior
1 parent: 33e4648, commit: a4eb89f

2 files changed: 89 additions, 52 deletions


deps/rabbit/src/rabbit_khepri.erl (44 additions, 26 deletions)
@@ -121,7 +121,7 @@ add_member(JoiningNode, [_ | _] = Cluster) ->
             ?LOG_DEBUG(
               "Khepri clustering: Node ~p is already a member of cluster ~p",
               [JoiningNode, Cluster]),
-            ok
+            {ok, already_member}
     end.
 
 pick_node_in_cluster(Cluster) when is_list(Cluster) ->
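
Asking add_member/2 to add a node that is already in the cluster is now reported explicitly instead of silently returning ok, presumably to match rabbit_mnesia's behavior per the commit message. A hypothetical caller-side sketch (only {ok, already_member} comes from the hunk above; the plain ok and {error, _} shapes for the other branches are assumptions made for illustration):

    %% Hypothetical caller; only {ok, already_member} is introduced by this
    %% commit, the ok and {error, _} shapes are assumed for illustration.
    case rabbit_khepri:add_member(JoiningNode, Cluster) of
        ok                   -> joined;
        {ok, already_member} -> nothing_to_do;
        {error, _} = Error   -> Error
    end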
@@ -146,36 +146,49 @@ do_join(RemoteNode) when RemoteNode =/= node() ->
     ok = setup(),
     khepri:info(?RA_CLUSTER_NAME),
 
-    %% We don't verify the cluster membership before adding this node to the
-    %% remote cluster because such a check would not be atomic: the membership
-    %% could well change between the check and the actual join.
-
-    ?LOG_DEBUG(
-      "Adding this node (~p) to Khepri cluster \"~s\" through "
-      "node ~p",
-      [ThisNode, ?RA_CLUSTER_NAME, RemoteNode],
-      #{domain => ?RMQLOG_DOMAIN_GLOBAL}),
-
     %% Ensure the remote node is reachable before we add it.
     pong = net_adm:ping(RemoteNode),
 
-    %% If the remote node to add is running RabbitMQ, we need to put
-    %% it in maintenance mode at least. We remember that state to
-    %% revive the node only if it was fully running before this code.
-    IsRunning = rabbit:is_running(ThisNode),
-    AlreadyBeingDrained =
-        rabbit_maintenance:is_being_drained_consistent_read(ThisNode),
-    NeedToRevive = IsRunning andalso not AlreadyBeingDrained,
-    maybe_drain_node(IsRunning),
+    %% We verify the cluster membership before adding `ThisNode' to
+    %% `RemoteNode''s cluster. We do it mostly to keep the same behavior as
+    %% what we do with Mnesia. Otherwise, the interest is limited given the
+    %% check and the actual join are not atomic.
+
+    ClusteredNodes = rabbit_misc:rpc_call(
+                       RemoteNode, rabbit_khepri, locally_known_nodes, []),
+    case lists:member(ThisNode, ClusteredNodes) of
+        false ->
+            ?LOG_DEBUG(
+              "Adding this node (~p) to Khepri cluster \"~s\" through "
+              "node ~p",
+              [ThisNode, ?RA_CLUSTER_NAME, RemoteNode],
+              #{domain => ?RMQLOG_DOMAIN_GLOBAL}),
+
+            %% If the remote node to add is running RabbitMQ, we need to put it
+            %% in maintenance mode at least. We remember that state to revive
+            %% the node only if it was fully running before this code.
+            IsRunning = rabbit:is_running(ThisNode),
+            AlreadyBeingDrained =
+                rabbit_maintenance:is_being_drained_consistent_read(ThisNode),
+            NeedToRevive = IsRunning andalso not AlreadyBeingDrained,
+            maybe_drain_node(IsRunning),
 
-    %% Joining a cluster includes a reset of the local Khepri store.
-    Ret = khepri_cluster:join(?RA_CLUSTER_NAME, RemoteNode),
+            %% Joining a cluster includes a reset of the local Khepri store.
+            Ret = khepri_cluster:join(?RA_CLUSTER_NAME, RemoteNode),
 
-    %% Revive the remote node if it was running and not under
-    %% maintenance before we changed the cluster membership.
-    maybe_revive_node(NeedToRevive),
+            %% Revive the remote node if it was running and not under
+            %% maintenance before we changed the cluster membership.
+            maybe_revive_node(NeedToRevive),
 
-    Ret.
+            Ret;
+        true ->
+            ?LOG_DEBUG(
+              "This node (~p) is already part of the Khepri cluster \"~s\" "
+              "like node ~p",
+              [ThisNode, ?RA_CLUSTER_NAME, RemoteNode],
+              #{domain => ?RMQLOG_DOMAIN_GLOBAL}),
+            {ok, already_member}
+    end.
 
 maybe_drain_node(true) ->
     ok = rabbit_maintenance:drain();
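
Condensed, the new do_join/1 flow asks the remote node which nodes it already considers clustered and only then performs the join. A minimal standalone sketch of that check-then-join pattern (the function name is illustrative and the ClusterName argument stands in for the module's ?RA_CLUSTER_NAME macro; as the new comment notes, the check and the join are still not atomic):

    %% Minimal sketch of the check-then-join pattern introduced above; it
    %% mirrors rabbit_mnesia's behavior rather than providing atomicity.
    join_if_needed(RemoteNode, ClusterName) ->
        ThisNode = node(),
        ClusteredNodes = rabbit_misc:rpc_call(
                           RemoteNode, rabbit_khepri, locally_known_nodes, []),
        case lists:member(ThisNode, ClusteredNodes) of
            false -> khepri_cluster:join(ClusterName, RemoteNode);
            true  -> {ok, already_member}
        end.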
@@ -238,7 +251,12 @@ remove_member(NodeToRemove) when NodeToRemove =/= node() ->
       "Removing remote node ~s from Khepri cluster \"~s\"",
       [NodeToRemove, ?RA_CLUSTER_NAME],
       #{domain => ?RMQLOG_DOMAIN_GLOBAL}),
-    ok = ensure_ra_system_started(),
+
+    %% We need the Khepri store to run on the node to remove, to be
+    %% able to reset it.
+    ok = rabbit_misc:rpc_call(
+           NodeToRemove, ?MODULE, setup, []),
+
     Ret = rabbit_misc:rpc_call(
             NodeToRemove, khepri_cluster, reset, [?RA_CLUSTER_NAME]),
     case Ret of
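
The removal path now talks to the node being removed twice over RPC: first to start its Khepri store (setup/0, ?MODULE resolving to rabbit_khepri), then to reset that store. A minimal standalone sketch of the sequence (illustrative function name; the `case Ret of` continuation is outside this hunk and is left to the caller here):

    %% Minimal sketch: the Khepri store must be running on NodeToRemove
    %% before khepri_cluster:reset/1 can be executed there.
    reset_remote_store(NodeToRemove, ClusterName) ->
        ok = rabbit_misc:rpc_call(NodeToRemove, rabbit_khepri, setup, []),
        rabbit_misc:rpc_call(NodeToRemove, khepri_cluster, reset, [ClusterName]).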

deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl (45 additions, 26 deletions)
@@ -1001,7 +1001,8 @@ stop_rabbitmq_nodes(Config) ->
       fun(NodeConfig) ->
               stop_rabbitmq_node(Config, NodeConfig)
       end),
-    find_crashes_in_logs(NodeConfigs),
+    IgnoredCrashes = ["** force_vhost_failure"],
+    find_crashes_in_logs(NodeConfigs, IgnoredCrashes),
     proplists:delete(rmq_nodes, Config).
 
 stop_rabbitmq_node(Config, NodeConfig) ->
@@ -1022,68 +1023,83 @@ stop_rabbitmq_node(Config, NodeConfig) ->
     end,
     NodeConfig.
 
-find_crashes_in_logs(NodeConfigs) ->
+find_crashes_in_logs(NodeConfigs, IgnoredCrashes) ->
     ct:pal(
       "Looking up any crash reports in the nodes' log files. If we find "
       "some, they will appear below:"),
     CrashesCount = lists:foldl(
                      fun(NodeConfig, Total) ->
-                             Count = count_crashes_in_logs(NodeConfig),
+                             Count = count_crashes_in_logs(
+                                       NodeConfig, IgnoredCrashes),
                              Total + Count
                      end, 0, NodeConfigs),
     ct:pal("Found ~b crash report(s)", [CrashesCount]),
     ?assertEqual(0, CrashesCount).
 
-count_crashes_in_logs(NodeConfig) ->
+count_crashes_in_logs(NodeConfig, IgnoredCrashes) ->
     LogLocations = ?config(log_locations, NodeConfig),
     lists:foldl(
       fun(LogLocation, Total) ->
-              Count = count_crashes_in_log(LogLocation),
+              Count = count_crashes_in_log(LogLocation, IgnoredCrashes),
              Total + Count
      end, 0, LogLocations).
 
-count_crashes_in_log(LogLocation) ->
+count_crashes_in_log(LogLocation, IgnoredCrashes) ->
     case file:read_file(LogLocation) of
-        {ok, Content} -> count_crashes_in_content(Content);
+        {ok, Content} -> count_crashes_in_content(Content, IgnoredCrashes);
         _ -> 0
     end.
 
-count_crashes_in_content(Content) ->
+count_crashes_in_content(Content, IgnoredCrashes) ->
     ReOpts = [multiline],
     Lines = re:split(Content, "^", ReOpts),
-    count_gen_server_terminations(Lines).
+    count_gen_server_terminations(Lines, IgnoredCrashes).
 
-count_gen_server_terminations(Lines) ->
-    count_gen_server_terminations(Lines, 0).
+count_gen_server_terminations(Lines, IgnoredCrashes) ->
+    count_gen_server_terminations(Lines, 0, IgnoredCrashes).
 
-count_gen_server_terminations([Line | Rest], Count) ->
+count_gen_server_terminations([Line | Rest], Count, IgnoredCrashes) ->
     ReOpts = [{capture, all_but_first, list}],
     Ret = re:run(
             Line,
             "(<[0-9.]+> )[*]{2} Generic server .+ terminating$",
             ReOpts),
     case Ret of
         {match, [Prefix]} ->
-            capture_gen_server_termination(Rest, Prefix, [Line], Count + 1);
+            capture_gen_server_termination(
+              Rest, Prefix, [Line], Count, IgnoredCrashes);
         nomatch ->
-            count_gen_server_terminations(Rest, Count)
+            count_gen_server_terminations(Rest, Count, IgnoredCrashes)
     end;
-count_gen_server_terminations([], Count) ->
+count_gen_server_terminations([], Count, _IgnoredCrashes) ->
     Count.
 
-capture_gen_server_termination([Line | Rest] = Lines, Prefix, Acc, Count) ->
-    ReOpts = [{capture, none}],
-    Ret = re:run(Line, Prefix ++ "( |\\*|$)", ReOpts),
+capture_gen_server_termination(
+  [Line | Rest] = Lines, Prefix, Acc, Count, IgnoredCrashes) ->
+    ReOpts = [{capture, all_but_first, list}],
+    Ret = re:run(Line, Prefix ++ "( .*|\\*.*|)$", ReOpts),
     case Ret of
-        match ->
-            capture_gen_server_termination(Rest, Prefix, [Line | Acc], Count);
+        {match, [Suffix]} ->
+            case lists:member(Suffix, IgnoredCrashes) of
+                false ->
+                    capture_gen_server_termination(
+                      Rest, Prefix, [Line | Acc], Count, IgnoredCrashes);
+                true ->
+                    count_gen_server_terminations(
+                      Lines, Count, IgnoredCrashes)
+            end;
         nomatch ->
-            ct:pal("gen_server termination:~n~n~s", [lists:reverse(Acc)]),
-            count_gen_server_terminations(Lines, Count)
+            found_gen_server_termiation(
+              lists:reverse(Acc), Lines, Count, IgnoredCrashes)
     end;
-capture_gen_server_termination([] = Rest, _Prefix, Acc, Count) ->
-    ct:pal("gen_server termination:~n~n~s", [lists:reverse(Acc)]),
-    count_gen_server_terminations(Rest, Count).
+capture_gen_server_termination(
+  [] = Rest, _Prefix, Acc, Count, IgnoredCrashes) ->
+    found_gen_server_termiation(
+      lists:reverse(Acc), Rest, Count, IgnoredCrashes).
+
+found_gen_server_termiation(Message, Lines, Count, IgnoredCrashes) ->
+    ct:pal("gen_server termination:~n~n~s", [Message]),
+    count_gen_server_terminations(Lines, Count + 1, IgnoredCrashes).
 
 %% -------------------------------------------------------------------
 %% Helpers for partition simulation
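
The scanner works line by line: the first regex captures the emitting process' pid prefix (for example "<0.456.0> ") from a "** Generic server ... terminating" line, and each following line that starts with the same prefix is treated as part of the same report; the captured suffix is what lists:member/2 compares against IgnoredCrashes. A shell-style illustration with made-up log lines (pid and server name are invented; the real helper operates on binaries produced by re:split/3):

    %% First line of a crash report: capture the pid prefix.
    {match, [Prefix]} =
        re:run("<0.456.0> ** Generic server msg_store terminating",
               "(<[0-9.]+> )[*]{2} Generic server .+ terminating$",
               [{capture, all_but_first, list}]),
    %% Prefix is now "<0.456.0> ". A continuation line carrying the exit reason:
    {match, [Suffix]} =
        re:run("<0.456.0> ** force_vhost_failure",
               Prefix ++ "( .*|\\*.*|)$",
               [{capture, all_but_first, list}]),
    %% Suffix is "** force_vhost_failure"; because it is listed in
    %% IgnoredCrashes, the report is skipped instead of being counted.
    true = lists:member(Suffix, ["** force_vhost_failure"]).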
@@ -1332,6 +1348,8 @@ delete_vhost(Config, Node, VHost) ->
 delete_vhost(Config, Node, VHost, Username) ->
     catch rpc(Config, Node, rabbit_vhost, delete, [VHost, Username]).
 
+-define(FORCE_VHOST_FAILURE_REASON, force_vhost_failure).
+
 force_vhost_failure(Config, VHost) -> force_vhost_failure(Config, 0, VHost).
 
 force_vhost_failure(Config, Node, VHost) ->
@@ -1345,7 +1363,8 @@ force_vhost_failure(Config, Node, VHost, Attempts) ->
     try
         MessageStorePid = get_message_store_pid(Config, Node, VHost),
         rpc(Config, Node,
-            erlang, exit, [MessageStorePid, force_vhost_failure]),
+            erlang, exit,
+            [MessageStorePid, ?FORCE_VHOST_FAILURE_REASON]),
         %% Give it a time to fail
         timer:sleep(300),
         force_vhost_failure(Config, Node, VHost, Attempts - 1)
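
The macro ties this helper back to the IgnoredCrashes entry added above: killing the message store with reason force_vhost_failure produces a gen_server crash report whose reason line reads "** force_vhost_failure", which is exactly the suffix the log scanner now ignores. An assumed excerpt of such a report (timestamp and severity prefixes omitted, pid invented), based on OTP's standard gen_server termination format:

    <0.789.0> ** Generic server <0.789.0> terminating
    <0.789.0> ** Last message in was ...
    <0.789.0> ** When Server state == ...
    <0.789.0> ** Reason for termination ==
    <0.789.0> ** force_vhost_failure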
