Skip to content

Commit 79f6507

Browse files
Merge pull request #11991 from rabbitmq/mergify/bp/v3.13.x/pr-11990
Handle transient queue deletion in Khepri minority (backport #11979) (backport #11990)
2 parents 9381ccf + 4632a5a commit 79f6507

File tree

5 files changed

+158
-36
lines changed

5 files changed

+158
-36
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
-export([queue/1, queue_names/1]).
7676

7777
-export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]).
78+
-export([delete_transient_queues_on_node/1]).
7879

7980
%% internal
8081
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -2055,13 +2056,39 @@ maybe_clear_recoverable_node(Node) ->
20552056
-spec on_node_down(node()) -> 'ok'.

%% Reacts to `Node' going down by deleting its transient queues via
%% `delete_transient_queues_on_node/1'. Always returns `ok': a timeout from
%% the deletion is logged as a warning and otherwise ignored.
on_node_down(Node) ->
    DeleteResult = delete_transient_queues_on_node(Node),
    case DeleteResult of
        {error, timeout} ->
            %% Reachable when running Khepri: losing `Node' may leave the
            %% cluster in a minority, in which case the command deleting the
            %% transient queue records cannot be processed. The deletion is
            %% attempted again from `rabbit_khepri:init/0' when the node
            %% restarts - a point where the cluster very likely has a
            %% majority - so the records still end up being removed.
            rabbit_log:warning("transient queues for node '~ts' could not be "
                               "deleted because of a timeout. These queues "
                               "will be removed when node '~ts' restarts or "
                               "is removed from the cluster.", [Node, Node]),
            ok;
        ok ->
            ok
    end.
