Skip to content

Commit e53596b

Browse files
Merge pull request #11821 from rabbitmq/mergify/bp/v4.0.x/pr-11785
Handle timeouts possible in Khepri minority in `rabbit_db_exchange` (backport #11785)
2 parents 6e7d34d + 2fa64c4 commit e53596b

17 files changed

+303
-159
lines changed

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -210,9 +210,18 @@ handle_http_req(<<"PUT">>,
210210
{error, not_found} ->
211211
ok = prohibit_cr_lf(XNameBin),
212212
ok = prohibit_reserved_amq(XName),
213-
rabbit_exchange:declare(
214-
XName, XTypeAtom, Durable, AutoDelete,
215-
Internal, XArgs, Username)
213+
case rabbit_exchange:declare(
214+
XName, XTypeAtom, Durable, AutoDelete,
215+
Internal, XArgs, Username) of
216+
{ok, DeclaredX} ->
217+
DeclaredX;
218+
{error, timeout} ->
219+
throw(
220+
<<"503">>,
221+
"Could not create ~ts because the operation "
222+
"timed out",
223+
[rabbit_misc:rs(XName)])
224+
end
216225
end,
217226
try rabbit_exchange:assert_equivalence(
218227
X, XTypeAtom, Durable, AutoDelete, Internal, XArgs) of
@@ -285,8 +294,15 @@ handle_http_req(<<"DELETE">>,
285294
ok = prohibit_default_exchange(XName),
286295
ok = prohibit_reserved_amq(XName),
287296
PermCache = check_resource_access(XName, configure, User, PermCache0),
288-
_ = rabbit_exchange:delete(XName, false, Username),
289-
{<<"204">>, null, {PermCache, TopicPermCache}};
297+
case rabbit_exchange:ensure_deleted(XName, false, Username) of
298+
ok ->
299+
{<<"204">>, null, {PermCache, TopicPermCache}};
300+
{error, timeout} ->
301+
throw(
302+
<<"503">>,
303+
"failed to delete ~ts due to a timeout",
304+
[rabbit_misc:rs(XName)])
305+
end;
290306

291307
handle_http_req(<<"POST">>,
292308
[<<"bindings">>],

deps/rabbit/src/rabbit_channel.erl

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2512,13 +2512,16 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
25122512
check_not_default_exchange(ExchangeName),
25132513
check_exchange_deletion(ExchangeName),
25142514
check_configure_permitted(ExchangeName, User, AuthzContext),
2515-
case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of
2516-
{error, not_found} ->
2515+
case rabbit_exchange:ensure_deleted(ExchangeName, IfUnused, Username) of
2516+
ok ->
25172517
ok;
25182518
{error, in_use} ->
25192519
rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]);
2520-
ok ->
2521-
ok
2520+
{error, timeout} ->
2521+
rabbit_misc:protocol_error(
2522+
internal_error,
2523+
"failed to delete ~ts due to a timeout",
2524+
[rabbit_misc:rs(ExchangeName)])
25222525
end;
25232526
handle_method(#'queue.purge'{queue = QueueNameBin},
25242527
ConnPid, AuthzContext, _CollectorPid, VHostPath, User) ->
@@ -2566,13 +2569,22 @@ handle_method(#'exchange.declare'{exchange = XNameBin,
25662569
check_write_permitted(AName, User, AuthzContext),
25672570
ok
25682571
end,
2569-
rabbit_exchange:declare(ExchangeName,
2570-
CheckedType,
2571-
Durable,
2572-
AutoDelete,
2573-
Internal,
2574-
Args,
2575-
Username)
2572+
case rabbit_exchange:declare(ExchangeName,
2573+
CheckedType,
2574+
Durable,
2575+
AutoDelete,
2576+
Internal,
2577+
Args,
2578+
Username) of
2579+
{ok, DeclaredX} ->
2580+
DeclaredX;
2581+
{error, timeout} ->
2582+
rabbit_misc:protocol_error(
2583+
internal_error,
2584+
"failed to declare ~ts because the operation "
2585+
"timed out",
2586+
[rabbit_misc:rs(ExchangeName)])
2587+
end
25762588
end,
25772589
ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
25782590
AutoDelete, Internal, Args);

deps/rabbit/src/rabbit_db_exchange.erl

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,10 @@ count_in_khepri() ->
265265
%% update().
266266
%% -------------------------------------------------------------------
267267

268-
-spec update(ExchangeName, UpdateFun) -> ok when
268+
-spec update(ExchangeName, UpdateFun) -> Ret when
269269
ExchangeName :: rabbit_exchange:name(),
270-
UpdateFun :: fun((Exchange) -> Exchange).
270+
UpdateFun :: fun((Exchange) -> Exchange),
271+
Ret :: ok | rabbit_khepri:timeout_error().
271272
%% @doc Updates an existing exchange record using the result of
272273
%% `UpdateFun'.
273274
%%
@@ -367,7 +368,9 @@ update_in_khepri_tx(Name, Fun) ->
367368

368369
-spec create_or_get(Exchange) -> Ret when
369370
Exchange :: rabbit_types:exchange(),
370-
Ret :: {new, Exchange} | {existing, Exchange}.
371+
Ret :: {new, Exchange} |
372+
{existing, Exchange} |
373+
rabbit_khepri:timeout_error().
371374
%% @doc Writes an exchange record if it doesn't exist already or returns
372375
%% the existing one.
373376
%%
@@ -399,7 +402,9 @@ create_or_get_in_khepri(#exchange{name = XName} = X) ->
399402
ok ->
400403
{new, X};
401404
{error, {khepri, mismatching_node, #{node_props := #{data := ExistingX}}}} ->
402-
{existing, ExistingX}
405+
{existing, ExistingX};
406+
{error, timeout} = Err ->
407+
Err
403408
end.
404409

405410
%% -------------------------------------------------------------------
@@ -523,17 +528,15 @@ next_serial_in_khepri(XName) ->
523528
UpdatePath =
524529
khepri_path:combine_with_conditions(
525530
Path, [#if_payload_version{version = Vsn}]),
526-
case rabbit_khepri:put(UpdatePath, Serial + 1) of
531+
case rabbit_khepri:put(UpdatePath, Serial + 1, #{timeout => infinity}) of
527532
ok ->
528533
Serial;
529534
{error, {khepri, mismatching_node, _}} ->
530-
next_serial_in_khepri(XName);
531-
Err ->
532-
Err
535+
next_serial_in_khepri(XName)
533536
end;
534537
_ ->
535538
Serial = 1,
536-
ok = rabbit_khepri:put(Path, Serial + 1),
539+
ok = rabbit_khepri:put(Path, Serial + 1, #{timeout => infinity}),
537540
Serial
538541
end.
539542

@@ -560,7 +563,10 @@ next_serial_in_khepri_tx(#exchange{name = XName}) ->
560563
Exchange :: rabbit_types:exchange(),
561564
Binding :: rabbit_types:binding(),
562565
Deletions :: dict:dict(),
563-
Ret :: {error, not_found} | {error, in_use} | {deleted, Exchange, [Binding], Deletions}.
566+
Ret :: {deleted, Exchange, [Binding], Deletions} |
567+
{error, not_found} |
568+
{error, in_use} |
569+
rabbit_khepri:timeout_error().
564570
%% @doc Deletes an exchange record from the database. If `IfUnused' is set
565571
%% to `true', it is only deleted when there are no bindings present on the
566572
%% exchange.

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,7 @@ update_durable_in_khepri(UpdateFun, FilterFun) ->
731731
end, [], Props),
732732
Res = rabbit_khepri:transaction(
733733
fun() ->
734-
for_each_while_ok(
734+
rabbit_misc:for_each_while_ok(
735735
fun({Path, Q}) -> khepri_tx:put(Path, Q) end,
736736
Updates)
737737
end),
@@ -749,16 +749,6 @@ update_durable_in_khepri(UpdateFun, FilterFun) ->
749749
Error
750750
end.
751751

752-
for_each_while_ok(Fun, [Elem | Rest]) ->
753-
case Fun(Elem) of
754-
ok ->
755-
for_each_while_ok(Fun, Rest);
756-
{error, _} = Error ->
757-
Error
758-
end;
759-
for_each_while_ok(_, []) ->
760-
ok.
761-
762752
%% -------------------------------------------------------------------
763753
%% exists().
764754
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_definitions.erl

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -863,13 +863,18 @@ add_exchange_int(Exchange, Name, ActingUser) ->
863863
undefined -> false; %% =< 2.2.0
864864
I -> I
865865
end,
866-
rabbit_exchange:declare(Name,
867-
rabbit_exchange:check_type(maps:get(type, Exchange, undefined)),
868-
maps:get(durable, Exchange, undefined),
869-
maps:get(auto_delete, Exchange, undefined),
870-
Internal,
871-
args(maps:get(arguments, Exchange, undefined)),
872-
ActingUser)
866+
case rabbit_exchange:declare(Name,
867+
rabbit_exchange:check_type(maps:get(type, Exchange, undefined)),
868+
maps:get(durable, Exchange, undefined),
869+
maps:get(auto_delete, Exchange, undefined),
870+
Internal,
871+
args(maps:get(arguments, Exchange, undefined)),
872+
ActingUser) of
873+
{ok, _Exchange} ->
874+
ok;
875+
{error, timeout} = Err ->
876+
throw(Err)
877+
end
873878
end.
874879

875880
add_binding(Binding, ActingUser) ->

deps/rabbit/src/rabbit_exchange.erl

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
lookup/1, lookup_many/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
1414
update_scratch/3, update_decorators/2, immutable/1,
1515
info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4,
16-
route/2, route/3, delete/3, validate_binding/2, count/0]).
16+
route/2, route/3, delete/3, validate_binding/2, count/0,
17+
ensure_deleted/3]).
1718
-export([list_names/0]).
1819
-export([serialise_events/1]).
1920
-export([serial/1, peek_serial/1]).
@@ -91,10 +92,16 @@ serial(X) ->
9192
true -> rabbit_db_exchange:next_serial(X#exchange.name)
9293
end.
9394

94-
-spec declare
95-
(name(), type(), boolean(), boolean(), boolean(),
96-
rabbit_framing:amqp_table(), rabbit_types:username())
97-
-> rabbit_types:exchange().
95+
-spec declare(Name, Type, Durable, AutoDelete, Internal, Args, Username) ->
96+
Ret when
97+
Name :: name(),
98+
Type :: type(),
99+
Durable :: boolean(),
100+
AutoDelete :: boolean(),
101+
Internal :: boolean(),
102+
Args :: rabbit_framing:amqp_table(),
103+
Username :: rabbit_types:username(),
104+
Ret :: {ok, rabbit_types:exchange()} | {error, timeout}.
98105

99106
declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
100107
X = rabbit_exchange_decorator:set(
@@ -121,14 +128,16 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
121128
Serial = serial(Exchange),
122129
ok = callback(X, create, Serial, [Exchange]),
123130
rabbit_event:notify(exchange_created, info(Exchange)),
124-
Exchange;
131+
{ok, Exchange};
125132
{existing, Exchange} ->
126-
Exchange
133+
{ok, Exchange};
134+
{error, timeout} = Err ->
135+
Err
127136
end;
128137
_ ->
129138
rabbit_log:warning("ignoring exchange.declare for exchange ~tp,
130139
exchange.delete in progress~n.", [XName]),
131-
X
140+
{ok, X}
132141
end.
133142

134143
%% Used with binaries sent over the wire; the type may not exist.
@@ -444,9 +453,13 @@ cons_if_present(XName, L) ->
444453

445454
-spec delete
446455
(name(), 'true', rabbit_types:username()) ->
447-
'ok'| rabbit_types:error('not_found' | 'in_use');
456+
'ok' |
457+
rabbit_types:error('not_found' | 'in_use') |
458+
rabbit_khepri:timeout_error();
448459
(name(), 'false', rabbit_types:username()) ->
449-
'ok' | rabbit_types:error('not_found').
460+
'ok' |
461+
rabbit_types:error('not_found') |
462+
rabbit_khepri:timeout_error().
450463

451464
delete(XName, IfUnused, Username) ->
452465
try
@@ -478,6 +491,26 @@ process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) ->
478491
rabbit_binding:add_deletion(
479492
XName, {X, deleted, Bs}, Deletions)).
480493

494+
-spec ensure_deleted(ExchangeName, IfUnused, Username) -> Ret when
495+
ExchangeName :: name(),
496+
IfUnused :: boolean(),
497+
Username :: rabbit_types:username(),
498+
Ret :: ok |
499+
rabbit_types:error('in_use') |
500+
rabbit_khepri:timeout_error().
501+
%% @doc A wrapper around `delete/3' which returns `ok' in the case that the
502+
%% exchange did not exist at time of deletion.
503+
504+
ensure_deleted(XName, IfUnused, Username) ->
505+
case delete(XName, IfUnused, Username) of
506+
ok ->
507+
ok;
508+
{error, not_found} ->
509+
ok;
510+
{error, _} = Err ->
511+
Err
512+
end.
513+
481514
-spec validate_binding
482515
(rabbit_types:exchange(), rabbit_types:binding())
483516
-> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}).

