Skip to content

Commit e01d938

Browse files
the-mikedavismergify[bot]
authored andcommitted
Handle database failures when deleting exchanges
A common case for exchange deletion is that callers want the deletion to be idempotent: they treat the `ok` and `{error, not_found}` returns from `rabbit_exchange:delete/3` the same way. To simplify these callsites we add a `rabbit_exchange:ensure_deleted/3` that wraps `rabbit_exchange:delete/3` and returns `ok` when the exchange did not exist. Part of this commit is to update callsites to use this helper. The other part is to handle the `rabbit_khepri:timeout()` error possible when Khepri is in a minority. For most callsites this is just a matter of adding a branch to their `case` clauses and an appropriate error and message. (cherry picked from commit e7489d2)
1 parent 0106f30 commit e01d938

11 files changed

+84
-20
lines changed

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,8 +285,15 @@ handle_http_req(<<"DELETE">>,
285285
ok = prohibit_default_exchange(XName),
286286
ok = prohibit_reserved_amq(XName),
287287
PermCache = check_resource_access(XName, configure, User, PermCache0),
288-
_ = rabbit_exchange:delete(XName, false, Username),
289-
{<<"204">>, null, {PermCache, TopicPermCache}};
288+
case rabbit_exchange:ensure_deleted(XName, false, Username) of
289+
ok ->
290+
{<<"204">>, null, {PermCache, TopicPermCache}};
291+
{error, timeout} ->
292+
throw(
293+
<<"500">>,
294+
"failed to delete exchange '~ts' due to a timeout",
295+
[XNameBin])
296+
end;
290297

291298
handle_http_req(<<"POST">>,
292299
[<<"bindings">>],

deps/rabbit/src/rabbit_channel.erl

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2512,13 +2512,16 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
25122512
check_not_default_exchange(ExchangeName),
25132513
check_exchange_deletion(ExchangeName),
25142514
check_configure_permitted(ExchangeName, User, AuthzContext),
2515-
case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of
2516-
{error, not_found} ->
2515+
case rabbit_exchange:ensure_deleted(ExchangeName, IfUnused, Username) of
2516+
ok ->
25172517
ok;
25182518
{error, in_use} ->
25192519
rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]);
2520-
ok ->
2521-
ok
2520+
{error, timeout} ->
2521+
rabbit_misc:protocol_error(
2522+
internal_error,
2523+
"failed to delete exchange '~ts' due to a timeout",
2524+
[rabbit_misc:rs(ExchangeName)])
25222525
end;
25232526
handle_method(#'queue.purge'{queue = QueueNameBin},
25242527
ConnPid, AuthzContext, _CollectorPid, VHostPath, User) ->

deps/rabbit/src/rabbit_db_exchange.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,10 @@ next_serial_in_khepri_tx(#exchange{name = XName}) ->
561561
Exchange :: rabbit_types:exchange(),
562562
Binding :: rabbit_types:binding(),
563563
Deletions :: dict:dict(),
564-
Ret :: {error, not_found} | {error, in_use} | {deleted, Exchange, [Binding], Deletions}.
564+
Ret :: {deleted, Exchange, [Binding], Deletions} |
565+
{error, not_found} |
566+
{error, in_use} |
567+
rabbit_khepri:timeout_error().
565568
%% @doc Deletes an exchange record from the database. If `IfUnused' is set
566569
%% to `true', it is only deleted when there are no bindings present on the
567570
%% exchange.

deps/rabbit/src/rabbit_exchange.erl

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
lookup/1, lookup_many/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
1414
update_scratch/3, update_decorators/2, immutable/1,
1515
info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4,
16-
route/2, route/3, delete/3, validate_binding/2, count/0]).
16+
route/2, route/3, delete/3, validate_binding/2, count/0,
17+
ensure_deleted/3]).
1718
-export([list_names/0]).
1819
-export([serialise_events/1]).
1920
-export([serial/1, peek_serial/1]).
@@ -444,9 +445,13 @@ cons_if_present(XName, L) ->
444445

445446
-spec delete
446447
(name(), 'true', rabbit_types:username()) ->
447-
'ok'| rabbit_types:error('not_found' | 'in_use');
448+
'ok' |
449+
rabbit_types:error('not_found' | 'in_use') |
450+
rabbit_khepri:timeout_error();
448451
(name(), 'false', rabbit_types:username()) ->
449-
'ok' | rabbit_types:error('not_found').
452+
'ok' |
453+
rabbit_types:error('not_found') |
454+
rabbit_khepri:timeout_error().
450455

451456
delete(XName, IfUnused, Username) ->
452457
try
@@ -478,6 +483,26 @@ process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) ->
478483
rabbit_binding:add_deletion(
479484
XName, {X, deleted, Bs}, Deletions)).
480485

486+
-spec ensure_deleted(ExchangeName, IfUnused, Username) -> Ret when
487+
ExchangeName :: name(),
488+
IfUnused :: boolean(),
489+
Username :: rabbit_types:username(),
490+
Ret :: ok |
491+
rabbit_types:error('in_use') |
492+
rabbit_khepri:timeout_error().
493+
%% @doc A wrapper around `delete/3' which returns `ok' in the case that the
494+
%% exchange did not exist at time of deletion.
495+
496+
ensure_deleted(XName, IfUnused, Username) ->
497+
case delete(XName, IfUnused, Username) of
498+
ok ->
499+
ok;
500+
{error, not_found} ->
501+
ok;
502+
{error, _} = Err ->
503+
Err
504+
end.
505+
481506
-spec validate_binding
482507
(rabbit_types:exchange(), rabbit_types:binding())
483508
-> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}).