2075+
2076+
-spec delete_transient_queues_on_node(Node) -> Ret when
2077+
Node :: node(),
2078+
Ret :: ok | rabbit_khepri:timeout_error().
2079+
2080+
delete_transient_queues_on_node(Node) ->
20582081
{Time, Ret} = timer:tc(fun() -> rabbit_db_queue:delete_transient(filter_transient_queues_to_delete(Node)) end),
20592082
case Ret of
2060-
ok -> ok;
2061-
{QueueNames, Deletions} ->
2083+
ok ->
2084+
ok;
2085+
{error, timeout} = Err ->
2086+
Err;
2087+
{QueueNames, Deletions} when is_list(QueueNames) ->
20622088
case length(QueueNames) of
20632089
0 -> ok;
2064-
N -> rabbit_log:info("~b transient queues from an old incarnation of node ~tp deleted in ~fs",
2090+
N -> rabbit_log:info("~b transient queues from node '~ts' "
2091+
"deleted in ~fs",
20652092
[N, Node, Time / 1_000_000])
20662093
end,
20672094
notify_queue_binding_deletions(Deletions),

deps/rabbit/src/rabbit_db.erl

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,15 +102,10 @@ init_using_mnesia() ->
102102
rabbit_sup:start_child(mnesia_sync).
103103

104104
init_using_khepri() ->
105-
case rabbit_khepri:members() of
106-
[] ->
107-
timer:sleep(1000),
108-
init_using_khepri();
109-
Members ->
110-
?LOG_WARNING(
111-
"Found the following metadata store members: ~p", [Members],
112-
#{domain => ?RMQLOG_DOMAIN_DB})
113-
end.
105+
?LOG_DEBUG(
106+
"DB: initialize Khepri",
107+
#{domain => ?RMQLOG_DOMAIN_DB}),
108+
rabbit_khepri:init().
114109

115110
init_finished() ->
116111
%% Used during initialisation by rabbit_logger_exchange_h.erl

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,7 +1027,8 @@ set_many_in_khepri(Qs) ->
10271027
Queue :: amqqueue:amqqueue(),
10281028
FilterFun :: fun((Queue) -> boolean()),
10291029
QName :: rabbit_amqqueue:name(),
1030-
Ret :: {[QName], [Deletions :: rabbit_binding:deletions()]}.
1030+
Ret :: {[QName], [Deletions :: rabbit_binding:deletions()]}
1031+
| rabbit_khepri:timeout_error().
10311032
%% @doc Deletes all transient queues that match `FilterFun'.
10321033
%%
10331034
%% @private
@@ -1088,26 +1089,59 @@ delete_transient_in_khepri(FilterFun) ->
10881089
%% process might call itself. Instead we can fetch all of the transient
10891090
%% queues with `get_many' and then filter and fold the results outside of
10901091
%% Khepri's Ra server process.
1091-
case rabbit_khepri:get_many(PathPattern) of
1092-
{ok, Qs} ->
1093-
Items = maps:fold(
1094-
fun(Path, Queue, Acc) when ?is_amqqueue(Queue) ->
1095-
case FilterFun(Queue) of
1096-
true ->
1097-
QueueName = khepri_queue_path_to_name(
1098-
Path),
1099-
case delete_in_khepri(QueueName, false) of
1100-
ok ->
1101-
Acc;
1102-
Deletions ->
1103-
[{QueueName, Deletions} | Acc]
1104-
end;
1105-
false ->
1106-
Acc
1107-
end
1108-
end, [], Qs),
1109-
{QueueNames, Deletions} = lists:unzip(Items),
1110-
{QueueNames, lists:flatten(Deletions)};
1092+
case rabbit_khepri:adv_get_many(PathPattern) of
1093+
{ok, Props} ->
1094+
Qs = maps:fold(
1095+
fun(Path0, #{data := Q, payload_version := Vsn}, Acc)
1096+
when ?is_amqqueue(Q) ->
1097+
case FilterFun(Q) of
1098+
true ->
1099+
Path = khepri_path:combine_with_conditions(
1100+
Path0,
1101+
[#if_payload_version{version = Vsn}]),
1102+
QName = amqqueue:get_name(Q),
1103+
[{Path, QName} | Acc];
1104+
false ->
1105+
Acc
1106+
end
1107+
end, [], Props),
1108+
do_delete_transient_queues_in_khepri(Qs, FilterFun);
1109+
{error, _} = Error ->
1110+
Error
1111+
end.
1112+
1113+
%% Deletes the queues given as a list of `{Path, QName}' pairs in one Khepri
%% transaction, calling
%% `rabbit_db_binding:delete_for_destination_in_khepri/2' for every queue
%% whose node was deleted.
%%
%% Returns `{QNames, Deletions}' for the queues that were deleted, or an
%% `{error, _}' tuple. `FilterFun' is only used to restart the whole
%% operation through `delete_transient_in_khepri/1' when the transaction
%% reports a `mismatching_node' error.
do_delete_transient_queues_in_khepri([], _FilterFun) ->
    %% Nothing to delete, so skip the transaction entirely. With Khepri in a
    %% minority this spares the caller a long wait for a command that cannot
    %% be processed; in the normal case it keeps a somewhat large transaction
    %% command out of Khepri's log.
    {[], []};
do_delete_transient_queues_in_khepri(PathsAndNames, FilterFun) ->
    TxResult =
        rabbit_khepri:transaction(
          fun() ->
                  rabbit_misc:fold_while_ok(
                    fun({Path, QName}, Deleted) ->
                            %% Also see `delete_in_khepri/2'.
                            case khepri_tx_adv:delete(Path) of
                                {error, _} = Error ->
                                    %% Stops the fold; the error becomes the
                                    %% result of the transaction fun.
                                    Error;
                                {ok, #{data := _}} ->
                                    %% A node carrying a payload was removed:
                                    %% record the queue and its binding
                                    %% deletions.
                                    Deletions =
                                        rabbit_db_binding:delete_for_destination_in_khepri(
                                          QName, false),
                                    {ok, [{QName, Deletions} | Deleted]};
                                {ok, _} ->
                                    %% No payload reported for this path
                                    %% (presumably the node was already
                                    %% gone); nothing to record.
                                    {ok, Deleted}
                            end
                    end, [], PathsAndNames)
          end),
    case TxResult of
        {error, {khepri, mismatching_node, _}} ->
            %% One of the queues changed while attempting to update all
            %% queues. Retry the operation from the start.
            delete_transient_in_khepri(FilterFun);
        {error, _} = TxError ->
            TxError;
        {ok, Items} ->
            {QNames, AllDeletions} = lists:unzip(Items),
            {QNames, lists:flatten(AllDeletions)}
    end.
@@ -1382,6 +1416,3 @@ khepri_queues_path() ->
13821416

13831417
khepri_queue_path(#resource{virtual_host = VHost, name = Name}) ->
13841418
[?MODULE, queues, VHost, Name].
1385-
1386-
khepri_queue_path_to_name([?MODULE, queues, VHost, Name]) ->
1387-
rabbit_misc:r(VHost, queue, Name).

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696

9797
-export([setup/0,
9898
setup/1,
99+
init/0,
99100
can_join_cluster/1,
100101
add_member/2,
101102
remove_member/1,
@@ -323,6 +324,30 @@ wait_for_register_projections(Timeout, Retries) ->
323324

324325
%% @private
325326

327+
-spec init() -> Ret when
      Ret :: ok | timeout_error().

%% Waits until `members/0' reports at least one metadata store member,
%% polling once per second, then deletes the transient queues of the local
%% node and returns the result of that deletion.
init() ->
    StoreMembers = members(),
    case StoreMembers of
        [] ->
            %% No member known yet: wait a second and look again.
            timer:sleep(1000),
            init();
        _ ->
            ?LOG_NOTICE(
               "Found the following metadata store members: ~p", [StoreMembers],
               #{domain => ?RMQLOG_DOMAIN_DB}),
            %% Delete transient queues on init.
            %% The `rabbit_amqqueue:on_node_down/1' callback attempts the
            %% same deletion, but the cluster may have been in a minority
            %% when this node went down, so it has to be tried here as well.
            %% Booting waits for a majority (via `rabbit_khepri:setup/0'),
            %% which makes the deletion likely to succeed at this point.
            rabbit_amqqueue:delete_transient_queues_on_node(node())
    end.
348+
349+
%% @private
350+
326351
can_join_cluster(DiscoveryNode) when is_atom(DiscoveryNode) ->
327352
ThisNode = node(),
328353
try

deps/rabbit_common/src/rabbit_misc.erl

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
maps_put_falsy/3
8989
]).
9090
-export([remote_sup_child/2]).
91+
-export([for_each_while_ok/2, fold_while_ok/3]).
9192

9293
%% Horrible macro to use in guards
9394
-define(IS_BENIGN_EXIT(R),
@@ -1621,3 +1622,46 @@ remote_sup_child(Node, Sup) ->
16211622
[] -> {error, no_child};
16221623
{badrpc, {'EXIT', {noproc, _}}} -> {error, no_sup}
16231624
end.
1625+
1626+
-spec for_each_while_ok(ForEachFun, List) -> Ret when
      ForEachFun :: fun((Element) -> ok | {error, ErrReason}),
      ErrReason :: any(),
      Element :: any(),
      List :: [Element],
      Ret :: ok | {error, ErrReason}.
%% @doc Applies `ForEachFun' to the elements of `List' in order and stops at
%% the first element for which it returns `{error,_}'.
%%
%% @returns `ok' when `ForEachFun' returned `ok' for every element (including
%% when `List' is empty), otherwise the first `{error,_}' tuple it returned.

for_each_while_ok(_Fun, []) ->
    ok;
for_each_while_ok(Fun, [Head | Tail]) ->
    case Fun(Head) of
        {error, _} = Err ->
            %% Short-circuit: the remaining elements are not visited.
            Err;
        ok ->
            for_each_while_ok(Fun, Tail)
    end.
1647+
1648+
-spec fold_while_ok(FoldFun, Acc, List) -> Ret when
      FoldFun :: fun((Element, Acc) -> {ok, Acc} | {error, ErrReason}),
      Acc :: any(),
      ErrReason :: any(),
      Element :: any(),
      List :: [Element],
      Ret :: {ok, Acc} | {error, ErrReason}.
%% @doc Calls the given `FoldFun' on each element of the given `List' and the
%% accumulator value, short-circuiting if the function returns `{error,_}'.
%%
%% Elements are visited from the head of `List'; the accumulator returned for
%% one element is passed to the call for the next one.
%%
%% @returns the first `{error,_}' returned by `FoldFun' or `{ok,Acc}' if
%% `FoldFun' never returns an error tuple. For an empty `List' the initial
%% accumulator is returned unchanged as `{ok,Acc}'.

fold_while_ok(Fun, Acc0, [Elem | Rest]) ->
    case Fun(Elem, Acc0) of
        {ok, Acc} ->
            fold_while_ok(Fun, Acc, Rest);
        {error, _} = Error ->
            %% Short-circuit: the remaining elements are not visited.
            Error
    end;
fold_while_ok(_Fun, Acc, []) ->
    {ok, Acc}.

0 commit comments

Comments
 (0)