deps/rabbit/src/rabbit_logger_exchange_h.erl

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -148,18 +148,16 @@ wait_for_initial_pass(N) ->
148148
end.
149149

150150
setup_proc(
151-
#{config := #{exchange := #resource{name = Name,
152-
virtual_host = VHost}}} = Config) ->
151+
#{config := #{exchange := Exchange}} = Config) ->
153152
case declare_exchange(Config) of
154153
ok ->
155154
?LOG_INFO(
156-
"Logging to exchange '~ts' in vhost '~ts' ready", [Name, VHost],
155+
"Logging to ~ts ready", [rabbit_misc:rs(Exchange)],
157156
#{domain => ?RMQLOG_DOMAIN_GLOBAL});
158157
error ->
159158
?LOG_DEBUG(
160-
"Logging to exchange '~ts' in vhost '~ts' not ready, "
161-
"trying again in ~b second(s)",
162-
[Name, VHost, ?DECL_EXCHANGE_INTERVAL_SECS],
159+
"Logging to ~ts not ready, trying again in ~b second(s)",
160+
[rabbit_misc:rs(Exchange), ?DECL_EXCHANGE_INTERVAL_SECS],
163161
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
164162
receive
165163
stop -> ok
@@ -168,36 +166,45 @@ setup_proc(
168166
end
169167
end.
170168

171-
declare_exchange(
172-
#{config := #{exchange := #resource{name = Name,
173-
virtual_host = VHost} = Exchange}}) ->
174-
try
175-
%% Durable.
176-
#exchange{} = rabbit_exchange:declare(
177-
Exchange, topic, true, false, true, [],
178-
?INTERNAL_USER),
179-
?LOG_DEBUG(
180-
"Declared exchange '~ts' in vhost '~ts'",
181-
[Name, VHost],
182-
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
183-
ok
169+
declare_exchange(#{config := #{exchange := Exchange}}) ->
170+
try rabbit_exchange:declare(
171+
Exchange, topic, true, false, true, [], ?INTERNAL_USER) of
172+
{ok, #exchange{}} ->
173+
?LOG_DEBUG(
174+
"Declared ~ts",
175+
[rabbit_misc:rs(Exchange)],
176+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
177+
ok;
178+
{error, timeout} ->
179+
?LOG_DEBUG(
180+
"Could not declare ~ts because the operation timed out",
181+
[rabbit_misc:rs(Exchange)],
182+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
183+
error
184184
catch
185185
Class:Reason ->
186186
?LOG_DEBUG(
187-
"Could not declare exchange '~ts' in vhost '~ts', "
188-
"reason: ~0p:~0p",
189-
[Name, VHost, Class, Reason],
187+
"Could not declare ~ts, reason: ~0p:~0p",
188+
[rabbit_misc:rs(Exchange), Class, Reason],
190189
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
191190
error
192191
end.
193192

194193
unconfigure_exchange(
195-
#{config := #{exchange := #resource{name = Name,
196-
virtual_host = VHost} = Exchange,
194+
#{config := #{exchange := Exchange,
197195
setup_proc := Pid}}) ->
198196
Pid ! stop,
199-
_ = rabbit_exchange:delete(Exchange, false, ?INTERNAL_USER),
197+
case rabbit_exchange:ensure_deleted(Exchange, false, ?INTERNAL_USER) of
198+
ok ->
199+
ok;
200+
{error, timeout} ->
201+
?LOG_ERROR(
202+
"Could not delete ~ts due to a timeout",
203+
[rabbit_misc:rs(Exchange)],
204+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
205+
ok
206+
end,
200207
?LOG_INFO(
201-
"Logging to exchange '~ts' in vhost '~ts' disabled",
202-
[Name, VHost],
208+
"Logging to ~ts disabled",
209+
[rabbit_misc:rs(Exchange)],
203210
#{domain => ?RMQLOG_DOMAIN_GLOBAL}).

0 commit comments

Comments
 (0)