@@ -185,13 +185,7 @@ mds_phase1_migration_enable(#{feature_name := FeatureName}) ->
185
185
true ->
186
186
ok ;
187
187
false ->
188
- ClusterNodes = rabbit_mnesia :cluster_nodes (all ),
189
- case rabbit_khepri :init_cluster (ClusterNodes ) of
190
- ok ->
191
- mds_migration_enable (FeatureName , Tables );
192
- {error , Reason } ->
193
- {error , {migration_failure , Reason }}
194
- end
188
+ mds_migration_enable (FeatureName , Tables )
195
189
end ,
196
190
global :del_lock ({FeatureName , self ()}),
197
191
Ret .
@@ -208,134 +202,27 @@ mds_phase1_migration_post_enable(#{feature_name := FeatureName}) ->
208
202
rabbit_db :set_migration_flag (FeatureName ).
209
203
210
204
mds_migration_enable (FeatureName , TablesAndOwners ) ->
211
- ok = ensure_khepri_cluster_matches_mnesia (FeatureName ),
212
- migrate_tables_to_khepri (FeatureName , TablesAndOwners ).
205
+ case ensure_khepri_cluster_matches_mnesia (FeatureName ) of
206
+ ok ->
207
+ migrate_tables_to_khepri (FeatureName , TablesAndOwners );
208
+ Error ->
209
+ {error , {migration_failure , Error }}
210
+ end .
213
211
214
212
mds_migration_post_enable (FeatureName , TablesAndOwners ) ->
215
213
? assert (rabbit_khepri :is_enabled (non_blocking )),
216
214
{Tables , _ } = lists :unzip (TablesAndOwners ),
217
215
empty_unused_mnesia_tables (FeatureName , Tables ).
218
216
219
217
ensure_khepri_cluster_matches_mnesia (FeatureName ) ->
220
- % % The ff controller has already ensure that all Mnesia nodes are running.
221
- ? LOG_DEBUG (
222
- " Feature flag `~s `: ensure Khepri Ra system is running" ,
223
- [FeatureName ]),
224
- ok = rabbit_khepri :setup (),
225
- AllMnesiaNodes = lists :sort (rabbit_mnesia :cluster_nodes (all )),
226
218
% % This is the first time Khepri will be used for real. Therefore
227
219
% % we need to make sure the Khepri cluster matches the Mnesia
228
220
% % cluster.
229
221
? LOG_DEBUG (
230
222
" Feature flag `~s `: updating the Khepri cluster to match "
231
223
" the Mnesia cluster" ,
232
224
[FeatureName ]),
233
- expand_khepri_cluster (FeatureName , AllMnesiaNodes ).
234
-
235
- expand_khepri_cluster (FeatureName , AllMnesiaNodes ) ->
236
- % % All Mnesia nodes are running (this is a requirement to enable this
237
- % % feature flag). We use this unique list of nodes to find the largest
238
- % % Khepri clusters among all of them.
239
- % %
240
- % % The idea is that at the beginning, each Mnesia node will also be an
241
- % % unclustered Khepri node. Therefore, the first node in the sorted list
242
- % % of Mnesia nodes will be picked (a "cluster" with 1 member, but the
243
- % % "largest" at the beginning).
244
- % %
245
- % % After the first nodes join that single node, its cluster will grow and
246
- % % will continue to be the largest.
247
- % %
248
- % % This function is executed on the node enabling the feature flag. It will
249
- % % take care of adding all nodes in the Mnesia cluster to a Khepri cluster
250
- % % (except those which are already part of it).
251
- % %
252
- % % This should avoid the situation where a large established cluster is
253
- % % reset and joins a single new/empty node.
254
- % %
255
- % % Also, we only consider Khepri clusters which are in use (i.e. the
256
- % % feature flag is enabled). Here is an example:
257
- % % - Node2 is the only node in the Mnesia cluster at the time the
258
- % % feature flag is enabled. It joins no other node and runs its own
259
- % % one-node Khepri cluster.
260
- % % - Node1 joins the Mnesia cluster which is now Node1 + Node2. Given
261
- % % the sorting, Khepri clusters will be [[Node1], [Node2]] when
262
- % % sorted by name and size. With this order, Node1 should "join"
263
- % % itself. But the feature is not enabled yet on this node,
264
- % % therefore, we skip this cluster to consider the following one,
265
- % % [Node2].
266
- KhepriCluster = find_largest_khepri_cluster (FeatureName ),
267
- NodesToAdd = AllMnesiaNodes -- KhepriCluster ,
268
- ? LOG_DEBUG (
269
- " Feature flags `~s `: selected Khepri cluster: ~p " ,
270
- [FeatureName , KhepriCluster ]),
271
- ? LOG_DEBUG (
272
- " Feature flags `~s `: Mnesia nodes to add to the Khepri cluster "
273
- " above: ~p " ,
274
- [FeatureName , NodesToAdd ]),
275
- add_nodes_to_khepri_cluster (FeatureName , KhepriCluster , NodesToAdd ).
276
-
277
- add_nodes_to_khepri_cluster (FeatureName , KhepriCluster , [Node | Rest ]) ->
278
- add_node_to_khepri_cluster (FeatureName , KhepriCluster , Node ),
279
- add_nodes_to_khepri_cluster (FeatureName , KhepriCluster , Rest );
280
- add_nodes_to_khepri_cluster (_FeatureName , _KhepriCluster , []) ->
281
- ok .
282
-
283
- add_node_to_khepri_cluster (FeatureName , KhepriCluster , Node ) ->
284
- ? assertNotEqual ([], KhepriCluster ),
285
- case lists :member (Node , KhepriCluster ) of
286
- true ->
287
- ? LOG_DEBUG (
288
- " Feature flag `~s `: node ~p is already a member of "
289
- " the largest cluster: ~p " ,
290
- [FeatureName , Node , KhepriCluster ]),
291
- ok ;
292
- false ->
293
- ? LOG_DEBUG (
294
- " Feature flag `~s `: adding node ~p to the largest "
295
- " Khepri cluster found among Mnesia nodes: ~p " ,
296
- [FeatureName , Node , KhepriCluster ]),
297
- case rabbit_khepri :add_member (Node , KhepriCluster ) of
298
- ok -> ok ;
299
- {ok , already_member } -> ok
300
- end
301
- end .
302
-
303
- find_largest_khepri_cluster (FeatureName ) ->
304
- case list_all_khepri_clusters (FeatureName ) of
305
- [] ->
306
- [node ()];
307
- KhepriClusters ->
308
- KhepriClustersBySize = sort_khepri_clusters_by_size (
309
- KhepriClusters ),
310
- ? LOG_DEBUG (
311
- " Feature flag `~s `: existing Khepri clusters (sorted by "
312
- " size): ~p " ,
313
- [FeatureName , KhepriClustersBySize ]),
314
- LargestKhepriCluster = hd (KhepriClustersBySize ),
315
- LargestKhepriCluster
316
- end .
317
-
318
- list_all_khepri_clusters (FeatureName ) ->
319
- MnesiaNodes = lists :sort (rabbit_mnesia :cluster_nodes (all )),
320
- ? LOG_DEBUG (
321
- " Feature flag `~s `: querying the following Mnesia nodes to learn "
322
- " their Khepri cluster membership: ~p " ,
323
- [FeatureName , MnesiaNodes ]),
324
- KhepriClusters = lists :foldl (
325
- fun (MnesiaNode , Acc ) ->
326
- case khepri_cluster_on_node (MnesiaNode ) of
327
- [] -> Acc ;
328
- Cluster -> Acc #{Cluster => true }
329
- end
330
- end , #{}, MnesiaNodes ),
331
- lists :sort (maps :keys (KhepriClusters )).
332
-
333
- sort_khepri_clusters_by_size (KhepriCluster ) ->
334
- lists :sort (
335
- fun ([A ], B ) when A == node () ->
336
- 1 > length (B );
337
- (A , B ) -> length (A ) >= length (B ) end ,
338
- KhepriCluster ).
225
+ rabbit_khepri :init_cluster ().
339
226
340
227
khepri_cluster_on_node (Node ) ->
341
228
lists :sort (
0 commit comments