Skip to content

Commit 8c6b866

Browse files
Merge pull request #11667 from rabbitmq/md/khepri-projections-wrap-ets-calls
rabbit_db_*: Wrap `ets` calls to projections in `whereis/1` checks
2 parents 599727a + c490043 commit 8c6b866

File tree

7 files changed

+283
-114
lines changed

7 files changed

+283
-114
lines changed

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 64 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@
5353
-define(MNESIA_SEMI_DURABLE_TABLE, rabbit_semi_durable_route).
5454
-define(MNESIA_REVERSE_TABLE, rabbit_reverse_route).
5555
-define(MNESIA_INDEX_TABLE, rabbit_index_route).
56+
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_bindings).
57+
-define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route).
5658

5759
%% -------------------------------------------------------------------
5860
%% exists().
@@ -411,7 +413,12 @@ get_all_in_mnesia() ->
411413
end).
412414

413415
get_all_in_khepri() ->
414-
[B || #route{binding = B} <- ets:tab2list(rabbit_khepri_bindings)].
416+
case ets:whereis(?KHEPRI_BINDINGS_PROJECTION) of
417+
undefined ->
418+
[];
419+
Table ->
420+
[B || #route{binding = B} <- ets:tab2list(Table)]
421+
end.
415422

416423
-spec get_all(VHostName) -> [Binding] when
417424
VHostName :: vhost:name(),
@@ -437,11 +444,16 @@ get_all_in_mnesia(VHost) ->
437444
[B || #route{binding = B} <- rabbit_db:list_in_mnesia(?MNESIA_TABLE, Match)].
438445

439446
get_all_in_khepri(VHost) ->
440-
VHostResource = rabbit_misc:r(VHost, '_'),
441-
Match = #route{binding = #binding{source = VHostResource,
442-
destination = VHostResource,
443-
_ = '_'}},
444-
[B || #route{binding = B} <- ets:match_object(rabbit_khepri_bindings, Match)].
447+
case ets:whereis(?KHEPRI_BINDINGS_PROJECTION) of
448+
undefined ->
449+
[];
450+
Table ->
451+
VHostResource = rabbit_misc:r(VHost, '_'),
452+
Match = #route{binding = #binding{source = VHostResource,
453+
destination = VHostResource,
454+
_ = '_'}},
455+
[B || #route{binding = B} <- ets:match_object(Table, Match)]
456+
end.
445457

446458
-spec get_all(Src, Dst, Reverse) -> [Binding] when
447459
Src :: rabbit_types:binding_source(),
@@ -469,10 +481,15 @@ get_all_in_mnesia(SrcName, DstName, Reverse) ->
469481
mnesia:async_dirty(Fun).
470482

471483
get_all_in_khepri(SrcName, DstName) ->
472-
MatchHead = #route{binding = #binding{source = SrcName,
473-
destination = DstName,
474-
_ = '_'}},
475-
[B || #route{binding = B} <- ets:match_object(rabbit_khepri_bindings, MatchHead)].
484+
case ets:whereis(?KHEPRI_BINDINGS_PROJECTION) of
485+
undefined ->
486+
[];
487+
Table ->
488+
MatchHead = #route{binding = #binding{source = SrcName,
489+
destination = DstName,
490+
_ = '_'}},
491+
[B || #route{binding = B} <- ets:match_object(Table, MatchHead)]
492+
end.
476493

477494
%% -------------------------------------------------------------------
478495
%% get_all_for_source().
@@ -511,8 +528,13 @@ list_for_route(Route, true) ->
511528
end.
512529

513530
get_all_for_source_in_khepri(Resource) ->
514-
Route = #route{binding = #binding{source = Resource, _ = '_'}},
515-
[B || #route{binding = B} <- ets:match_object(rabbit_khepri_bindings, Route)].
531+
case ets:whereis(?KHEPRI_BINDINGS_PROJECTION) of
532+
undefined ->
533+
[];
534+
Table ->
535+
Route = #route{binding = #binding{source = Resource, _ = '_'}},
536+
[B || #route{binding = B} <- ets:match_object(Table, Route)]
537+
end.
516538

517539
%% -------------------------------------------------------------------
518540
%% get_all_for_destination().
@@ -541,9 +563,14 @@ get_all_for_destination_in_mnesia(Dst) ->
541563
mnesia:async_dirty(Fun).
542564

543565
get_all_for_destination_in_khepri(Destination) ->
544-
Match = #route{binding = #binding{destination = Destination,
545-
_ = '_'}},
546-
[B || #route{binding = B} <- ets:match_object(rabbit_khepri_bindings, Match)].
566+
case ets:whereis(?KHEPRI_BINDINGS_PROJECTION) of
567+
undefined ->
568+
[];
569+
Table ->
570+
Match = #route{binding = #binding{destination = Destination,
571+
_ = '_'}},
572+
[B || #route{binding = B} <- ets:match_object(Table, Match)]
573+
end.
547574

548575
%% -------------------------------------------------------------------
549576
%% fold().
@@ -617,11 +644,16 @@ match_in_mnesia(SrcName, Match) ->
617644
Routes, Match(Binding)].
618645

