Skip to content

Commit 0f152cb

Browse files
committed
Khepri: clustering
1 parent 2a7d355 commit 0f152cb

13 files changed

+931
-156
lines changed

deps/rabbit/src/rabbit.erl

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -356,19 +356,20 @@ run_prelaunch_second_phase() ->
356356
%% 3. Logging.
357357
ok = rabbit_prelaunch_logging:setup(Context),
358358

359+
%% The clustering step requires Khepri to be started to check for consistency.
360+
ok = rabbit_ra_systems:setup(Context),
361+
362+
%% Khepri requires the "coordination" Ra system to be started by the
363+
%% previous call, but will ensure it runs anyway.
364+
ok = rabbit_khepri:setup(Context),
365+
359366
%% 4. Clustering.
360367
ok = rabbit_prelaunch_cluster:setup(Context),
361368

362369
%% Start Mnesia now that everything is ready.
363370
?LOG_DEBUG("Starting Mnesia"),
364371
ok = mnesia:start(),
365372

366-
ok = rabbit_ra_systems:setup(Context),
367-
368-
%% Khepri requires the "coordination" Ra system to be started by the
369-
%% previous call, but will ensure it runs anyway.
370-
ok = rabbit_khepri:setup(Context),
371-
372373
?LOG_DEBUG(""),
373374
?LOG_DEBUG("== Prelaunch DONE =="),
374375

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1720,6 +1720,9 @@ forget_all_durable(Node) ->
17201720
forget_node_for_queue(_DeadNode, Q)
17211721
when ?amqqueue_is_quorum(Q) ->
17221722
ok;
1723+
forget_node_for_queue(_DeadNode, Q)
1724+
when ?amqqueue_is_stream(Q) ->
1725+
ok;
17231726
forget_node_for_queue(DeadNode, Q) ->
17241727
RS = amqqueue:get_recoverable_slaves(Q),
17251728
forget_node_for_queue(DeadNode, RS, Q).

