Skip to content

Commit 2f96257

Browse files
the-mikedavis authored and mergify[bot] committed
rabbit_db_queue: Transactionally delete transient queues from Khepri
The prior code skirted transactions because the filter function might cause Khepri to call itself. We want to use the same idea as the old code - get all queues, filter them, then delete them - but we want to perform the deletion in a transaction and fail the transaction if any queues changed since we read them. This fixes a bug - that the call to `delete_in_khepri/2` could return an error tuple that would be improperly recognized as `Deletions` - but should also make deleting transient queues atomic and fast. Each call to `delete_in_khepri/2` needed to wait on Ra to replicate because the deletion is an individual command sent from one process. Performing all deletions at once means we only need to wait for one command to be replicated across the cluster. We also bubble up any errors to delete now rather than storing them as deletions. This fixes a crash that occurs on node down when Khepri is in a minority. (cherry picked from commit 0dd26f0) (cherry picked from commit 0f90906) # Conflicts: # deps/rabbit_common/src/rabbit_misc.erl
1 parent 666d2bc commit 2f96257

File tree

2 files changed

+105
-24
lines changed

2 files changed

+105
-24
lines changed

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,7 +1027,8 @@ set_many_in_khepri(Qs) ->
10271027
Queue :: amqqueue:amqqueue(),
10281028
FilterFun :: fun((Queue) -> boolean()),
10291029
QName :: rabbit_amqqueue:name(),
1030-
Ret :: {[QName], [Deletions :: rabbit_binding:deletions()]}.
1030+
Ret :: {[QName], [Deletions :: rabbit_binding:deletions()]}
1031+
| rabbit_khepri:timeout_error().
10311032
%% @doc Deletes all transient queues that match `FilterFun'.
10321033
%%
10331034
%% @private
@@ -1088,26 +1089,59 @@ delete_transient_in_khepri(FilterFun) ->
10881089
%% process might call itself. Instead we can fetch all of the transient
10891090
%% queues with `get_many' and then filter and fold the results outside of
10901091
%% Khepri's Ra server process.
1091-
case rabbit_khepri:get_many(PathPattern) of
1092-
{ok, Qs} ->
1093-
Items = maps:fold(
1094-
fun(Path, Queue, Acc) when ?is_amqqueue(Queue) ->
1095-
case FilterFun(Queue) of
1096-
true ->
1097-
QueueName = khepri_queue_path_to_name(
1098-
Path),
1099-
case delete_in_khepri(QueueName, false) of
1100-
ok ->
1101-
Acc;
1102-
Deletions ->
1103-
[{QueueName, Deletions} | Acc]
1104-
end;
1105-
false ->
1106-
Acc
1107-
end
1108-
end, [], Qs),
1109-
{QueueNames, Deletions} = lists:unzip(Items),
1110-
{QueueNames, lists:flatten(Deletions)};
1092+
case rabbit_khepri:adv_get_many(PathPattern) of
1093+
{ok, Props} ->
1094+
Qs = maps:fold(
1095+
fun(Path0, #{data := Q, payload_version := Vsn}, Acc)
1096+
when ?is_amqqueue(Q) ->
1097+
case FilterFun(Q) of
1098+
true ->
1099+
Path = khepri_path:combine_with_conditions(
1100+
Path0,
1101+
[#if_payload_version{version = Vsn}]),
1102+
QName = amqqueue:get_name(Q),
1103+
[{Path, QName} | Acc];
1104+
false ->
1105+
Acc
1106+
end
1107+
end, [], Props),
1108+
do_delete_transient_queues_in_khepri(Qs, FilterFun);
1109+
{error, _} = Error ->
1110+
Error
1111+
end.
1112+
1113+
%% Deletes the given transient queues from Khepri in a single transaction.
%%
%% `Qs' is a list of `{Path, QName}' pairs. Each path is deleted with
%% `khepri_tx_adv:delete/1'; when the delete reports stored data, the bindings
%% whose destination is that queue are deleted as well and collected.
%%
%% Returns `{QNames, Deletions}' on success. If the transaction fails with a
%% `mismatching_node' error the whole operation is restarted through
%% `delete_transient_in_khepri/1' with the same `FilterFun'. Any other error
%% tuple is returned to the caller unchanged.
do_delete_transient_queues_in_khepri([], _FilterFun) ->
    %% Nothing to delete, so skip the transaction entirely. When Khepri is in
    %% a minority this avoids a long timeout waiting for the transaction
    %% command to be processed. Otherwise it avoids appending a somewhat large
    %% transaction command to Khepri's log.
    {[], []};
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
    DeleteAll =
        fun() ->
                rabbit_misc:fold_while_ok(
                  fun({QPath, Name}, Deleted) ->
                          %% Also see `delete_in_khepri/2'.
                          case khepri_tx_adv:delete(QPath) of
                              {ok, #{data := _}} ->
                                  %% A record was removed: remove the bindings
                                  %% that pointed at this queue too.
                                  Bindings =
                                      rabbit_db_binding:delete_for_destination_in_khepri(
                                        Name, false),
                                  {ok, [{Name, Bindings} | Deleted]};
                              {ok, _} ->
                                  %% The delete reported no data; nothing to
                                  %% record for this queue.
                                  {ok, Deleted};
                              {error, _} = TxError ->
                                  %% Stops the fold and becomes the result of
                                  %% the transaction fun.
                                  TxError
                          end
                  end, [], Qs)
        end,
    case rabbit_khepri:transaction(DeleteAll) of
        {ok, Pairs} ->
            {Names, BindingDeletions} = lists:unzip(Pairs),
            {Names, lists:flatten(BindingDeletions)};
        {error, {khepri, mismatching_node, _}} ->
            %% One of the queues changed while attempting to update all
            %% queues. Retry the operation.
            delete_transient_in_khepri(FilterFun);
        {error, _} = Failure ->
            Failure
    end.
@@ -1382,6 +1416,3 @@ khepri_queues_path() ->
13821416

13831417
%% Builds the Khepri path for a queue from the virtual host and name stored
%% in its `#resource{}' record.
khepri_queue_path(#resource{} = QName) ->
    #resource{virtual_host = VHost, name = Name} = QName,
    [?MODULE, queues, VHost, Name].
1385-
1386-
khepri_queue_path_to_name([?MODULE, queues, VHost, Name]) ->
1387-
rabbit_misc:r(VHost, queue, Name).

deps/rabbit_common/src/rabbit_misc.erl

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@
8888
maps_put_falsy/3
8989
]).
9090
-export([remote_sup_child/2]).
%% Short-circuiting list traversal helpers. The cherry-pick left
%% merge-conflict markers around this attribute; the HEAD side was empty, so
%% the incoming export is kept.
-export([for_each_while_ok/2, fold_while_ok/3]).
9195

9296
%% Horrible macro to use in guards
9397
-define(IS_BENIGN_EXIT(R),
@@ -1621,3 +1625,49 @@ remote_sup_child(Node, Sup) ->
16211625
[] -> {error, no_child};
16221626
{badrpc, {'EXIT', {noproc, _}}} -> {error, no_sup}
16231627
end.
-spec for_each_while_ok(ForEachFun, List) -> Ret when
      ForEachFun :: fun((Element) -> ok | {error, ErrReason}),
      ErrReason :: any(),
      Element :: any(),
      List :: [Element],
      Ret :: ok | {error, ErrReason}.
%% @doc Calls the given `ForEachFun' for each element in the given `List',
%% short-circuiting if the function returns `{error,_}'.
%%
%% Elements are visited from the head of the list. `ForEachFun' must return
%% either `ok' or `{error,_}'; any other return value raises a `case_clause'
%% error.
%%
%% @returns the first `{error,_}' returned by `ForEachFun' or `ok' if
%% `ForEachFun' never returns an error tuple.

for_each_while_ok(Fun, [Elem | Rest]) ->
    case Fun(Elem) of
        ok ->
            for_each_while_ok(Fun, Rest);
        {error, _} = Error ->
            %% Stop at the first failure; `Rest' is never visited.
            Error
    end;
for_each_while_ok(_, []) ->
    ok.

-spec fold_while_ok(FoldFun, Acc, List) -> Ret when
      FoldFun :: fun((Element, Acc) -> {ok, Acc} | {error, ErrReason}),
      ErrReason :: any(),
      Element :: any(),
      Acc :: any(),
      List :: [Element],
      Ret :: {ok, Acc} | {error, ErrReason}.
%% @doc Calls the given `FoldFun' on each element of the given `List' and the
%% accumulator value, short-circuiting if the function returns `{error,_}'.
%%
%% Elements are visited from the head of the list and the accumulator
%% returned by one call is passed to the next. `FoldFun' must return either
%% `{ok,Acc}' or `{error,_}'; any other return value raises a `case_clause'
%% error.
%%
%% @returns the first `{error,_}' returned by `FoldFun' or `{ok,Acc}' if
%% `FoldFun' never returns an error tuple.

fold_while_ok(Fun, Acc0, [Elem | Rest]) ->
    case Fun(Elem, Acc0) of
        {ok, Acc} ->
            fold_while_ok(Fun, Acc, Rest);
        {error, _} = Error ->
            %% Stop at the first failure; the accumulator built so far is
            %% discarded and `Rest' is never visited.
            Error
    end;
fold_while_ok(_Fun, Acc, []) ->
    {ok, Acc}.

0 commit comments

Comments
 (0)