Skip to content

Commit 8c35c8e

Browse files
committed
Khepri: handle pairs Table-Key received during metadata store migration
These are generated by Mnesia when there are writes in-flight, but do not contain enough info to copy the record. An event with Table-Record is generated at the same time, which is properly processed after the initial copy of data.
1 parent 7e95c16 commit 8c35c8e

10 files changed

+23
-15
lines changed

deps/rabbit/app.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,7 @@ def all_srcs(name = "all_srcs"):
539539
"include/amqqueue.hrl",
540540
"include/amqqueue_v2.hrl",
541541
"include/gm_specs.hrl",
542+
"include/internal_user.hrl",
542543
"include/mc.hrl",
543544
"include/rabbit_global_counters.hrl",
544545
"include/vhost.hrl",

deps/rabbit/include/internal_user.hrl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-define(is_internal_user(U),
2+
(?is_internal_user_v2(U))).
3+
4+
-define(is_internal_user_v2(U), is_record(U, internal_user, 6)).

deps/rabbit/src/rabbit_db_binding_m2k_converter.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ copy_to_khepri(rabbit_route = Table,
8787
copy_to_khepri(Table, Record, State) ->
8888
?LOG_DEBUG("Mnesia->Khepri unexpected record table ~0p record ~0p state ~0p",
8989
[Table, Record, State]),
90-
{error, unexpected_record}.
90+
{ok, State}.
9191

9292
-spec delete_from_khepri(Table, Key, Priv) -> Ret when
9393
Table :: mnesia_to_khepri:mnesia_table(),

deps/rabbit/src/rabbit_db_exchange_m2k_converter.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ init_copy_to_khepri(StoreId, _MigrationId, Tables) ->
4545
%% @private
4646

4747
copy_to_khepri(
48-
rabbit_exchange = Table, Record,
48+
rabbit_exchange = Table, #exchange{} = Record,
4949
#?MODULE{store_id = StoreId} = State) ->
5050
Name = Record#exchange.name,
5151
?LOG_DEBUG(
@@ -82,7 +82,7 @@ copy_to_khepri(rabbit_exchange_serial = Table,
8282
copy_to_khepri(Table, Record, State) ->
8383
?LOG_DEBUG("Mnesia->Khepri unexpected record table ~0p record ~0p state ~0p",
8484
[Table, Record, State]),
85-
{error, unexpected_record}.
85+
{ok, State}.
8686

8787
-spec delete_from_khepri(Table, Key, Priv) -> Ret when
8888
Table :: mnesia_to_khepri:mnesia_table(),

deps/rabbit/src/rabbit_db_maintenance_m2k_converter.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ init_copy_to_khepri(StoreId, _MigrationId, Tables) ->
4545
%% @private
4646

4747
copy_to_khepri(
48-
rabbit_node_maintenance_states = Table, Record,
48+
rabbit_node_maintenance_states = Table, #node_maintenance_state{} = Record,
4949
#?MODULE{store_id = StoreId} = State) ->
5050
Name = Record#node_maintenance_state.node,
5151
?LOG_DEBUG(
@@ -64,7 +64,7 @@ copy_to_khepri(
6464
copy_to_khepri(Table, Record, State) ->
6565
?LOG_DEBUG("Mnesia->Khepri unexpected record table ~0p record ~0p state ~0p",
6666
[Table, Record, State]),
67-
{error, unexpected_record}.
67+
{ok, State}.
6868

6969
-spec delete_from_khepri(Table, Key, Priv) -> Ret when
7070
Table :: mnesia_to_khepri:mnesia_table(),

deps/rabbit/src/rabbit_db_msup_m2k_converter.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ copy_to_khepri(mirrored_sup_childspec = Table,
6464
copy_to_khepri(Table, Record, State) ->
6565
?LOG_DEBUG("Mnesia->Khepri unexpected record table ~0p record ~0p state ~0p",
6666
[Table, Record, State]),
67-
{error, unexpected_record}.
67+
{ok, State}.
6868

6969
-spec delete_from_khepri(Table, Key, Priv) -> Ret when
7070
Table :: mnesia_to_khepri:mnesia_table(),

deps/rabbit/src/rabbit_db_queue_m2k_converter.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
-include_lib("khepri/include/khepri.hrl").
1414
-include_lib("khepri_mnesia_migration/src/kmm_logging.hrl").
1515
-include_lib("rabbit_common/include/rabbit.hrl").
16+
-include("amqqueue.hrl").
1617

1718
-export([init_copy_to_khepri/3,
1819
copy_to_khepri/3,
@@ -45,7 +46,7 @@ init_copy_to_khepri(StoreId, _MigrationId, Tables) ->
4546
%% @private
4647

4748
copy_to_khepri(rabbit_queue = Table, Record,
48-
#?MODULE{store_id = StoreId} = State) ->
49+
#?MODULE{store_id = StoreId} = State) when ?is_amqqueue(Record) ->
4950
Name = amqqueue:get_name(Record),
5051
?LOG_DEBUG(
5152
"Mnesia->Khepri data copy: [~0p] key: ~0p",
@@ -63,7 +64,7 @@ copy_to_khepri(rabbit_queue = Table, Record,
6364
copy_to_khepri(Table, Record, State) ->
6465
?LOG_DEBUG("Mnesia->Khepri unexpected record table ~0p record ~0p state ~0p",
6566
[Table, Record, State]),
66-
{error, unexpected_record}.
67+
{ok, State}.
6768

6869
-spec delete_from_khepri(Table, Key, Priv) -> Ret when
6970
Table :: mnesia_to_khepri:mnesia_table(),

deps/rabbit/src/rabbit_db_rtparams_m2k_converter.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ copy_to_khepri(
6363
copy_to_khepri(Table, Record, State) ->
6464
?LOG_DEBUG("Mnesia->Khepri unexpected record table ~0p record ~0p state ~0p",
6565
[Table, Record, State]),
66-
{error, unexpected_record}.
66+
{ok, State}.
6767

6868
-spec delete_from_khepri(Table, Key, Priv) -> Ret when
6969
Table :: mnesia_to_khepri:mnesia_table(),

deps/rabbit/src/rabbit_db_user_m2k_converter.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
-include_lib("khepri/include/khepri.hrl").
1414
-include_lib("khepri_mnesia_migration/src/kmm_logging.hrl").
1515
-include_lib("rabbit_common/include/rabbit.hrl").
16+
-include("internal_user.hrl").
1617

1718
-export([init_copy_to_khepri/3,
1819
copy_to_khepri/3,
@@ -46,7 +47,7 @@ init_copy_to_khepri(StoreId, _MigrationId, Tables) ->
4647

4748
copy_to_khepri(
4849
rabbit_user = Table, Record,
49-
#?MODULE{store_id = StoreId} = State) ->
50+
#?MODULE{store_id = StoreId} = State) when ?is_internal_user(Record) ->
5051
Username = internal_user:get_username(Record),
5152
?LOG_DEBUG(
5253
"Mnesia->Khepri data copy: [~0p] key: ~0p",
@@ -62,7 +63,7 @@ copy_to_khepri(
6263
Error -> Error
6364
end;
6465
copy_to_khepri(
65-
rabbit_user_permission = Table, Record,
66+
rabbit_user_permission = Table, #user_permission{} = Record,
6667
#?MODULE{store_id = StoreId} = State) ->
6768
#user_permission{
6869
user_vhost = #user_vhost{
@@ -89,7 +90,7 @@ copy_to_khepri(
8990
Error -> Error
9091
end;
9192
copy_to_khepri(
92-
rabbit_topic_permission = Table, Record,
93+
rabbit_topic_permission = Table, #topic_permission{} = Record,
9394
#?MODULE{store_id = StoreId} = State) ->
9495
#topic_permission{
9596
topic_permission_key =
@@ -122,7 +123,7 @@ copy_to_khepri(
122123
copy_to_khepri(Table, Record, State) ->
123124
?LOG_DEBUG("Mnesia->Khepri unexpected record table ~0p record ~0p state ~0p",
124125
[Table, Record, State]),
125-
{error, unexpected_record}.
126+
{ok, State}.
126127

127128
-spec delete_from_khepri(Table, Key, Priv) -> Ret when
128129
Table :: mnesia_to_khepri:mnesia_table(),

deps/rabbit/src/rabbit_db_vhost_m2k_converter.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
-include_lib("khepri/include/khepri.hrl").
1414
-include_lib("khepri_mnesia_migration/src/kmm_logging.hrl").
1515
-include_lib("rabbit_common/include/rabbit.hrl").
16+
-include("vhost.hrl").
1617

1718
-export([init_copy_to_khepri/3,
1819
copy_to_khepri/3,
@@ -46,7 +47,7 @@ init_copy_to_khepri(StoreId, _MigrationId, Tables) ->
4647

4748
copy_to_khepri(
4849
rabbit_vhost = Table, Record,
49-
#?MODULE{store_id = StoreId} = State) ->
50+
#?MODULE{store_id = StoreId} = State) when ?is_vhost(Record) ->
5051
Name = vhost:get_name(Record),
5152
?LOG_DEBUG(
5253
"Mnesia->Khepri data copy: [~0p] key: ~0p",
@@ -64,7 +65,7 @@ copy_to_khepri(
6465
copy_to_khepri(Table, Record, State) ->
6566
?LOG_DEBUG("Mnesia->Khepri unexpected record table ~0p record ~0p state ~0p",
6667
[Table, Record, State]),
67-
{error, unexpected_record}.
68+
{ok, State}.
6869

6970
-spec delete_from_khepri(Table, Key, Priv) -> Ret when
7071
Table :: mnesia_to_khepri:mnesia_table(),

0 commit comments

Comments
 (0)