Skip to content

Handle timeouts possible in Khepri minority in rabbit_db_exchange #11785

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 24, 2024
26 changes: 21 additions & 5 deletions deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,18 @@ handle_http_req(<<"PUT">>,
{error, not_found} ->
ok = prohibit_cr_lf(XNameBin),
ok = prohibit_reserved_amq(XName),
rabbit_exchange:declare(
XName, XTypeAtom, Durable, AutoDelete,
Internal, XArgs, Username)
case rabbit_exchange:declare(
XName, XTypeAtom, Durable, AutoDelete,
Internal, XArgs, Username) of
{ok, DeclaredX} ->
DeclaredX;
{error, timeout} ->
throw(
<<"503">>,
"Could not create ~ts because the operation "
"timed out",
[rabbit_misc:rs(XName)])
end
end,
try rabbit_exchange:assert_equivalence(
X, XTypeAtom, Durable, AutoDelete, Internal, XArgs) of
Expand Down Expand Up @@ -285,8 +294,15 @@ handle_http_req(<<"DELETE">>,
ok = prohibit_default_exchange(XName),
ok = prohibit_reserved_amq(XName),
PermCache = check_resource_access(XName, configure, User, PermCache0),
_ = rabbit_exchange:delete(XName, false, Username),
{<<"204">>, null, {PermCache, TopicPermCache}};
case rabbit_exchange:ensure_deleted(XName, false, Username) of
ok ->
{<<"204">>, null, {PermCache, TopicPermCache}};
{error, timeout} ->
throw(
<<"503">>,
"failed to delete ~ts due to a timeout",
[rabbit_misc:rs(XName)])
end;

handle_http_req(<<"POST">>,
[<<"bindings">>],
Expand Down
34 changes: 23 additions & 11 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2512,13 +2512,16 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
check_not_default_exchange(ExchangeName),
check_exchange_deletion(ExchangeName),
check_configure_permitted(ExchangeName, User, AuthzContext),
case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of
{error, not_found} ->
case rabbit_exchange:ensure_deleted(ExchangeName, IfUnused, Username) of
ok ->
ok;
{error, in_use} ->
rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]);
ok ->
ok
{error, timeout} ->
rabbit_misc:protocol_error(
internal_error,
"failed to delete ~ts due to a timeout",
[rabbit_misc:rs(ExchangeName)])
end;
handle_method(#'queue.purge'{queue = QueueNameBin},
ConnPid, AuthzContext, _CollectorPid, VHostPath, User) ->
Expand Down Expand Up @@ -2566,13 +2569,22 @@ handle_method(#'exchange.declare'{exchange = XNameBin,
check_write_permitted(AName, User, AuthzContext),
ok
end,
rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
AutoDelete,
Internal,
Args,
Username)
case rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
AutoDelete,
Internal,
Args,
Username) of
{ok, DeclaredX} ->
DeclaredX;
{error, timeout} ->
rabbit_misc:protocol_error(
internal_error,
"failed to declare ~ts because the operation "
"timed out",
[rabbit_misc:rs(ExchangeName)])
end
end,
ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
AutoDelete, Internal, Args);
Expand Down
26 changes: 16 additions & 10 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,10 @@ count_in_khepri() ->
%% update().
%% -------------------------------------------------------------------