619646
match_in_khepri(SrcName, Match) ->
620-
MatchHead = #route{binding = #binding{source = SrcName,
621-
_ = '_'}},
622-
Routes = ets:select(rabbit_khepri_bindings, [{MatchHead, [], [['$_']]}]),
623-
[Dest || [#route{binding = Binding = #binding{destination = Dest}}] <-
624-
Routes, Match(Binding)].
647+
case ets:whereis(?KHEPRI_BINDINGS_PROJECTION) of
648+
undefined ->
649+
[];
650+
Table ->
651+
MatchHead = #route{binding = #binding{source = SrcName,
652+
_ = '_'}},
653+
Routes = ets:select(Table, [{MatchHead, [], [['$_']]}]),
654+
[Dest || [#route{binding = Binding = #binding{destination = Dest}}] <-
655+
Routes, Match(Binding)]
656+
end.
625657

626658
%% Routing - HOT CODE PATH
627659
%% -------------------------------------------------------------------
@@ -654,18 +686,26 @@ match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) ->
654686
route_in_mnesia_v1(SrcName, RoutingKeys)
655687
end.
656688

657-
match_routing_key_in_khepri(Src, ['_']) ->
689+
match_routing_key_in_khepri(Src, RoutingKeys) ->
690+
case ets:whereis(?KHEPRI_INDEX_ROUTE_PROJECTION) of
691+
undefined ->
692+
[];
693+
Table ->
694+
do_match_routing_key_in_khepri(Table, Src, RoutingKeys)
695+
end.
696+
697+
do_match_routing_key_in_khepri(Table, Src, ['_']) ->
658698
MatchHead = #index_route{source_key = {Src, '_'},
659699
destination = '$1',
660700
_ = '_'},
661-
ets:select(rabbit_khepri_index_route, [{MatchHead, [], ['$1']}]);
701+
ets:select(Table, [{MatchHead, [], ['$1']}]);
662702

663-
match_routing_key_in_khepri(Src, RoutingKeys) ->
703+
do_match_routing_key_in_khepri(Table, Src, RoutingKeys) ->
664704
lists:foldl(
665705
fun(RK, Acc) ->
666706
try
667707
Dst = ets:lookup_element(
668-
rabbit_khepri_index_route,
708+
Table,
669709
{Src, RK},
670710
#index_route.destination),
671711
Dst ++ Acc

deps/rabbit/src/rabbit_db_exchange.erl

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
-define(MNESIA_TABLE, rabbit_exchange).
5858
-define(MNESIA_DURABLE_TABLE, rabbit_durable_exchange).
5959
-define(MNESIA_SERIAL_TABLE, rabbit_exchange_serial).
60+
-define(KHEPRI_PROJECTION, rabbit_khepri_exchange).
6061

6162
%% -------------------------------------------------------------------
6263
%% get_all().
@@ -182,9 +183,14 @@ get_in_mnesia(Name) ->
182183
rabbit_mnesia:dirty_read({?MNESIA_TABLE, Name}).
183184

184185
get_in_khepri(Name) ->
185-
case ets:lookup(rabbit_khepri_exchange, Name) of
186-
[X] -> {ok, X};
187-
[] -> {error, not_found}
186+
case ets:whereis(?KHEPRI_PROJECTION) of
187+
undefined ->
188+
{error, not_found};
189+
Table ->
190+
case ets:lookup(Table, Name) of
191+
[X] -> {ok, X};
192+
[] -> {error, not_found}
193+
end
188194
end.
189195

190196
%% -------------------------------------------------------------------
@@ -227,7 +233,12 @@ get_many_in_mnesia(Table, Names) when is_list(Names) ->
227233
lists:append([ets:lookup(Table, Name) || Name <- Names]).
228234

229235
get_many_in_khepri(Names) when is_list(Names) ->
230-
lists:append([ets:lookup(rabbit_khepri_exchange, Name) || Name <- Names]).
236+
case ets:whereis(?KHEPRI_PROJECTION) of
237+
undefined ->
238+
[];
239+
Table ->
240+
lists:append([ets:lookup(Table, Name) || Name <- Names])
241+
end.
231242

232243
%% -------------------------------------------------------------------
233244
%% count().

0 commit comments

Comments
 (0)