Skip to content

Commit 52a0d70

Browse files
committed
Handle database timeouts when declaring exchanges
The spec of `rabbit_exchange:declare/7` needs to be updated to return `{ok, Exchange} | {error, Reason}` instead of the old return value of `rabbit_types:exchange()`. This is safe to do since `declare/7` is not called by RPC - from the CLI or otherwise - outside of test suites, and in test suites only through the CLI's `TestHelper.declare_exchange/7`. Callers of this helper are updated in this commit. Otherwise this commit updates callers to unwrap the `{ok, Exchange}` and bubble up errors.
1 parent 96c60a2 commit 52a0d70

File tree

12 files changed

+180
-105
lines changed

12 files changed

+180
-105
lines changed

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,9 +210,18 @@ handle_http_req(<<"PUT">>,
210210
{error, not_found} ->
211211
ok = prohibit_cr_lf(XNameBin),
212212
ok = prohibit_reserved_amq(XName),
213-
rabbit_exchange:declare(
214-
XName, XTypeAtom, Durable, AutoDelete,
215-
Internal, XArgs, Username)
213+
case rabbit_exchange:declare(
214+
XName, XTypeAtom, Durable, AutoDelete,
215+
Internal, XArgs, Username) of
216+
{ok, DeclaredX} ->
217+
DeclaredX;
218+
{error, timeout} ->
219+
throw(
220+
<<"500">>,
221+
"Could not create exchange '~ts' in vhost '~ts' "
222+
"because the operation timed out",
223+
[XName, Vhost])
224+
end
216225
end,
217226
try rabbit_exchange:assert_equivalence(
218227
X, XTypeAtom, Durable, AutoDelete, Internal, XArgs) of

deps/rabbit/src/rabbit_channel.erl

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2569,13 +2569,22 @@ handle_method(#'exchange.declare'{exchange = XNameBin,
25692569
check_write_permitted(AName, User, AuthzContext),
25702570
ok
25712571
end,
2572-
rabbit_exchange:declare(ExchangeName,
2573-
CheckedType,
2574-
Durable,
2575-
AutoDelete,
2576-
Internal,
2577-
Args,
2578-
Username)
2572+
case rabbit_exchange:declare(ExchangeName,
2573+
CheckedType,
2574+
Durable,
2575+
AutoDelete,
2576+
Internal,
2577+
Args,
2578+
Username) of
2579+
{ok, DeclaredX} ->
2580+
DeclaredX;
2581+
{error, timeout} ->
2582+
rabbit_misc:protocol_error(
2583+
internal_error,
2584+
"failed to declare exchange '~ts' in vhost '~ts' "
2585+
"because the operation timed out",
2586+
[XNameBinStripped, VHostPath])
2587+
end
25792588
end,
25802589
ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
25812590
AutoDelete, Internal, Args);

deps/rabbit/src/rabbit_db_exchange.erl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,9 @@ update_in_khepri_tx(Name, Fun) ->
368368

369369
-spec create_or_get(Exchange) -> Ret when
370370
Exchange :: rabbit_types:exchange(),
371-
Ret :: {new, Exchange} | {existing, Exchange}.
371+
Ret :: {new, Exchange} |
372+
{existing, Exchange} |
373+
rabbit_khepri:timeout_error().
372374
%% @doc Writes an exchange record if it doesn't exist already or returns
373375
%% the existing one.
374376
%%
@@ -400,7 +402,9 @@ create_or_get_in_khepri(#exchange{name = XName} = X) ->
400402
ok ->
401403
{new, X};
402404
{error, {khepri, mismatching_node, #{node_props := #{data := ExistingX}}}} ->
403-
{existing, ExistingX}
405+
{existing, ExistingX};
406+
{error, timeout} = Err ->
407+
Err
404408
end.
405409

406410
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_definitions.erl

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -863,13 +863,18 @@ add_exchange_int(Exchange, Name, ActingUser) ->
863863
undefined -> false; %% =< 2.2.0
864864
I -> I
865865
end,
866-
rabbit_exchange:declare(Name,
867-
rabbit_exchange:check_type(maps:get(type, Exchange, undefined)),
868-
maps:get(durable, Exchange, undefined),
869-
maps:get(auto_delete, Exchange, undefined),
870-
Internal,
871-
args(maps:get(arguments, Exchange, undefined)),
872-
ActingUser)
866+
case rabbit_exchange:declare(Name,
867+
rabbit_exchange:check_type(maps:get(type, Exchange, undefined)),
868+
maps:get(durable, Exchange, undefined),
869+
maps:get(auto_delete, Exchange, undefined),
870+
Internal,
871+
args(maps:get(arguments, Exchange, undefined)),
872+
ActingUser) of
873+
{ok, _Exchange} ->
874+
ok;
875+
{error, timeout} = Err ->
876+
throw(Err)
877+
end
873878
end.
874879

875880
add_binding(Binding, ActingUser) ->

deps/rabbit/src/rabbit_exchange.erl

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,16 @@ serial(X) ->
9292
true -> rabbit_db_exchange:next_serial(X#exchange.name)
9393
end.
9494

95-
-spec declare
96-
(name(), type(), boolean(), boolean(), boolean(),
97-
rabbit_framing:amqp_table(), rabbit_types:username())
98-
-> rabbit_types:exchange().
95+
-spec declare(Name, Type, Durable, AutoDelete, Internal, Args, Username) ->
96+
Ret when
97+
Name :: name(),
98+
Type :: type(),
99+
Durable :: boolean(),
100+
AutoDelete :: boolean(),
101+
Internal :: boolean(),
102+
Args :: rabbit_framing:amqp_table(),
103+
Username :: rabbit_types:username(),
104+
Ret :: {ok, rabbit_types:exchange()} | {error, timeout}.
99105

100106
declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
101107
X = rabbit_exchange_decorator:set(
@@ -122,14 +128,16 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
122128
Serial = serial(Exchange),
123129
ok = callback(X, create, Serial, [Exchange]),
124130
rabbit_event:notify(exchange_created, info(Exchange)),
125-
Exchange;
131+
{ok, Exchange};
126132
{existing, Exchange} ->
127-
Exchange
133+
{ok, Exchange};
134+
{error, timeout} = Err ->
135+
Err
128136
end;
129137
_ ->
130138
rabbit_log:warning("ignoring exchange.declare for exchange ~tp,
131139
exchange.delete in progress~n.", [XName]),
132-
X
140+
{ok, X}
133141
end.
134142

135143
%% Used with binaries sent over the wire; the type may not exist.

deps/rabbit/src/rabbit_logger_exchange_h.erl

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -171,16 +171,21 @@ setup_proc(
171171
declare_exchange(
172172
#{config := #{exchange := #resource{name = Name,
173173
virtual_host = VHost} = Exchange}}) ->
174-
try
175-
%% Durable.
176-
#exchange{} = rabbit_exchange:declare(
177-
Exchange, topic, true, false, true, [],
178-
?INTERNAL_USER),
179-
?LOG_DEBUG(
180-
"Declared exchange '~ts' in vhost '~ts'",
181-
[Name, VHost],
182-
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
183-
ok
174+
try rabbit_exchange:declare(
175+
Exchange, topic, true, false, true, [], ?INTERNAL_USER) of
176+
{ok, #exchange{}} ->
177+
?LOG_DEBUG(
178+
"Declared exchange '~ts' in vhost '~ts'",
179+
[Name, VHost],
180+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
181+
ok;
182+
{error, timeout} ->
183+
?LOG_DEBUG(
184+
"Could not declare exchange '~ts' in vhost '~ts' because the "
185+
"operation timed out",
186+
[Name, VHost],
187+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
188+
error
184189
catch
185190
Class:Reason ->
186191
?LOG_DEBUG(

deps/rabbit/src/rabbit_vhost.erl

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -201,33 +201,57 @@ do_add(Name, Metadata, ActingUser) ->
201201
ok
202202
end,
203203
rabbit_db_vhost_defaults:apply(Name, ActingUser),
204-
_ = [begin
205-
Resource = rabbit_misc:r(Name, exchange, ExchangeName),
206-
rabbit_log:debug("Will declare an exchange ~tp", [Resource]),
207-
_ = rabbit_exchange:declare(Resource, Type, true, false, Internal, [], ActingUser)
208-
end || {ExchangeName, Type, Internal} <-
209-
[{<<"">>, direct, false},
210-
{<<"amq.direct">>, direct, false},
211-
{<<"amq.topic">>, topic, false},
212-
%% per 0-9-1 pdf
213-
{<<"amq.match">>, headers, false},
214-
%% per 0-9-1 xml
215-
{<<"amq.headers">>, headers, false},
216-
{<<"amq.fanout">>, fanout, false},
217-
{<<"amq.rabbitmq.trace">>, topic, true}]],
218-
case rabbit_vhost_sup_sup:start_on_all_nodes(Name) of
204+
case declare_default_exchanges(Name, ActingUser) of
219205
ok ->
220-
rabbit_event:notify(vhost_created, info(VHost)
221-
++ [{user_who_performed_action, ActingUser},
222-
{description, Description},
223-
{tags, Tags}]),
224-
ok;
225-
{error, Reason} ->
226-
Msg = rabbit_misc:format("failed to set up vhost '~ts': ~tp",
227-
[Name, Reason]),
206+
case rabbit_vhost_sup_sup:start_on_all_nodes(Name) of
207+
ok ->
208+
rabbit_event:notify(vhost_created, info(VHost)
209+
++ [{user_who_performed_action, ActingUser},
210+
{description, Description},
211+
{tags, Tags}]),
212+
ok;
213+
{error, Reason} ->
214+
Msg = rabbit_misc:format("failed to set up vhost '~ts': ~tp",
215+
[Name, Reason]),
216+
{error, Msg}
217+
end;
218+
{error, timeout} ->
219+
Msg = rabbit_misc:format(
220+
"failed to set up vhost '~ts' because a timeout occurred "
221+
"while adding default exchanges",
222+
[Name]),
228223
{error, Msg}
229224
end.
230225

226+
-spec declare_default_exchanges(VHostName, ActingUser) -> Ret when
227+
VHostName :: vhost:name(),
228+
ActingUser :: rabbit_types:username(),
229+
Ret :: ok | {error, timeout}.
230+
231+
declare_default_exchanges(VHostName, ActingUser) ->
232+
DefaultExchanges = [{<<"">>, direct, false},
233+
{<<"amq.direct">>, direct, false},
234+
{<<"amq.topic">>, topic, false},
235+
%% per 0-9-1 pdf
236+
{<<"amq.match">>, headers, false},
237+
%% per 0-9-1 xml
238+
{<<"amq.headers">>, headers, false},
239+
{<<"amq.fanout">>, fanout, false},
240+
{<<"amq.rabbitmq.trace">>, topic, true}],
241+
rabbit_misc:for_each_while_ok(
242+
fun({ExchangeName, Type, Internal}) ->
243+
Resource = rabbit_misc:r(VHostName, exchange, ExchangeName),
244+
rabbit_log:debug("Will declare an exchange ~tp", [Resource]),
245+
case rabbit_exchange:declare(
246+
Resource, Type, true, false, Internal, [],
247+
ActingUser) of
248+
{ok, _} ->
249+
ok;
250+
{error, timeout} = Err ->
251+
Err
252+
end
253+
end, DefaultExchanges).
254+
231255
-spec update_metadata(vhost:name(), vhost:metadata(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
232256
update_metadata(Name, Metadata0, ActingUser) ->
233257
Metadata = maps:with([description, tags, default_queue_type], Metadata0),

deps/rabbit/test/routing_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ topic(Config) ->
8484

8585
topic1(_Config) ->
8686
XName = rabbit_misc:r(?VHOST, exchange, <<"topic_matching-exchange">>),
87-
X = rabbit_exchange:declare(
88-
XName, topic, _Durable = true, _AutoDelete = false,
89-
_Internal = false, _Args = [], ?USER),
87+
{ok, X} = rabbit_exchange:declare(
88+
XName, topic, _Durable = true, _AutoDelete = false,
89+
_Internal = false, _Args = [], ?USER),
9090

9191
%% add some bindings
9292
Bindings = [#binding{source = XName,

deps/rabbitmq_cli/test/ctl/list_exchanges_command_test.exs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ defmodule ListExchangesCommandTest do
9696

9797
test "run: default options test", context do
9898
exchange_name = "test_exchange"
99-
declare_exchange(exchange_name, @vhost)
99+
{:ok, _} = declare_exchange(exchange_name, @vhost)
100100

101101
assert MapSet.new(run_command_to_list(@command, [["name", "type"], context[:opts]])) ==
102102
MapSet.new(
@@ -106,8 +106,8 @@ defmodule ListExchangesCommandTest do
106106
end
107107

108108
test "run: list multiple exchanges", context do
109-
declare_exchange("test_exchange_1", @vhost, :direct)
110-
declare_exchange("test_exchange_2", @vhost, :fanout)
109+
{:ok, _} = declare_exchange("test_exchange_1", @vhost, :direct)
110+
{:ok, _} = declare_exchange("test_exchange_2", @vhost, :fanout)
111111

112112
non_default_exchanges =
113113
run_command_to_list(@command, [["name", "type"], context[:opts]])
@@ -124,8 +124,8 @@ defmodule ListExchangesCommandTest do
124124
end
125125

126126
test "run: info keys filter single key", context do
127-
declare_exchange("test_exchange_1", @vhost)
128-
declare_exchange("test_exchange_2", @vhost)
127+
{:ok, _} = declare_exchange("test_exchange_1", @vhost)
128+
{:ok, _} = declare_exchange("test_exchange_2", @vhost)
129129

130130
non_default_exchanges =
131131
run_command_to_list(@command, [["name"], context[:opts]])
@@ -138,8 +138,8 @@ defmodule ListExchangesCommandTest do
138138
end
139139

140140
test "run: info keys add additional keys", context do
141-
declare_exchange("durable_exchange", @vhost, :direct, true)
142-
declare_exchange("auto_delete_exchange", @vhost, :fanout, false, true)
141+
{:ok, _} = declare_exchange("durable_exchange", @vhost, :direct, true)
142+
{:ok, _} = declare_exchange("auto_delete_exchange", @vhost, :fanout, false, true)
143143

144144
non_default_exchanges =
145145
run_command_to_list(@command, [["name", "type", "durable", "auto_delete"], context[:opts]])
@@ -162,8 +162,8 @@ defmodule ListExchangesCommandTest do
162162
delete_vhost(other_vhost)
163163
end)
164164

165-
declare_exchange("test_exchange_1", @vhost)
166-
declare_exchange("test_exchange_2", other_vhost)
165+
{:ok, _} = declare_exchange("test_exchange_1", @vhost)
166+
{:ok, _} = declare_exchange("test_exchange_2", other_vhost)
167167

168168
non_default_exchanges1 =
169169
run_command_to_list(@command, [["name"], context[:opts]])

deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,13 @@ info(_X) -> [].
3838
info(_X, _) -> [].
3939

4040
register() ->
41-
_ = rabbit_exchange:declare(exchange(), topic, true, false, true, [],
42-
?INTERNAL_USER),
43-
gen_event:add_handler(rabbit_event, ?MODULE, []).
41+
case rabbit_exchange:declare(exchange(), topic, true, false, true, [],
42+
?INTERNAL_USER) of
43+
{ok, _Exchange} ->
44+
gen_event:add_handler(rabbit_event, ?MODULE, []);
45+
{error, timeout} = Err ->
46+
Err
47+
end.
4448

4549
unregister() ->
4650
case rabbit_exchange:ensure_deleted(exchange(), false, ?INTERNAL_USER) of

deps/rabbitmq_federation/test/unit_inbroker_SUITE.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,10 +200,12 @@ upstream_validation(_Config) ->
200200
ok.
201201

202202
with_exchanges(Fun) ->
203-
rabbit_exchange:declare(r(?US_NAME), fanout, false, false, false, [],
204-
<<"acting-user">>),
205-
X = rabbit_exchange:declare(r(?DS_NAME), fanout, false, false, false, [],
206-
<<"acting-user">>),
203+
{ok, _} = rabbit_exchange:declare(
204+
r(?US_NAME), fanout, false, false, false, [],
205+
<<"acting-user">>),
206+
{ok, X} = rabbit_exchange:declare(
207+
r(?DS_NAME), fanout, false, false, false, [],
208+
<<"acting-user">>),
207209
Fun(X),
208210
%% Delete downstream first or it will recreate the upstream
209211
rabbit_exchange:delete(r(?DS_NAME), false, <<"acting-user">>),

0 commit comments

Comments
 (0)