-spec update(ExchangeName, UpdateFun) -> ok when
-spec update(ExchangeName, UpdateFun) -> Ret when
ExchangeName :: rabbit_exchange:name(),
UpdateFun :: fun((Exchange) -> Exchange).
UpdateFun :: fun((Exchange) -> Exchange),
Ret :: ok | rabbit_khepri:timeout_error().
%% @doc Updates an existing exchange record using the result of
%% `UpdateFun'.
%%
Expand Down Expand Up @@ -367,7 +368,9 @@ update_in_khepri_tx(Name, Fun) ->

-spec create_or_get(Exchange) -> Ret when
Exchange :: rabbit_types:exchange(),
Ret :: {new, Exchange} | {existing, Exchange}.
Ret :: {new, Exchange} |
{existing, Exchange} |
rabbit_khepri:timeout_error().
%% @doc Writes an exchange record if it doesn't exist already or returns
%% the existing one.
%%
Expand Down Expand Up @@ -399,7 +402,9 @@ create_or_get_in_khepri(#exchange{name = XName} = X) ->
ok ->
{new, X};
{error, {khepri, mismatching_node, #{node_props := #{data := ExistingX}}}} ->
{existing, ExistingX}
{existing, ExistingX};
{error, timeout} = Err ->
Err
end.

%% -------------------------------------------------------------------
Expand Down Expand Up @@ -523,17 +528,15 @@ next_serial_in_khepri(XName) ->
UpdatePath =
khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
case rabbit_khepri:put(UpdatePath, Serial + 1) of
case rabbit_khepri:put(UpdatePath, Serial + 1, #{timeout => infinity}) of
ok ->
Serial;
{error, {khepri, mismatching_node, _}} ->
next_serial_in_khepri(XName);
Err ->
Err
next_serial_in_khepri(XName)
end;
_ ->
Serial = 1,
ok = rabbit_khepri:put(Path, Serial + 1),
ok = rabbit_khepri:put(Path, Serial + 1, #{timeout => infinity}),
Serial
end.

Expand All @@ -560,7 +563,10 @@ next_serial_in_khepri_tx(#exchange{name = XName}) ->
Exchange :: rabbit_types:exchange(),
Binding :: rabbit_types:binding(),
Deletions :: dict:dict(),
Ret :: {error, not_found} | {error, in_use} | {deleted, Exchange, [Binding], Deletions}.
Ret :: {deleted, Exchange, [Binding], Deletions} |
{error, not_found} |
{error, in_use} |
rabbit_khepri:timeout_error().
%% @doc Deletes an exchange record from the database. If `IfUnused' is set
%% to `true', it is only deleted when there are no bindings present on the
%% exchange.
Expand Down
12 changes: 1 addition & 11 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ update_durable_in_khepri(UpdateFun, FilterFun) ->
end, [], Props),
Res = rabbit_khepri:transaction(
fun() ->
for_each_while_ok(
rabbit_misc:for_each_while_ok(
fun({Path, Q}) -> khepri_tx:put(Path, Q) end,
Updates)
end),
Expand All @@ -749,16 +749,6 @@ update_durable_in_khepri(UpdateFun, FilterFun) ->
Error
end.

for_each_while_ok(Fun, [Elem | Rest]) ->
case Fun(Elem) of
ok ->
for_each_while_ok(Fun, Rest);
{error, _} = Error ->
Error
end;
for_each_while_ok(_, []) ->
ok.

%% -------------------------------------------------------------------
%% exists().
%% -------------------------------------------------------------------
Expand Down
19 changes: 12 additions & 7 deletions deps/rabbit/src/rabbit_definitions.erl
Original file line number Diff line number Diff line change
Expand Up @@ -863,13 +863,18 @@ add_exchange_int(Exchange, Name, ActingUser) ->
undefined -> false; %% =< 2.2.0
I -> I
end,
rabbit_exchange:declare(Name,
rabbit_exchange:check_type(maps:get(type, Exchange, undefined)),
maps:get(durable, Exchange, undefined),
maps:get(auto_delete, Exchange, undefined),
Internal,
args(maps:get(arguments, Exchange, undefined)),
ActingUser)
case rabbit_exchange:declare(Name,
rabbit_exchange:check_type(maps:get(type, Exchange, undefined)),
maps:get(durable, Exchange, undefined),
maps:get(auto_delete, Exchange, undefined),
Internal,
args(maps:get(arguments, Exchange, undefined)),
ActingUser) of
{ok, _Exchange} ->
ok;
{error, timeout} = Err ->
throw(Err)
end
end.

add_binding(Binding, ActingUser) ->
Expand Down
53 changes: 43 additions & 10 deletions deps/rabbit/src/rabbit_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
lookup/1, lookup_many/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
update_scratch/3, update_decorators/2, immutable/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4,
route/2, route/3, delete/3, validate_binding/2, count/0]).
route/2, route/3, delete/3, validate_binding/2, count/0,
ensure_deleted/3]).
-export([list_names/0]).
-export([serialise_events/1]).
-export([serial/1, peek_serial/1]).
Expand Down Expand Up @@ -91,10 +92,16 @@ serial(X) ->
true -> rabbit_db_exchange:next_serial(X#exchange.name)
end.

-spec declare
(name(), type(), boolean(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:username())
-> rabbit_types:exchange().
-spec declare(Name, Type, Durable, AutoDelete, Internal, Args, Username) ->
Ret when
Name :: name(),
Type :: type(),
Durable :: boolean(),
AutoDelete :: boolean(),
Internal :: boolean(),
Args :: rabbit_framing:amqp_table(),
Username :: rabbit_types:username(),
Ret :: {ok, rabbit_types:exchange()} | {error, timeout}.

declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
X = rabbit_exchange_decorator:set(
Expand All @@ -121,14 +128,16 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
Serial = serial(Exchange),
ok = callback(X, create, Serial, [Exchange]),
rabbit_event:notify(exchange_created, info(Exchange)),
Exchange;
{ok, Exchange};
{existing, Exchange} ->
Exchange
{ok, Exchange};
{error, timeout} = Err ->
Err
end;
_ ->
rabbit_log:warning("ignoring exchange.declare for exchange ~tp,
exchange.delete in progress~n.", [XName]),
X
{ok, X}
end.

%% Used with binaries sent over the wire; the type may not exist.
Expand Down Expand Up @@ -444,9 +453,13 @@ cons_if_present(XName, L) ->

-spec delete
(name(), 'true', rabbit_types:username()) ->
'ok'| rabbit_types:error('not_found' | 'in_use');
'ok' |
rabbit_types:error('not_found' | 'in_use') |
rabbit_khepri:timeout_error();
(name(), 'false', rabbit_types:username()) ->
'ok' | rabbit_types:error('not_found').
'ok' |
rabbit_types:error('not_found') |
rabbit_khepri:timeout_error().

delete(XName, IfUnused, Username) ->
try
Expand Down Expand Up @@ -478,6 +491,26 @@ process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) ->
rabbit_binding:add_deletion(
XName, {X, deleted, Bs}, Deletions)).

-spec ensure_deleted(ExchangeName, IfUnused, Username) -> Ret when
ExchangeName :: name(),
IfUnused :: boolean(),
Username :: rabbit_types:username(),
Ret :: ok |
rabbit_types:error('in_use') |
rabbit_khepri:timeout_error().
%% @doc A wrapper around `delete/3' which returns `ok' in the case that the
%% exchange did not exist at time of deletion.

