Skip to content

Commit 4d003f9

Browse files
Merge pull request #12019 from rabbitmq/mergify/bp/v4.0.x/pr-11980
2 parents 5264c62 + 1cd66e8 commit 4d003f9

File tree

7 files changed

+89
-38
lines changed

7 files changed

+89
-38
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -252,22 +252,30 @@ get_queue_type(Args, DefaultQueueType) ->
252252
rabbit_queue_type:discover(V)
253253
end.
254254

255-
-spec internal_declare(amqqueue:amqqueue(), boolean()) ->
256-
{created | existing, amqqueue:amqqueue()} | queue_absent().
255+
-spec internal_declare(Queue, Recover) -> Ret when
256+
Queue :: amqqueue:amqqueue(),
257+
Recover :: boolean(),
258+
Ret :: {created | existing, amqqueue:amqqueue()} |
259+
queue_absent() |
260+
rabbit_khepri:timeout_error().
257261

258262
internal_declare(Q, Recover) ->
259263
do_internal_declare(Q, Recover).
260264

261265
do_internal_declare(Q0, true) ->
262-
%% TODO Why do we return the old state instead of the actual one?
263-
%% I'm leaving it like it was before the khepri refactor, because
264-
%% rabbit_amqqueue_process:init_it2 compares the result of this declare to decide
265-
%% if continue or stop. If we return the actual one, it fails and the queue stops
266-
%% silently during init.
267-
%% Maybe we should review this bit of code at some point.
268266
Q = amqqueue:set_state(Q0, live),
269-
ok = store_queue(Q),
270-
{created, Q0};
267+
case store_queue(Q) of
268+
ok ->
269+
%% TODO Why do we return the old state instead of the actual one?
270+
%% I'm leaving it like it was before the khepri refactor, because
271+
%% rabbit_amqqueue_process:init_it2 compares the result of this
272+
%% declare to decide if continue or stop. If we return the actual
273+
%% one, it fails and the queue stops silently during init.
274+
%% Maybe we should review this bit of code at some point.
275+
{created, Q0};
276+
{error, timeout} = Err ->
277+
Err
278+
end;
271279
do_internal_declare(Q0, false) ->
272280
Q = rabbit_policy:set(amqqueue:set_state(Q0, live)),
273281
Queue = rabbit_queue_decorator:set(Q),
@@ -280,12 +288,18 @@ do_internal_declare(Q0, false) ->
280288
update(Name, Fun) ->
281289
rabbit_db_queue:update(Name, Fun).
282290

283-
%% only really used for quorum queues to ensure the rabbit_queue record
291+
-spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when
292+
Queue :: amqqueue:amqqueue(),
293+
Ret :: ok | {error, timeout}.
294+
295+
%% only really used for stream queues to ensure the rabbit_queue record
284296
%% is initialised
285297
ensure_rabbit_queue_record_is_initialized(Q) ->
286298
store_queue(Q).
287299

288-
-spec store_queue(amqqueue:amqqueue()) -> 'ok'.
300+
-spec store_queue(Queue) -> Ret when
301+
Queue :: amqqueue:amqqueue(),
302+
Ret :: ok | {error, timeout}.
289303

290304
store_queue(Q0) ->
291305
Q = rabbit_queue_decorator:set(Q0),
@@ -325,12 +339,10 @@ is_server_named_allowed(Args) ->
325339
Type = get_queue_type(Args),
326340
rabbit_queue_type:is_server_named_allowed(Type).
327341

328-
-spec lookup
329-
(name()) ->
330-
rabbit_types:ok(amqqueue:amqqueue()) |
331-
rabbit_types:error('not_found');
332-
([name()]) ->
333-
[amqqueue:amqqueue()].
342+
-spec lookup(QueueName) -> Ret when
343+
QueueName :: name(),
344+
Ret :: rabbit_types:ok(amqqueue:amqqueue())
345+
| rabbit_types:error('not_found').
334346

