Skip to content

Commit 8119283

Browse files
committed
Support transient queues in Khepri
1 parent 6677582 commit 8119283

File tree

4 files changed

+97
-18
lines changed

4 files changed

+97
-18
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@
7878
pattern_match_all/0,
7979
pattern_match_on_name/1,
8080
pattern_match_on_type/1,
81+
pattern_match_on_durable/1,
82+
pattern_match_on_type_and_durable/2,
8183
reset_mirroring_and_decorators/1,
8284
set_immutable/1,
8385
qnode/1,
@@ -590,6 +592,17 @@ pattern_match_on_name(Name) ->
590592
pattern_match_on_type(Type) ->
591593
#amqqueue{type = Type, _ = '_'}.
592594

595+
-spec pattern_match_on_durable(boolean()) -> amqqueue_pattern().
596+
597+
pattern_match_on_durable(IsDurable) ->
598+
#amqqueue{durable = IsDurable, _ = '_'}.
599+
600+
-spec pattern_match_on_type_and_durable(atom(), boolean()) ->
601+
amqqueue_pattern().
602+
603+
pattern_match_on_type_and_durable(Type, IsDurable) ->
604+
#amqqueue{type = Type, durable = IsDurable, _ = '_'}.
605+
593606
-spec reset_mirroring_and_decorators(amqqueue()) -> amqqueue().
594607

