Skip to content

Commit 0dd26f0

Browse files
committed
rabbit_db_queue: Transactionally delete transient queues from Khepri
The prior code skirted transactions because the filter function might cause Khepri to call itself. We want to keep the same idea as the old code - get all queues, filter them, then delete them - but perform the deletion in a transaction and fail the transaction if any queue changed since we read it. This fixes a bug - the call to `delete_in_khepri/2` could return an error tuple that would be improperly recognized as `Deletions` - and should also make deleting transient queues atomic and fast. Previously, each call to `delete_in_khepri/2` had to wait for Ra to replicate it, because each deletion was an individual command sent from one process. Performing all deletions at once means we only need to wait for one command to be replicated across the cluster. We also now bubble up any errors from the deletion rather than storing them as deletions. This fixes a crash that occurs on node down when Khepri is in a minority.
1 parent d0da0b5 commit 0dd26f0

File tree

2 files changed

+77
-25
lines changed

2 files changed

+77
-25
lines changed

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,7 +1012,8 @@ set_many_in_khepri(Qs) ->
10121012
Queue :: amqqueue:amqqueue(),
10131013
FilterFun :: fun((Queue) -> boolean()),
10141014
QName :: rabbit_amqqueue:name(),
1015-
Ret :: {[QName], [Deletions :: rabbit_binding:deletions()]}.
1015+
Ret :: {[QName], [Deletions :: rabbit_binding:deletions()]}
1016+
| rabbit_khepri:timeout_error().
10161017
%% @doc Deletes all transient queues that match `FilterFun'.
10171018
%%
10181019
%% @private
@@ -1073,26 +1074,59 @@ delete_transient_in_khepri(FilterFun) ->
10731074
%% process might call itself. Instead we can fetch all of the transient
10741075
%% queues with `get_many' and then filter and fold the results outside of
10751076
%% Khepri's Ra server process.
1076-
case rabbit_khepri:get_many(PathPattern) of
1077-
{ok, Qs} ->
1078-
Items = maps:fold(
1079-
fun(Path, Queue, Acc) when ?is_amqqueue(Queue) ->
1080-
case FilterFun(Queue) of
1081-
true ->
1082-
QueueName = khepri_queue_path_to_name(
1083-
Path),
1084-
case delete_in_khepri(QueueName, false) of
1085-
ok ->
1086-
Acc;
1087-
Deletions ->
1088-
[{QueueName, Deletions} | Acc]
1089-
end;
1090-
false ->
1091-
Acc
1092-
end
1093-
end, [], Qs),
1094-
{QueueNames, Deletions} = lists:unzip(Items),
1095-
{QueueNames, lists:flatten(Deletions)};
1077+
case rabbit_khepri:adv_get_many(PathPattern) of
1078+
{ok, Props} ->
1079+
Qs = maps:fold(
1080+
fun(Path0, #{data := Q, payload_version := Vsn}, Acc)
1081+
when ?is_amqqueue(Q) ->
1082+
case FilterFun(Q) of
1083+
true ->
1084+
Path = khepri_path:combine_with_conditions(
1085+
Path0,
1086+
[#if_payload_version{version = Vsn}]),
1087+
QName = amqqueue:get_name(Q),
1088+
[{Path, QName} | Acc];
1089+
false ->
1090+
Acc
1091+
end
1092+
end, [], Props),
1093+
do_delete_transient_queues_in_khepri(Qs, FilterFun);
1094+
{error, _} = Error ->
1095+
Error
1096+
end.
1097+
1098+
%% Deletes, in a single Khepri transaction, every queue listed in the first
%% argument. Each element is a `{Path, QName}' pair as built by
%% `delete_transient_in_khepri/1', where `Path' carries an
%% `#if_payload_version{}' condition.
%%
%% Returns `{QNames, Deletions}' on success or an `{error, _}' tuple. When the
%% transaction reports `{error, {khepri, mismatching_node, _}}' the whole
%% operation is restarted through `delete_transient_in_khepri(FilterFun)'.
do_delete_transient_queues_in_khepri([], _FilterFun) ->
    %% Nothing matched the filter, so skip the transaction altogether. With
    %% Khepri in a minority this spares us a long timeout waiting for the
    %% transaction command to be processed; otherwise it spares Khepri's log
    %% a somewhat large transaction command.
    {[], []};
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
    TxFun =
        fun() ->
                %% Also see `delete_in_khepri/2'.
                DeleteOne =
                    fun({CondPath, Name}, Deleted) ->
                            case khepri_tx_adv:delete(CondPath) of
                                {error, _} = DeleteError ->
                                    %% Stops the fold; the error becomes the
                                    %% value returned by the transaction fun.
                                    DeleteError;
                                {ok, #{data := _}} ->
                                    %% The returned properties carry a
                                    %% payload: record the queue together
                                    %% with the result of the binding
                                    %% deletion for that destination.
                                    Bindings = rabbit_db_binding:delete_for_destination_in_khepri(
                                                 Name, false),
                                    {ok, [{Name, Bindings} | Deleted]};
                                {ok, _} ->
                                    %% No payload in the result: nothing to
                                    %% record for this path.
                                    {ok, Deleted}
                            end
                    end,
                rabbit_misc:fold_while_ok(DeleteOne, [], Qs)
        end,
    case rabbit_khepri:transaction(TxFun) of
        {error, {khepri, mismatching_node, _}} ->
            %% One of the queues changed while attempting to update all
            %% queues. Retry the operation.
            delete_transient_in_khepri(FilterFun);
        {error, _} = TxError ->
            TxError;
        {ok, Items} ->
            {QNames, PerQueueDeletions} = lists:unzip(Items),
            {QNames, lists:flatten(PerQueueDeletions)}
    end.
@@ -1366,6 +1400,3 @@ khepri_queues_path() ->
13661400

13671401
%% Builds the Khepri path for the queue identified by the given `#resource{}'
%% record: this module's name, the atom `queues', then the record's virtual
%% host and name.
khepri_queue_path(#resource{} = QName) ->
    #resource{virtual_host = VHost, name = Name} = QName,
    [?MODULE, queues, VHost, Name].
1369-
1370-
khepri_queue_path_to_name([?MODULE, queues, VHost, Name]) ->
1371-
rabbit_misc:r(VHost, queue, Name).

deps/rabbit_common/src/rabbit_misc.erl

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
maps_put_falsy/3
9090
]).
9191
-export([remote_sup_child/2]).
92-
-export([for_each_while_ok/2]).
92+
-export([for_each_while_ok/2, fold_while_ok/3]).
9393

9494
%% Horrible macro to use in guards
9595
-define(IS_BENIGN_EXIT(R),
@@ -1655,3 +1655,24 @@ for_each_while_ok(Fun, [Elem | Rest]) ->
16551655
end;
16561656
for_each_while_ok(_, []) ->
16571657
ok.
1658+
1659+
-spec fold_while_ok(FoldFun, Acc, List) -> Ret when
      FoldFun :: fun((Element, Acc) -> {ok, Acc} | {error, ErrReason}),
      Element :: any(),
      List :: [Element],
      Ret :: {ok, Acc} | {error, ErrReason}.
%% @doc Calls the given `FoldFun' on each element of the given `List' and the
%% accumulator value, short-circuiting if the function returns `{error,_}'.
%%
%% Elements are visited from the head of the list to its tail. The accumulator
%% returned in `{ok, Acc}' by one call is passed to the next call. Elements
%% after the first one that yields `{error,_}' are never passed to `FoldFun'.
%%
%% @param FoldFun the function applied to each element and the accumulator.
%% @param Acc the initial accumulator value.
%% @param List the list of elements to fold over.
%%
%% @returns the first `{error,_}' returned by `FoldFun' or `{ok,Acc}' if
%% `FoldFun' never returns an error tuple. For an empty list this is
%% `{ok,Acc}' with the initial accumulator.

fold_while_ok(Fun, Acc0, [Elem | Rest]) ->
    case Fun(Elem, Acc0) of
        {ok, Acc} ->
            fold_while_ok(Fun, Acc, Rest);
        {error, _} = Error ->
            %% Stop at the first error and hand it back unchanged.
            Error
    end;
fold_while_ok(_Fun, Acc, []) ->
    {ok, Acc}.

0 commit comments

Comments
 (0)