335347
lookup(Name) when is_record(Name, resource) ->
336348
rabbit_db_queue:get(Name).

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,12 @@ init_it2(Recover, From, State = #q{q = Q,
226226
false ->
227227
{stop, normal, {existing, Q1}, State}
228228
end;
229+
{error, timeout} ->
230+
Reason = {protocol_error, internal_error,
231+
"Could not declare ~ts on node '~ts' because the "
232+
"metadata store operation timed out",
233+
[rabbit_misc:rs(amqqueue:get_name(Q)), node()]},
234+
{stop, normal, Reason, State};
229235
Err ->
230236
{stop, normal, Err, State}
231237
end.
@@ -311,7 +317,7 @@ terminate(normal, State) -> %% delete case
311317
terminate(_Reason, State = #q{q = Q}) ->
312318
terminate_shutdown(fun (BQS) ->
313319
Q2 = amqqueue:set_state(Q, crashed),
314-
rabbit_amqqueue:store_queue(Q2),
320+
_ = rabbit_amqqueue:store_queue(Q2),
315321
BQS
316322
end, State).
317323

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -875,7 +875,10 @@ get_all_by_type_and_node_in_khepri(VHostName, Type, Node) ->
875875

876876
-spec create_or_get(Queue) -> Ret when
877877
Queue :: amqqueue:amqqueue(),
878-
Ret :: {created, Queue} | {existing, Queue} | {absent, Queue, nodedown}.
878+
Ret :: {created, Queue} |
879+
{existing, Queue} |
880+
{absent, Queue, nodedown} |
881+
rabbit_khepri:timeout_error().
879882
%% @doc Writes a queue record if it doesn't exist already or returns the existing one
880883
%%
881884
%% @returns the existing record if there is one in the database already, or the newly
@@ -924,8 +927,9 @@ create_or_get_in_khepri(Q) ->
924927
%% set().
925928
%% -------------------------------------------------------------------
926929

927-
-spec set(Queue) -> ok when
928-
Queue :: amqqueue:amqqueue().
930+
-spec set(Queue) -> Ret when
931+
Queue :: amqqueue:amqqueue(),
932+
Ret :: ok | rabbit_khepri:timeout_error().
929933
%% @doc Writes a queue record. If the queue is durable, it writes both instances:
930934
%% durable and transient. For the durable one, it resets decorators.
931935
%% The transient one is left as it is.

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,12 @@ start_cluster(Q) ->
295295
declare_queue_error(Error, NewQ, LeaderNode, ActingUser)
296296
end;
297297
{existing, _} = Ex ->
298-
Ex
298+
Ex;
299+
{error, timeout} ->
300+
{protocol_error, internal_error,
301+
"Could not declare quorum ~ts on node '~ts' because the metadata "
302+
"store operation timed out",
303+
[rabbit_misc:rs(QName), node()]}
299304
end.
300305

301306
declare_queue_error(Error, Queue, Leader, ActingUser) ->

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1231,7 +1231,7 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
12311231
#{name := S} when S == StreamId ->
12321232
rabbit_log:debug("~ts: initializing queue record for stream id ~ts",
12331233
[?MODULE, StreamId]),
1234-
_ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
1234+
ok = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
12351235
ok;
12361236
_ ->
12371237
ok

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -177,25 +177,37 @@ create_stream(Q0) ->
177177
case rabbit_stream_coordinator:new_stream(Q, Leader) of
178178
{ok, {ok, LeaderPid}, _} ->
179179
%% update record with leader pid
180-
set_leader_pid(LeaderPid, amqqueue:get_name(Q)),
181-
rabbit_event:notify(queue_created,
182-
[{name, QName},
183-
{durable, true},
184-
{auto_delete, false},
185-
{arguments, Arguments},
186-
{type, amqqueue:get_type(Q1)},
187-
{user_who_performed_action,
188-
ActingUser}]),
189-
{new, Q};
180+
case set_leader_pid(LeaderPid, amqqueue:get_name(Q)) of
181+
ok ->
182+
rabbit_event:notify(queue_created,
183+
[{name, QName},
184+
{durable, true},
185+
{auto_delete, false},
186+
{arguments, Arguments},
187+
{type, amqqueue:get_type(Q1)},
188+
{user_who_performed_action,
189+
ActingUser}]),
190+
{new, Q};
191+
{error, timeout} ->
192+
{protocol_error, internal_error,
193+
"Could not set leader PID for ~ts on node '~ts' "
194+
"because the metadata store operation timed out",
195+
[rabbit_misc:rs(QName), node()]}
196+
end;
190197
Error ->
191198
_ = rabbit_amqqueue:internal_delete(Q, ActingUser),
192-
{protocol_error, internal_error, "Cannot declare a queue '~ts' on node '~ts': ~255p",
199+
{protocol_error, internal_error, "Cannot declare ~ts on node '~ts': ~255p",
193200
[rabbit_misc:rs(QName), node(), Error]}
194201
end;
195202
{existing, Q} ->
196203
{existing, Q};
197204
{absent, Q, Reason} ->
198-
{absent, Q, Reason}
205+
{absent, Q, Reason};
206+
{error, timeout} ->
207+
{protocol_error, internal_error,
208+
"Could not declare ~ts on node '~ts' because the metadata store "
209+
"operation timed out",
210+
[rabbit_misc:rs(QName), node()]}
199211
end.
200212

201213
-spec delete(amqqueue:amqqueue(), boolean(),
@@ -1291,6 +1303,11 @@ resend_all(#stream_client{leader = LeaderPid,
12911303
end || {Seq, Msg} <- Msgs],
12921304
State.
12931305

1306+
-spec set_leader_pid(Pid, QName) -> Ret when
1307+
Pid :: pid(),
1308+
QName :: rabbit_amqqueue:name(),
1309+
Ret :: ok | {error, timeout}.
1310+
12941311
set_leader_pid(Pid, QName) ->
12951312
%% TODO this should probably be a single khepri transaction for better performance.
12961313
Fun = fun (Q) ->

deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,10 @@ is_stateful() ->
7070

7171
-spec declare(amqqueue:amqqueue(), node()) ->
7272
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
73-
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()}.
73+
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()} |
74+
{protocol_error, internal_error, string(), [string()]}.
7475
declare(Q0, _Node) ->
76+
QName = amqqueue:get_name(Q0),
7577
Q1 = case amqqueue:get_pid(Q0) of
7678
none ->
7779
%% declaring process becomes the queue
@@ -86,14 +88,19 @@ declare(Q0, _Node) ->
8688
Opts = amqqueue:get_options(Q),
8789
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
8890
rabbit_event:notify(queue_created,
89-
[{name, amqqueue:get_name(Q)},
91+
[{name, QName},
9092
{durable, true},
9193
{auto_delete, false},
9294
{exclusive, true},
9395
{type, amqqueue:get_type(Q)},
9496
{arguments, amqqueue:get_arguments(Q)},
9597
{user_who_performed_action, ActingUser}]),
9698
{new, Q};
99+
{error, timeout} ->
100+
{protocol_error, internal_error,
101+
"Could not declare ~ts because the metadata store operation "
102+
"timed out",
103+
[rabbit_misc:rs(QName)]};
97104
Other ->
98105
Other
99106
end.

0 commit comments

Comments
 (0)