595608
reset_mirroring_and_decorators(#amqqueue{} = Queue) ->

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 77 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77

88
-module(rabbit_db_queue).
99

10+
-include_lib("kernel/include/logger.hrl").
11+
-include_lib("stdlib/include/assert.hrl").
12+
-include_lib("stdlib/include/qlc.hrl").
13+
1014
-include_lib("khepri/include/khepri.hrl").
1115
-include_lib("rabbit_common/include/rabbit.hrl").
12-
-include_lib("stdlib/include/qlc.hrl").
1316
-include("amqqueue.hrl").
1417

1518
-export([
@@ -169,7 +172,8 @@ get_all_durable_in_mnesia() ->
169172
get_all_durable_in_khepri() ->
170173
list_with_possible_retry_in_khepri(
171174
fun() ->
172-
ets:tab2list(?KHEPRI_PROJECTION)
175+
Pattern = amqqueue:pattern_match_on_durable(true),
176+
ets:match_object(?KHEPRI_PROJECTION, Pattern)
173177
end).
174178

175179
-spec get_all_durable_by_type(Type) -> [Queue] when
@@ -193,7 +197,7 @@ get_all_durable_by_type_in_mnesia(Type) ->
193197
rabbit_db:list_in_mnesia(?MNESIA_DURABLE_TABLE, Pattern).
194198

195199
get_all_durable_by_type_in_khepri(Type) ->
196-
Pattern = amqqueue:pattern_match_on_type(Type),
200+
Pattern = amqqueue:pattern_match_on_type_and_durable(Type, true),
197201
ets:match_object(?KHEPRI_PROJECTION, Pattern).
198202

199203
%% -------------------------------------------------------------------
@@ -227,7 +231,7 @@ filter_all_durable_in_mnesia(FilterFun) ->
227231
filter_all_durable_in_khepri(FilterFun) ->
228232
ets:foldl(
229233
fun(Q, Acc0) ->
230-
case FilterFun(Q) of
234+
case amqqueue:is_durable(Q) andalso FilterFun(Q) of
231235
true -> [Q | Acc0];
232236
false -> Acc0
233237
end
@@ -478,12 +482,23 @@ get_in_khepri(Name) ->
478482
get_durable(Name) ->
479483
rabbit_khepri:handle_fallback(
480484
#{mnesia => fun() -> get_durable_in_mnesia(Name) end,
481-
khepri => fun() -> get_in_khepri(Name) end
485+
khepri => fun() -> get_durable_in_khepri(Name) end
482486
}).
483487

484488
get_durable_in_mnesia(Name) ->
485489
rabbit_mnesia:dirty_read({?MNESIA_DURABLE_TABLE, Name}).
486490

491+
get_durable_in_khepri(Name) ->
492+
case get_in_khepri(Name) of
493+
{ok, Queue} = Ret ->
494+
case amqqueue:is_durable(Queue) of
495+
true -> Ret;
496+
false -> {error, not_found}
497+
end;
498+
Error ->
499+
Error
500+
end.
501+
487502
%% -------------------------------------------------------------------
488503
%% get_many_durable().
489504
%% -------------------------------------------------------------------
@@ -494,10 +509,17 @@ get_durable_in_mnesia(Name) ->
494509

495510
get_many_durable(Names) when is_list(Names) ->
496511
rabbit_khepri:handle_fallback(
497-
#{mnesia => fun() -> get_many_in_ets(?MNESIA_DURABLE_TABLE, Names) end,
498-
khepri => fun() -> get_many_in_ets(?KHEPRI_PROJECTION, Names) end
512+
#{mnesia => fun() -> get_many_durable_in_mnesia(Names) end,
513+
khepri => fun() -> get_many_durable_in_khepri(Names) end
499514
}).
500515

516+
get_many_durable_in_mnesia(Names) ->
517+
get_many_in_ets(?MNESIA_DURABLE_TABLE, Names).
518+
519+
get_many_durable_in_khepri(Names) ->
520+
Queues = get_many_in_ets(?KHEPRI_PROJECTION, Names),
521+
[Q || Q <- Queues, amqqueue:is_durable(Q)].
522+
501523
%% -------------------------------------------------------------------
502524
%% update().
503525
%% -------------------------------------------------------------------
@@ -632,7 +654,9 @@ update_durable_in_khepri(UpdateFun, FilterFun) ->
632654
fun() ->
633655
khepri_tx:foreach(Path,
634656
fun(Path0, #{data := Q}) ->
635-
case FilterFun(Q) of
657+
DoUpdate = amqqueue:is_durable(Q)
658+
andalso FilterFun(Q),
659+
case DoUpdate of
636660
true ->
637661
khepri_tx:put(Path0, UpdateFun(Q));
638662
false ->
@@ -833,7 +857,7 @@ set_in_mnesia(Q) ->
833857
end).
834858

835859
set_in_mnesia_tx(DurableQ, Q) ->
836-
case ?amqqueue_is_durable(Q) of
860+
case amqqueue:is_durable(Q) of
837861
true ->
838862
ok = mnesia:write(?MNESIA_DURABLE_TABLE, DurableQ, write);
839863
false ->
@@ -852,8 +876,9 @@ set_in_khepri(Q) ->
852876
-spec set_many([Queue]) -> ok when
853877
Queue :: amqqueue:amqqueue().
854878
%% @doc Writes a list of durable queue records.
855-
%% It is responsibility of the calling function to ensure all records are durable.
856-
%% Once transient entities are deprecated, this is a non-issue.
879+
%%
880+
%% It is responsibility of the calling function to ensure all records are
881+
%% durable.
857882
%%
858883
%% @private
859884

@@ -868,7 +893,10 @@ set_many_in_mnesia(Qs) ->
868893
%% Just to be nested in forget_node_for_queue
869894
mnesia:transaction(
870895
fun() ->
871-
[ok = mnesia:write(?MNESIA_DURABLE_TABLE, Q, write) || Q <- Qs],
896+
[begin
897+
?assert(amqqueue:is_durable(Q)),
898+
ok = mnesia:write(?MNESIA_DURABLE_TABLE, Q, write)
899+
end || Q <- Qs],
872900
ok
873901
end),
874902
ok.
@@ -877,6 +905,7 @@ set_many_in_khepri(Qs) ->
877905
rabbit_khepri:transaction(
878906
fun() ->
879907
[begin
908+
?assert(amqqueue:is_durable(Q)),
880909
Path = khepri_queue_path(amqqueue:get_name(Q)),
881910
case khepri_tx:put(Path, Q) of
882911
ok -> ok;
@@ -902,7 +931,7 @@ set_many_in_khepri(Qs) ->
902931
delete_transient(FilterFun) ->
903932
rabbit_khepri:handle_fallback(
904933
#{mnesia => fun() -> delete_transient_in_mnesia(FilterFun) end,
905-
khepri => fun() -> ok end
934+
khepri => fun() -> delete_transient_in_khepri(FilterFun) end
906935
}).
907936

908937
delete_transient_in_mnesia(FilterFun) ->
@@ -944,6 +973,33 @@ partition_queues([Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9 | T]) ->
944973
partition_queues(T) ->
945974
[T].
946975

976+
delete_transient_in_khepri(FilterFun) ->
977+
PathPattern = khepri_queues_path() ++
978+
[?KHEPRI_WILDCARD_STAR,
979+
#if_data_matches{
980+
pattern = amqqueue:pattern_match_on_durable(false)}],
981+
Ret = rabbit_khepri:fold(
982+
PathPattern,
983+
fun(Path, #{data := Queue}, Acc) ->
984+
case FilterFun(Queue) of
985+
true ->
986+
QueueName = khepri_queue_path_to_name(Path),
987+
case delete_in_khepri(QueueName, false) of
988+
ok -> Acc;
989+
Deletions -> [{QueueName, Deletions} | Acc]
990+
end;
991+
false ->
992+
Acc
993+
end
994+
end, []),
995+
case Ret of
996+
{ok, Items} ->
997+
{QueueNames, Deletions} = lists:unzip(Items),
998+
{QueueNames, lists:flatten(Deletions)};
999+
{error, _} = Error ->
1000+
Error
1001+
end.
1002+
9471003
%% -------------------------------------------------------------------
9481004
%% foreach_transient().
9491005
%% -------------------------------------------------------------------
@@ -958,7 +1014,7 @@ partition_queues(T) ->
9581014
foreach_transient(UpdateFun) ->
9591015
rabbit_khepri:handle_fallback(
9601016
#{mnesia => fun() -> foreach_transient_in_mnesia(UpdateFun) end,
961-
khepri => fun() -> ok end
1017+
khepri => fun() -> throw(not_implemented) end
9621018
}).
9631019

9641020
foreach_transient_in_mnesia(UpdateFun) ->
@@ -1001,7 +1057,10 @@ foreach_durable_in_mnesia(UpdateFun, FilterFun) ->
10011057
ok.
10021058

10031059
foreach_durable_in_khepri(UpdateFun, FilterFun) ->
1004-
Path = khepri_queues_path() ++ [rabbit_khepri:if_has_data_wildcard()],
1060+
Path = khepri_queues_path() ++
1061+
[?KHEPRI_WILDCARD_STAR,
1062+
#if_data_matches{
1063+
pattern = amqqueue:pattern_match_on_durable(true)}],
10051064
case rabbit_khepri:filter(Path, fun(_, #{data := Q}) ->
10061065
FilterFun(Q)
10071066
end) of
@@ -1190,3 +1249,6 @@ khepri_queues_path() ->
11901249

11911250
khepri_queue_path(#resource{virtual_host = VHost, name = Name}) ->
11921251
[?MODULE, queues, VHost, Name].
1252+
1253+
khepri_queue_path_to_name([?MODULE, queues, VHost, Name]) ->
1254+
rabbit_misc:r(VHost, queue, Name).

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
update/2,
3434
cas/3,
3535
fold/3,
36+
foreach/2,
3637
filter/2,
3738

3839
get/1,
@@ -669,6 +670,8 @@ cas(Path, Pattern, Data) ->
669670

670671
fold(Path, Pred, Acc) -> khepri:fold(?STORE_ID, Path, Pred, Acc).
671672

673+
foreach(Path, Pred) -> khepri:foreach(?STORE_ID, Path, Pred).
674+
672675
filter(Path, Pred) -> khepri:filter(?STORE_ID, Path, Pred).
673676

674677
get(Path) ->

deps/rabbit/test/bindings_SUITE.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -910,7 +910,7 @@ transient_queue_on_node_down_khepri(Config) ->
910910

911911
rabbit_ct_broker_helpers:stop_node(Config, Server),
912912

913-
Bindings1 = lists:sort([DirectBinding, DirectAltBinding]),
913+
Bindings1 = lists:sort([DirectBinding]),
914914
?awaitMatch(Bindings1,
915915
lists:sort(
916916
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_binding, list, [<<"/">>])),
@@ -921,9 +921,10 @@ transient_queue_on_node_down_khepri(Config) ->
921921

922922
rabbit_ct_broker_helpers:start_node(Config, Server),
923923

924-
?awaitMatch([_, _], rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue, list, [<<"/">>]),
924+
Bindings2 = lists:sort([DefaultBinding, DirectBinding]),
925+
?awaitMatch([_], rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue, list, [<<"/">>]),
925926
30000),
926-
?awaitMatch(Bindings,
927+
?awaitMatch(Bindings2,
927928
lists:sort(
928929
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_binding, list, [<<"/">>])),
929930
30000),

0 commit comments

Comments
 (0)