Skip to content

Commit 78c64c9

Browse files
committed
Khepri data migration using khepri_mnesia_migration
1 parent 0998b15 commit 78c64c9

26 files changed

+1554
-844
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
-record(mirrored_sup_childspec, {key, mirroring_pid, childspec}).

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 58 additions & 214 deletions
Large diffs are not rendered by default.

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 10 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@
3535
match_source_and_destination_in_khepri_tx/2
3636
]).
3737

38-
-export([mnesia_write_to_khepri/2,
39-
mnesia_delete_to_khepri/2,
40-
clear_data_in_khepri/1]).
38+
-export([
39+
khepri_route_path/1,
40+
khepri_routes_path/0,
41+
khepri_route_exchange_path/1
42+
]).
4143

4244
%% Recovery is only needed for transient entities. Once mnesia is removed, these
4345
%% functions can be deleted
@@ -901,91 +903,6 @@ match_source_and_destination_in_khepri_tx(#resource{virtual_host = VHost, name =
901903
_ -> []
902904
end.
903905

904-
%% --------------------------------------------------------------
905-
%% Migration
906-
%% --------------------------------------------------------------
907-
908-
-spec mnesia_write_to_khepri(Table, [Route]) -> ok when
909-
Table :: atom(),
910-
Route :: #route{}.
911-
mnesia_write_to_khepri(rabbit_route, Routes)->
912-
rabbit_khepri:transaction(
913-
fun() ->
914-
_ = lists:foldl(
915-
fun(Route, Xs0) ->
916-
#route{binding = #binding{source = XName} = Binding} = Route,
917-
Path = khepri_route_path(Binding),
918-
Xs =
919-
case sets:is_element(XName, Xs0) of
920-
true ->
921-
Xs0;
922-
false ->
923-
%% If the binding's source is a new exchange,
924-
%% store the exchange's type in the exchange
925-
%% name branch of the tree.
926-
XPath = khepri_route_exchange_path(XName),
927-
[#exchange{type = XType}] =
928-
rabbit_db_exchange:get_in_khepri_tx(XName),
929-
ok = khepri_tx:put(XPath, #{type => XType}),
930-
sets:add_element(XName, Xs0)
931-
end,
932-
add_binding_tx(Path, Binding),
933-
Xs
934-
end, sets:new([{version, 2}]), Routes),
935-
ok
936-
end, rw);
937-
mnesia_write_to_khepri(rabbit_durable_route, _)->
938-
ok;
939-
mnesia_write_to_khepri(rabbit_semi_durable_route, _)->
940-
ok;
941-
mnesia_write_to_khepri(rabbit_reverse_route, _) ->
942-
ok.
943-
944-
add_binding_tx(Path, Binding) ->
945-
Set = case khepri_tx:get(Path) of
946-
{ok, Set0} ->
947-
Set0;
948-
_ ->
949-
sets:new()
950-
end,
951-
ok = khepri_tx:put(Path, sets:add_element(Binding, Set)).
952-
953-
-spec mnesia_delete_to_khepri(Table, ToDelete) -> ok when
954-
Table :: atom(),
955-
ToDelete :: #route{} | rabbit_types:binding().
956-
957-
mnesia_delete_to_khepri(rabbit_route, Route) when is_record(Route, route) ->
958-
khepri_delete(khepri_route_path(Route#route.binding));
959-
mnesia_delete_to_khepri(rabbit_route, Name) ->
960-
khepri_delete(khepri_route_path(Name));
961-
mnesia_delete_to_khepri(rabbit_durable_route, Route) when is_record(Route, route) ->
962-
khepri_delete(khepri_route_path(Route#route.binding));
963-
mnesia_delete_to_khepri(rabbit_durable_route, Name) ->
964-
khepri_delete(khepri_route_path(Name));
965-
mnesia_delete_to_khepri(rabbit_semi_durable_route, Route) when is_record(Route, route) ->
966-
khepri_delete(khepri_route_path(Route#route.binding));
967-
mnesia_delete_to_khepri(rabbit_semi_durable_route, Name) ->
968-
khepri_delete(khepri_route_path(Name)).
969-
970-
-spec clear_data_in_khepri(Table) -> ok when
971-
Table :: atom().
972-
973-
clear_data_in_khepri(rabbit_route) ->
974-
khepri_delete(khepri_routes_path());
975-
%% There is a single khepri entry for routes and it should be already deleted
976-
clear_data_in_khepri(rabbit_durable_route) ->
977-
ok;
978-
clear_data_in_khepri(rabbit_semi_durable_route) ->
979-
ok;
980-
clear_data_in_khepri(rabbit_reverse_route) ->
981-
ok.
982-
983-
khepri_delete(Path) ->
984-
case rabbit_khepri:delete(Path) of
985-
ok -> ok;
986-
Error -> throw(Error)
987-
end.
988-
989906
%% -------------------------------------------------------------------
990907
%% clear().
991908
%% -------------------------------------------------------------------
@@ -1009,8 +926,11 @@ clear_in_mnesia() ->
1009926
ok.
1010927

1011928
clear_in_khepri() ->
1012-
khepri_delete(khepri_routes_path()),
1013-
ok.
929+
Path = rabbit_db_binding:khepri_routes_path(),
930+
case rabbit_khepri:delete(Path) of
931+
ok -> ok;
932+
Error -> throw(Error)
933+
end.
1014934

1015935
%% --------------------------------------------------------------
1016936
%% Paths
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright © 2022-2023 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(rabbit_db_binding_m2k_converter).
9+
10+
-behaviour(mnesia_to_khepri_converter).
11+
12+
-include_lib("kernel/include/logger.hrl").
13+
-include_lib("khepri/include/khepri.hrl").
14+
-include_lib("khepri_mnesia_migration/src/kmm_logging.hrl").
15+
-include_lib("rabbit_common/include/rabbit.hrl").
16+
17+
-export([init_copy_to_khepri/2,
18+
copy_to_khepri/3,
19+
delete_from_khepri/3,
20+
clear_data_in_khepri/1]).
21+
22+
-record(?MODULE, {store_id :: khepri:store_id(),
23+
exchanges :: sets:set()}).
24+
25+
-spec init_copy_to_khepri(Tables, StoreId) -> Ret when
26+
Tables :: [mnesia_to_khepri:mnesia_table()],
27+
StoreId :: khepri:store_id(),
28+
Ret :: {ok, Priv},
29+
Priv :: #?MODULE{}.
30+
%% @private
31+
32+
init_copy_to_khepri(_Tables, StoreId) ->
33+
State = #?MODULE{store_id = StoreId,
34+
exchanges = sets:new([{version, 2}])},
35+
{ok, State}.
36+
37+
-spec copy_to_khepri(Table, Record, Priv) -> Ret when
38+
Table :: mnesia_to_khepri:mnesia_table(),
39+
Record :: tuple(),
40+
Priv :: #?MODULE{},
41+
Ret :: {ok, NewPriv} | {error, Reason},
42+
NewPriv :: #?MODULE{},
43+
Reason :: any().
44+
%% @private
45+
46+
copy_to_khepri(rabbit_route = Table,
47+
#route{binding = #binding{source = XName} = Binding},
48+
#?MODULE{exchanges = Xs0} = State) ->
49+
?LOG_DEBUG(
50+
"Mnesia->Khepri data copy: [~0p] key: ~0p",
51+
[Table, Binding],
52+
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
53+
Path = rabbit_db_binding:khepri_route_path(Binding),
54+
?LOG_DEBUG(
55+
"Mnesia->Khepri data copy: [~0p] path: ~0p",
56+
[Table, Path],
57+
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
58+
rabbit_khepri:transaction(
59+
fun() ->
60+
Xs = case sets:is_element(XName, Xs0) of
61+
true ->
62+
Xs0;
63+
false ->
64+
%% If the binding's source is a new exchange,
65+
%% store the exchange's type in the exchange
66+
%% name branch of the tree.
67+
XPath = rabbit_db_binding:khepri_route_exchange_path(XName),
68+
[#exchange{type = XType}] =
69+
rabbit_db_exchange:get_in_khepri_tx(XName),
70+
ok = khepri_tx:put(XPath, #{type => XType}),
71+
sets:add_element(XName, Xs0)
72+
end,
73+
Set = case khepri_tx:get(Path) of
74+
{ok, Set0} ->
75+
Set0;
76+
_ ->
77+
sets:new()
78+
end,
79+
case khepri_tx:put(Path, sets:add_element(Binding, Set)) of
80+
ok -> {ok, State#?MODULE{exchanges = Xs}};
81+
Error -> Error
82+
end
83+
end, rw);
84+
copy_to_khepri(Table, Record, State) ->
85+
?LOG_DEBUG("Mnesia->Khepri unexpected record table ~0p record ~0p state ~0p",
86+
[Table, Record, State]),
87+
{error, unexpected_record}.
88+
89+
-spec delete_from_khepri(Table, Key, Priv) -> Ret when
90+
Table :: mnesia_to_khepri:mnesia_table(),
91+
Key :: any(),
92+
Priv :: #?MODULE{},
93+
Ret :: {ok, NewPriv} | {error, Reason},
94+
NewPriv :: #?MODULE{},
95+
Reason :: any().
96+
%% @private
97+
98+
delete_from_khepri(rabbit_route = Table, Key,
99+
#?MODULE{store_id = StoreId} = State) ->
100+
?LOG_DEBUG(
101+
"Mnesia->Khepri data delete: [~0p] key: ~0p",
102+
[Table, Key],
103+
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
104+
Path = rabbit_db_binding:khepri_route_path(Key),
105+
?LOG_DEBUG(
106+
"Mnesia->Khepri data delete: [~0p] path: ~0p",
107+
[Table, Path],
108+
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
109+
case khepri:delete(StoreId, Path) of
110+
ok -> {ok, State};
111+
Error -> Error
112+
end.
113+
114+
-spec clear_data_in_khepri(Table) -> ok when
115+
Table :: atom().
116+
117+
clear_data_in_khepri(rabbit_route) ->
118+
Path = rabbit_db_binding:khepri_routes_path(),
119+
case rabbit_khepri:delete(Path) of
120+
ok -> ok;
121+
Error -> throw(Error)
122+
end.

deps/rabbit/src/rabbit_db_exchange.erl

Lines changed: 13 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,16 @@
4444
path/1
4545
]).
4646

47-
-export([mnesia_write_to_khepri/2,
48-
mnesia_delete_to_khepri/2,
49-
clear_data_in_khepri/1]).
50-
5147
%% For testing
5248
-export([clear/0]).
5349

50+
-export([
51+
khepri_exchange_path/1,
52+
khepri_exchange_serial_path/1,
53+
khepri_exchanges_path/0,
54+
khepri_exchange_serials_path/0
55+
]).
56+
5457
-define(MNESIA_TABLE, rabbit_exchange).
5558
-define(MNESIA_DURABLE_TABLE, rabbit_durable_exchange).
5659
-define(MNESIA_SERIAL_TABLE, rabbit_exchange_serial).
@@ -772,87 +775,6 @@ exists_in_mnesia(Name) ->
772775
exists_in_khepri(Name) ->
773776
rabbit_khepri:exists(khepri_exchange_path(Name)).
774777

775-
%% --------------------------------------------------------------
776-
%% Migration
777-
%% --------------------------------------------------------------
778-
779-
-spec mnesia_write_to_khepri(Table, [Queue]) -> ok when
780-
Table :: atom(),
781-
Queue :: amqqueue:amqqueue().
782-
%% Mnesia contains two tables if an exchange has been recovered:
783-
%% rabbit_exchange (ram) and rabbit_durable_exchange (disc).
784-
%% As all data in Khepri is persistent, there is no point on
785-
%% having ram and data entries.
786-
%% How do we then transform data from mnesia to khepri when
787-
%% the feature flag is enabled?
788-
%% Let's create the Khepri entry from the ram table.
789-
790-
mnesia_write_to_khepri(rabbit_exchange, Exchanges) ->
791-
rabbit_khepri:transaction(
792-
fun() ->
793-
[khepri_create_tx(khepri_exchange_path(Exchange#exchange.name), Exchange)
794-
|| Exchange <- Exchanges]
795-
end, rw);
796-
mnesia_write_to_khepri(rabbit_durable_exchange, _Exchange0) ->
797-
ok;
798-
mnesia_write_to_khepri(rabbit_exchange_serial, Exchanges) ->
799-
rabbit_khepri:transaction(
800-
fun() ->
801-
[begin
802-
#exchange_serial{name = Resource, next = Serial} = Exchange,
803-
Path = khepri_path:combine_with_conditions(
804-
khepri_exchange_serial_path(Resource),
805-
[#if_node_exists{exists = false}]),
806-
case khepri_tx:put(Path, Serial) of
807-
ok -> ok;
808-
Error -> throw(Error)
809-
end
810-
end || Exchange <- Exchanges]
811-
end, rw).
812-
813-
khepri_create_tx(Path, Value) ->
814-
case khepri_tx:create(Path, Value) of
815-
ok -> ok;
816-
{error, {khepri, mismatching_node, _}} -> ok;
817-
Error -> throw(Error)
818-
end.
819-
820-
-spec mnesia_delete_to_khepri(Table, ToDelete) -> ok when
821-
Table :: atom(),
822-
ToDelete :: amqqueue:amqqueue() | rabbit_amqqueue:name().
823-
824-
mnesia_delete_to_khepri(rabbit_exchange, Exchange) when is_record(Exchange, exchange) ->
825-
khepri_delete(khepri_exchange_path(Exchange#exchange.name));
826-
mnesia_delete_to_khepri(rabbit_exchange, Name) ->
827-
khepri_delete(khepri_exchange_path(Name));
828-
mnesia_delete_to_khepri(rabbit_durable_exchange, Exchange)
829-
when is_record(Exchange, exchange) ->
830-
khepri_delete(khepri_exchange_path(Exchange#exchange.name));
831-
mnesia_delete_to_khepri(rabbit_durable_exchange, Name) ->
832-
khepri_delete(khepri_exchange_path(Name));
833-
mnesia_delete_to_khepri(rabbit_exchange_serial, ExchangeSerial)
834-
when is_record(ExchangeSerial, exchange_serial) ->
835-
khepri_delete(khepri_exchange_serial_path(ExchangeSerial#exchange_serial.name));
836-
mnesia_delete_to_khepri(rabbit_exchange_serial, Name) ->
837-
khepri_delete(khepri_exchange_serial_path(Name)).
838-
839-
-spec clear_data_in_khepri(Table) -> ok when
840-
Table :: atom().
841-
842-
clear_data_in_khepri(rabbit_exchange) ->
843-
khepri_delete(khepri_exchanges_path());
844-
%% There is a single khepri entry for exchanges and it should be already deleted
845-
clear_data_in_khepri(rabbit_durable_exchange) ->
846-
ok;
847-
clear_data_in_khepri(rabbit_exchange_serial) ->
848-
khepri_delete(khepri_exchange_serials_path()).
849-
850-
khepri_delete(Path) ->
851-
case rabbit_khepri:delete(Path) of
852-
ok -> ok;
853-
Error -> throw(Error)
854-
end.
855-
856778
%% -------------------------------------------------------------------
857779
%% clear().
858780
%% -------------------------------------------------------------------
@@ -878,6 +800,12 @@ clear_in_khepri() ->
878800
khepri_delete(khepri_exchanges_path()),
879801
khepri_delete(khepri_exchange_serials_path()).
880802

803+
khepri_delete(Path) ->
804+
case rabbit_khepri:delete(Path) of
805+
ok -> ok;
806+
Error -> throw(Error)
807+
end.
808+
881809
%% -------------------------------------------------------------------
882810
%% maybe_auto_delete_in_mnesia().
883811
%% -------------------------------------------------------------------

0 commit comments

Comments
 (0)