deps/rabbit/src/rabbit_logger_exchange_h.erl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,16 @@ unconfigure_exchange(
196196
virtual_host = VHost} = Exchange,
197197
setup_proc := Pid}}) ->
198198
Pid ! stop,
199-
_ = rabbit_exchange:delete(Exchange, false, ?INTERNAL_USER),
199+
case rabbit_exchange:ensure_deleted(Exchange, false, ?INTERNAL_USER) of
200+
ok ->
201+
ok;
202+
{error, timeout} ->
203+
?LOG_ERROR(
204+
"Could not delete exchange '~ts' in vhost '~ts' due to a timeout",
205+
[Name, VHost],
206+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
207+
ok
208+
end,
200209
?LOG_INFO(
201210
"Logging to exchange '~ts' in vhost '~ts' disabled",
202211
[Name, VHost],

deps/rabbit/src/rabbit_vhost.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ delete(VHost, ActingUser) ->
275275
assert_benign(rabbit_amqqueue:with(Name, QDelFun), ActingUser)
276276
end || Q <- rabbit_amqqueue:list(VHost)],
277277
rabbit_log:info("Deleting exchanges in vhost '~ts' because it's being deleted", [VHost]),
278-
[assert_benign(rabbit_exchange:delete(Name, false, ActingUser), ActingUser) ||
278+
[ok = rabbit_exchange:ensure_deleted(Name, false, ActingUser) ||
279279
#exchange{name = Name} <- rabbit_exchange:list(VHost)],
280280
rabbit_log:info("Clearing policies and runtime parameters in vhost '~ts' because it's being deleted", [VHost]),
281281
_ = rabbit_runtime_parameters:clear_vhost(VHost, ActingUser),

deps/rabbit/test/bindings_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -873,7 +873,8 @@ delete_queues() ->
873873
|| Q <- rabbit_amqqueue:list()].
874874

875875
delete_exchange(Name) ->
876-
_ = rabbit_exchange:delete(rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>).
876+
ok = rabbit_exchange:ensure_deleted(
877+
rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>).
877878

878879
declare(Ch, Q, Args) ->
879880
declare(Ch, Q, Args, true).

deps/rabbit/test/cluster_minority_SUITE.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ groups() ->
2424
{client_operations, [], [open_connection,
2525
open_channel,
2626
declare_exchange,
27+
delete_exchange,
2728
declare_binding,
2829
delete_binding,
2930
declare_queue,
@@ -100,6 +101,8 @@ init_per_group(Group, Config0) when Group == client_operations;
100101
#'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = <<"amq.fanout">>,
101102
source = <<"amq.direct">>,
102103
routing_key = <<"binding-to-be-deleted">>}),
104+
%% To be used in delete_exchange
105+
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = <<"exchange-to-be-deleted">>}),
103106

104107
%% Lower the default Khepri command timeout. By default this is set
105108
%% to 30s in `rabbit_khepri:setup/1' which makes the cases in this
@@ -157,6 +160,12 @@ declare_exchange(Config) ->
157160
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
158161
amqp_channel:call(Ch, #'exchange.declare'{exchange = <<"test-exchange">>})).
159162

163+
delete_exchange(Config) ->
164+
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
165+
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
166+
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
167+
amqp_channel:call(Ch, #'exchange.delete'{exchange = <<"exchange-to-be-deleted">>})).
168+
160169
declare_binding(Config) ->
161170
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
162171
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),

deps/rabbit/test/exchanges_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,8 @@ delete_queues() ->
340340
|| Q <- rabbit_amqqueue:list()].
341341

342342
delete_exchange(Name) ->
343-
_ = rabbit_exchange:delete(rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>).
343+
ok = rabbit_exchange:ensure_deleted(
344+
rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>).
344345

345346
declare(Ch, Q, Args) ->
346347
declare(Ch, Q, Args, true).

deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,13 @@ register() ->
4343
gen_event:add_handler(rabbit_event, ?MODULE, []).
4444

4545
unregister() ->
46-
_ = rabbit_exchange:delete(exchange(), false, ?INTERNAL_USER),
47-
gen_event:delete_handler(rabbit_event, ?MODULE, []).
46+
case rabbit_exchange:ensure_deleted(exchange(), false, ?INTERNAL_USER) of
47+
ok ->
48+
gen_event:delete_handler(rabbit_event, ?MODULE, []),
49+
ok;
50+
{error, _} = Err ->
51+
Err
52+
end.
4853

4954
exchange() ->
5055
exchange(get_vhost()).

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -889,11 +889,12 @@ delete_super_stream_exchange(VirtualHost, Name, Username) ->
889889
case rabbit_stream_utils:enforce_correct_name(Name) of
890890
{ok, CorrectName} ->
891891
ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName),
892-
case rabbit_exchange:delete(ExchangeName, false, Username) of
893-
{error, not_found} ->
894-
ok;
892+
case rabbit_exchange:ensure_deleted(
893+
ExchangeName, false, Username) of
895894
ok ->
896-
ok
895+
ok;
896+
{error, timeout} = Err ->
897+
Err
897898
end;
898899
error ->
899900
{error, validation_failed}

0 commit comments

Comments
 (0)