ensure_deleted(XName, IfUnused, Username) ->
case delete(XName, IfUnused, Username) of
ok ->
ok;
{error, not_found} ->
ok;
{error, _} = Err ->
Err
end.

-spec validate_binding
(rabbit_types:exchange(), rabbit_types:binding())
-> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}).
Expand Down
61 changes: 34 additions & 27 deletions deps/rabbit/src/rabbit_logger_exchange_h.erl
Original file line number Diff line number Diff line change
Expand Up @@ -148,18 +148,16 @@ wait_for_initial_pass(N) ->
end.

setup_proc(
#{config := #{exchange := #resource{name = Name,
virtual_host = VHost}}} = Config) ->
#{config := #{exchange := Exchange}} = Config) ->
case declare_exchange(Config) of
ok ->
?LOG_INFO(
"Logging to exchange '~ts' in vhost '~ts' ready", [Name, VHost],
"Logging to ~ts ready", [rabbit_misc:rs(Exchange)],
#{domain => ?RMQLOG_DOMAIN_GLOBAL});
error ->
?LOG_DEBUG(
"Logging to exchange '~ts' in vhost '~ts' not ready, "
"trying again in ~b second(s)",
[Name, VHost, ?DECL_EXCHANGE_INTERVAL_SECS],
"Logging to ~ts not ready, trying again in ~b second(s)",
[rabbit_misc:rs(Exchange), ?DECL_EXCHANGE_INTERVAL_SECS],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
receive
stop -> ok
Expand All @@ -168,36 +166,45 @@ setup_proc(
end
end.

declare_exchange(
#{config := #{exchange := #resource{name = Name,
virtual_host = VHost} = Exchange}}) ->
try
%% Durable.
#exchange{} = rabbit_exchange:declare(
Exchange, topic, true, false, true, [],
?INTERNAL_USER),
?LOG_DEBUG(
"Declared exchange '~ts' in vhost '~ts'",
[Name, VHost],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
ok
declare_exchange(#{config := #{exchange := Exchange}}) ->
try rabbit_exchange:declare(
Exchange, topic, true, false, true, [], ?INTERNAL_USER) of
{ok, #exchange{}} ->
?LOG_DEBUG(
"Declared ~ts",
[rabbit_misc:rs(Exchange)],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
ok;
{error, timeout} ->
?LOG_DEBUG(
"Could not declare ~ts because the operation timed out",
[rabbit_misc:rs(Exchange)],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
error
catch
Class:Reason ->
?LOG_DEBUG(
"Could not declare exchange '~ts' in vhost '~ts', "
"reason: ~0p:~0p",
[Name, VHost, Class, Reason],
"Could not declare ~ts, reason: ~0p:~0p",
[rabbit_misc:rs(Exchange), Class, Reason],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
error
end.

unconfigure_exchange(
#{config := #{exchange := #resource{name = Name,
virtual_host = VHost} = Exchange,
#{config := #{exchange := Exchange,
setup_proc := Pid}}) ->
Pid ! stop,
_ = rabbit_exchange:delete(Exchange, false, ?INTERNAL_USER),
case rabbit_exchange:ensure_deleted(Exchange, false, ?INTERNAL_USER) of
ok ->
ok;
{error, timeout} ->
?LOG_ERROR(
"Could not delete ~ts due to a timeout",
[rabbit_misc:rs(Exchange)],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
ok
end,
?LOG_INFO(
"Logging to exchange '~ts' in vhost '~ts' disabled",
[Name, VHost],
"Logging to ~ts disabled",
[rabbit_misc:rs(Exchange)],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}).
Loading
Loading