deps/rabbit/src/rabbit_channel_tracking_handler.erl

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,6 @@ handle_event(#event{type = connection_closed, props = Details}, State) ->
5151
handle_event(#event{type = user_deleted, props = Details}, State) ->
5252
ok = rabbit_channel_tracking:update_tracked({user_deleted, Details}),
5353
{ok, State};
54-
%% A node had been deleted from the cluster.
55-
handle_event(#event{type = node_deleted, props = Details}, State) ->
56-
ok = rabbit_channel_tracking:update_tracked({node_deleted, Details}),
57-
{ok, State};
5854
handle_event(_Event, State) ->
5955
{ok, State}.
6056

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,16 @@ mds_phase1_migration_enable(#{feature_name := FeatureName}) ->
173173
Tables = ?MDS_PHASE1_TABLES,
174174
global:set_lock({FeatureName, self()}),
175175
Ret = case rabbit_khepri:is_ready() of
176-
true -> ok;
177-
false -> mds_migration_enable(FeatureName, Tables)
176+
true ->
177+
ok;
178+
false ->
179+
ClusterNodes = rabbit_mnesia:cluster_nodes(all),
180+
case rabbit_khepri:init_cluster(ClusterNodes) of
181+
ok ->
182+
mds_migration_enable(FeatureName, Tables);
183+
{error, Reason} ->
184+
{error, {migration_failure, Reason}}
185+
end
178186
end,
179187
global:del_lock({FeatureName, self()}),
180188
Ret.

deps/rabbit/src/rabbit_db.erl

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,10 @@
4646
init() ->
4747
rabbit_db:run(
4848
#{mnesia => fun() -> init_in_mnesia() end,
49-
khepri => fun() -> init_in_khepri() end
49+
khepri => fun() ->
50+
init_in_khepri(),
51+
init_in_mnesia()
52+
end
5053
}).
5154

5255
init_in_mnesia() ->
@@ -82,27 +85,53 @@ init_using_mnesia() ->
8285
%% @doc Resets the database and the node.
8386

8487
reset() ->
85-
reset_using_mnesia().
88+
rabbit_log:info("Resetting Rabbit", []),
89+
run(
90+
#{mnesia => fun() -> reset_using_mnesia() end,
91+
khepri => fun() ->
92+
case reset_using_khepri() of
93+
ok -> reset_using_mnesia();
94+
Error -> Error
95+
end
96+
end
97+
}).
8698

8799
reset_using_mnesia() ->
88100
?LOG_DEBUG(
89101
"DB: resetting node",
90102
#{domain => ?RMQLOG_DOMAIN_DB}),
91103
rabbit_mnesia:reset().
92104

105+
reset_using_khepri() ->
106+
?LOG_DEBUG(
107+
"DB: resetting node",
108+
#{domain => ?RMQLOG_DOMAIN_DB}),
109+
rabbit_khepri:reset().
110+
93111
-spec force_reset() -> Ret when
94112
Ret :: ok.
95113
%% @doc Forcefully resets the database and the node.
96114

97115
force_reset() ->
98-
force_reset_using_mnesia().
116+
?LOG_DEBUG(
117+
"DB: resetting node forcefully",
118+
#{domain => ?RMQLOG_DOMAIN_DB}),
119+
run(
120+
#{mnesia => fun() -> force_reset_using_mnesia() end,
121+
khepri => fun() ->
122+
case force_reset_using_khepri() of
123+
ok -> force_reset_using_mnesia();
124+
Error -> Error
125+
end
126+
end
127+
}).
99128

100129
force_reset_using_mnesia() ->
101-
?LOG_DEBUG(
102-
"DB: resetting node forcefully",
103-
#{domain => ?RMQLOG_DOMAIN_DB}),
104130
rabbit_mnesia:force_reset().
105131

132+
force_reset_using_khepri() ->
133+
rabbit_khepri:force_reset().
134+
106135
-spec force_load_on_next_boot() -> Ret when
107136
Ret :: ok.
108137
%% @doc Requests that the database be forcefully loaded during the next boot.
@@ -111,7 +140,16 @@ force_reset_using_mnesia() ->
111140
%% state, like if critical members are MIA.
112141

113142
force_load_on_next_boot() ->
114-
force_load_on_next_boot_using_mnesia().
143+
run(
144+
#{mnesia => fun() -> force_load_on_next_boot_using_mnesia() end,
145+
khepri => fun() ->
146+
%% TODO force load using Khepri might need to be implemented
147+
%% for disaster recovery scenarios where just a minority of
148+
%% nodes are accessible. Potentially, it could also be replaced
149+
%% with a way to export all the data.
150+
force_load_on_next_boot_using_mnesia()
151+
end
152+
}).
115153

116154
force_load_on_next_boot_using_mnesia() ->
117155
?LOG_DEBUG(

deps/rabbit/src/rabbit_db_cluster.erl

Lines changed: 92 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,23 +47,68 @@ join(RemoteNode, NodeType)
4747
when is_atom(RemoteNode) andalso ?IS_NODE_TYPE(NodeType) ->
4848
?LOG_DEBUG(
4949
"DB: joining cluster using remote node `~ts`", [RemoteNode],
50-
#{domain => ?RMQLOG_DOMAIN_DB}),
51-
join_using_mnesia(RemoteNode, NodeType).
50+
#{domain => ?RMQLOG_DOMAIN_DB}),
51+
case rabbit_db:run(
52+
#{mnesia => fun() -> join_using_mnesia(RemoteNode, NodeType) end,
53+
khepri => fun() -> join_using_khepri(RemoteNode, NodeType) end
54+
}) of
55+
ok ->
56+
rabbit_node_monitor:notify_joined_cluster(),
57+
ok;
58+
Other ->
59+
Other
60+
end.
5261

5362
join_using_mnesia(RemoteNode, NodeType) ->
5463
rabbit_mnesia:join_cluster(RemoteNode, NodeType).
5564

65+
join_using_khepri(_RemoteNode, ram) ->
66+
rabbit_log:warning("Join node with --ram flag is not supported by Khepri. Skipping..."),
67+
{error, not_supported};
68+
join_using_khepri(RemoteNode, NodeType) ->
69+
case rabbit_khepri:check_join_cluster(RemoteNode) of
70+
ok ->
71+
case join_using_mnesia(RemoteNode, NodeType) of
72+
ok ->
73+
rabbit_khepri:join_cluster(RemoteNode);
74+
{ok, already_member} ->
75+
rabbit_khepri:join_cluster(RemoteNode);
76+
Error ->
77+
Error
78+
end;
79+
{ok, already_member} ->
80+
join_using_mnesia(RemoteNode, NodeType);
81+
Error ->
82+
Error
83+
end.
84+
5685
-spec forget_member(Node, RemoveWhenOffline) -> ok when
5786
Node :: node(),
5887
RemoveWhenOffline :: boolean().
5988
%% @doc Removes `Node' from the cluster.
6089

6190
forget_member(Node, RemoveWhenOffline) ->
62-
forget_member_using_mnesia(Node, RemoveWhenOffline).
91+
rabbit_db:run(
92+
#{mnesia => fun() -> forget_member_using_mnesia(Node, RemoveWhenOffline) end,
93+
khepri => fun() ->
94+
case forget_member_using_khepri(Node, RemoveWhenOffline) of
95+
ok ->
96+
forget_member_using_mnesia(Node, RemoveWhenOffline);
97+
Error ->
98+
Error
99+
end
100+
end
101+
}).
63102

64103
forget_member_using_mnesia(Node, RemoveWhenOffline) ->
65104
rabbit_mnesia:forget_cluster_node(Node, RemoveWhenOffline).
66105

106+
forget_member_using_khepri(_Node, true) ->
107+
rabbit_log:warning("Remove node with --offline flag is not supported by Khepri. Skipping..."),
108+
{error, not_supported};
109+
forget_member_using_khepri(Node, false = _RemoveWhenOffline) ->
110+
rabbit_khepri:leave_cluster(Node).
111+
67112
%% -------------------------------------------------------------------
68113
%% Cluster update.
69114
%% -------------------------------------------------------------------
@@ -75,11 +120,18 @@ forget_member_using_mnesia(Node, RemoveWhenOffline) ->
75120
%% Node types may not all be valid with all databases.
76121

77122
change_node_type(NodeType) ->
78-
change_node_type_using_mnesia(NodeType).
123+
rabbit_db:run(
124+
#{mnesia => fun() -> change_node_type_using_mnesia(NodeType) end,
125+
khepri => fun() -> change_node_type_using_khepri(NodeType) end
126+
}).
79127

80128
change_node_type_using_mnesia(NodeType) ->
81129
rabbit_mnesia:change_cluster_node_type(NodeType).
82130

131+
change_node_type_using_khepri(_NodeType) ->
132+
rabbit_log:warning("Change cluster node type is not supported by Khepri. Only disc nodes are allowed. Skipping..."),
133+
{error, not_supported}.
134+
83135
%% -------------------------------------------------------------------
84136
%% Cluster status.
85137
%% -------------------------------------------------------------------
@@ -89,17 +141,26 @@ change_node_type_using_mnesia(NodeType) ->
89141
%% @doc Indicates if this node is clustered with other nodes or not.
90142

91143
is_clustered() ->
92-
is_clustered_using_mnesia().
144+
rabbit_db:run(
145+
#{mnesia => fun() -> is_clustered_using_mnesia() end,
146+
khepri => fun() -> is_clustered_using_khepri() end
147+
}).
93148

94149
is_clustered_using_mnesia() ->
95150
rabbit_mnesia:is_clustered().
96151

152+
is_clustered_using_khepri() ->
153+
rabbit_khepri:is_clustered().
154+
97155
-spec members() -> Members when
98156
Members :: [node()].
99157
%% @doc Returns the list of cluster members.
100158

101159
members() ->
102-
members_using_mnesia().
160+
rabbit_db:run(
161+
#{mnesia => fun() -> members_using_mnesia() end,
162+
khepri => fun() -> members_using_khepri() end
163+
}).
103164

104165
members_using_mnesia() ->
105166
case rabbit_mnesia:is_running() andalso rabbit_table:is_present() of
@@ -122,12 +183,18 @@ members_using_mnesia() ->
122183
end
123184
end.
124185

186+
members_using_khepri() ->
187+
rabbit_khepri:nodes().
188+
125189
-spec disc_members() -> Members when
126190
Members :: [node()].
127191
%% @private
128192

129193
disc_members() ->
130-
disc_members_using_mnesia().
194+
rabbit_db:run(
195+
#{mnesia => fun() -> disc_members_using_mnesia() end,
196+
khepri => fun() -> members_using_khepri() end
197+
}).
131198

132199
disc_members_using_mnesia() ->
133200
rabbit_mnesia:cluster_nodes(disc).
@@ -139,20 +206,36 @@ disc_members_using_mnesia() ->
139206
%% Node types may not all be relevant with all databases.
140207

141208
node_type() ->
142-
node_type_using_mnesia().
209+
rabbit_db:run(
210+
#{mnesia => fun() -> node_type_using_mnesia() end,
211+
khepri => fun() -> node_type_using_khepri() end
212+
}).
143213

144214
node_type_using_mnesia() ->
145215
rabbit_mnesia:node_type().
146216

217+
node_type_using_khepri() ->
218+
disc.
219+
147220
-spec check_consistency() -> ok.
148221
%% @doc Ensures the cluster is consistent.
149222

150223
check_consistency() ->
151-
check_consistency_using_mnesia().
224+
rabbit_db:run(
225+
#{mnesia => fun() -> check_consistency_using_mnesia() end,
226+
khepri => fun() -> case check_consistency_using_khepri() of
227+
ok -> check_consistency_using_mnesia();
228+
Error -> Error
229+
end
230+
end
231+
}).
152232

153233
check_consistency_using_mnesia() ->
154234
rabbit_mnesia:check_cluster_consistency().
155235

236+
check_consistency_using_khepri() ->
237+
rabbit_khepri:check_cluster_consistency().
238+
156239
-spec cli_cluster_status() -> ClusterStatus when
157240
ClusterStatus :: [{nodes, [{rabbit_db_cluster:node_type(), [node()]}]} |
158241
{running_nodes, [node()]} |

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -358,14 +358,17 @@ delete_in_mnesia(QueueName, Reason) ->
358358
end).
359359

360360
delete_in_khepri(QueueName) ->
361+
delete_in_khepri(QueueName, false).
362+
363+
delete_in_khepri(QueueName, OnlyDurable) ->
361364
rabbit_khepri:transaction(
362365
fun () ->
363366
Path = khepri_queue_path(QueueName),
364367
case khepri_tx_adv:delete(Path) of
365368
{ok, #{data := _}} ->
366369
%% we want to execute some things, as decided by rabbit_exchange,
367370
%% after the transaction.
368-
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, false);
371+
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
369372
{ok, _} ->
370373
ok
371374
end
@@ -387,7 +390,7 @@ internal_delete(QueueName, OnlyDurable, Reason) ->
387390
%% HA queues are removed it can be removed.
388391
rabbit_db:run(
389392
#{mnesia => fun() -> internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) end,
390-
khepri => fun() -> ok end
393+
khepri => fun() -> delete_in_khepri(QueueName, OnlyDurable) end
391394
}).
392395

393396
internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) ->
@@ -970,7 +973,7 @@ foreach_transient_in_mnesia(UpdateFun) ->
970973
foreach_durable(UpdateFun, FilterFun) ->
971974
rabbit_db:run(
972975
#{mnesia => fun() -> foreach_durable_in_mnesia(UpdateFun, FilterFun) end,
973-
khepri => fun() -> ok end
976+
khepri => fun() -> foreach_durable_in_khepri(UpdateFun, FilterFun) end
974977
}).
975978

976979
foreach_durable_in_mnesia(UpdateFun, FilterFun) ->
@@ -984,7 +987,15 @@ foreach_durable_in_mnesia(UpdateFun, FilterFun) ->
984987
_ = [UpdateFun(Q) || Q <- Qs, FilterFun(Q)],
985988
ok
986989
end),
987-
ok.
990+
ok.
991+
992+
foreach_durable_in_khepri(UpdateFun, FilterFun) ->
993+
Path = khepri_queues_path() ++ [rabbit_khepri:if_has_data_wildcard()],
994+
{ok, Qs} = rabbit_khepri:filter(Path, fun(_, #{data := Q}) ->
995+
FilterFun(Q)
996+
end),
997+
_ = [UpdateFun(Q) || Q <- maps:values(Qs)],
998+
ok.
988999

9891000
%% -------------------------------------------------------------------
9901001
%% set_dirty().

0 commit comments

Comments
 (0)