Skip to content

Commit 8eef209

Browse files
committed
Handle database timeouts in rabbit_amqqueue:store_queue/1
1 parent 8889d40 commit 8eef209

File tree

5 files changed

+47
-24
lines changed

5 files changed

+47
-24
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -262,15 +262,19 @@ internal_declare(Q, Recover) ->
262262
do_internal_declare(Q, Recover).
263263

264264
do_internal_declare(Q0, true) ->
265-
%% TODO Why do we return the old state instead of the actual one?
266-
%% I'm leaving it like it was before the khepri refactor, because
267-
%% rabbit_amqqueue_process:init_it2 compares the result of this declare to decide
268-
%% if continue or stop. If we return the actual one, it fails and the queue stops
269-
%% silently during init.
270-
%% Maybe we should review this bit of code at some point.
271265
Q = amqqueue:set_state(Q0, live),
272-
ok = store_queue(Q),
273-
{created, Q0};
266+
case store_queue(Q) of
267+
ok ->
268+
%% TODO Why do we return the old state instead of the updated one?
269+
%% I'm leaving it like it was before the khepri refactor, because
270+
%% rabbit_amqqueue_process:init_it2 compares the result of this
271+
%% declare to decide whether to continue or stop. If we return the actual
272+
%% one, it fails and the queue stops silently during init.
273+
%% Maybe we should review this bit of code at some point.
274+
{created, Q0};
275+
{error, timeout} = Err ->
276+
Err
277+
end;
274278
do_internal_declare(Q0, false) ->
275279
Q = rabbit_policy:set(amqqueue:set_state(Q0, live)),
276280
Queue = rabbit_queue_decorator:set(Q),
@@ -283,12 +287,18 @@ do_internal_declare(Q0, false) ->
283287
update(Name, Fun) ->
284288
rabbit_db_queue:update(Name, Fun).
285289

286-
%% only really used for quorum queues to ensure the rabbit_queue record
290+
-spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when
291+
Queue :: amqqueue:amqqueue(),
292+
Ret :: ok | {error, timeout}.
293+
294+
%% only really used for stream queues to ensure the rabbit_queue record
287295
%% is initialised
288296
ensure_rabbit_queue_record_is_initialized(Q) ->
289297
store_queue(Q).
290298

291-
-spec store_queue(amqqueue:amqqueue()) -> 'ok'.
299+
-spec store_queue(Queue) -> Ret when
300+
Queue :: amqqueue:amqqueue(),
301+
Ret :: ok | {error, timeout}.
292302

293303
store_queue(Q0) ->
294304
Q = rabbit_queue_decorator:set(Q0),

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ terminate(normal, State) -> %% delete case
317317
terminate(_Reason, State = #q{q = Q}) ->
318318
terminate_shutdown(fun (BQS) ->
319319
Q2 = amqqueue:set_state(Q, crashed),
320-
rabbit_amqqueue:store_queue(Q2),
320+
_ = rabbit_amqqueue:store_queue(Q2),
321321
BQS
322322
end, State).
323323

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -927,8 +927,9 @@ create_or_get_in_khepri(Q) ->
927927
%% set().
928928
%% -------------------------------------------------------------------
929929

930-
-spec set(Queue) -> ok when
931-
Queue :: amqqueue:amqqueue().
930+
-spec set(Queue) -> Ret when
931+
Queue :: amqqueue:amqqueue(),
932+
Ret :: ok | rabbit_khepri:timeout_error().
932933
%% @doc Writes a queue record. If the queue is durable, it writes both instances:
933934
%% durable and transient. For the durable one, it resets decorators.
934935
%% The transient one is left as it is.

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1231,7 +1231,7 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
12311231
#{name := S} when S == StreamId ->
12321232
rabbit_log:debug("~ts: initializing queue record for stream id ~ts",
12331233
[?MODULE, StreamId]),
1234-
_ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
1234+
ok = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
12351235
ok;
12361236
_ ->
12371237
ok

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -177,16 +177,23 @@ create_stream(Q0) ->
177177
case rabbit_stream_coordinator:new_stream(Q, Leader) of
178178
{ok, {ok, LeaderPid}, _} ->
179179
%% update record with leader pid
180-
set_leader_pid(LeaderPid, amqqueue:get_name(Q)),
181-
rabbit_event:notify(queue_created,
182-
[{name, QName},
183-
{durable, true},
184-
{auto_delete, false},
185-
{arguments, Arguments},
186-
{type, amqqueue:get_type(Q1)},
187-
{user_who_performed_action,
188-
ActingUser}]),
189-
{new, Q};
180+
case set_leader_pid(LeaderPid, amqqueue:get_name(Q)) of
181+
ok ->
182+
rabbit_event:notify(queue_created,
183+
[{name, QName},
184+
{durable, true},
185+
{auto_delete, false},
186+
{arguments, Arguments},
187+
{type, amqqueue:get_type(Q1)},
188+
{user_who_performed_action,
189+
ActingUser}]),
190+
{new, Q};
191+
{error, timeout} ->
192+
{protocol_error, internal_error,
193+
"Could not set leader PID for ~ts on node '~ts' "
194+
"because the metadata store operation timed out",
195+
[rabbit_misc:rs(QName), node()]}
196+
end;
190197
Error ->
191198
_ = rabbit_amqqueue:internal_delete(Q, ActingUser),
192199
{protocol_error, internal_error, "Cannot declare ~ts on node '~ts': ~255p",
@@ -1296,6 +1303,11 @@ resend_all(#stream_client{leader = LeaderPid,
12961303
end || {Seq, Msg} <- Msgs],
12971304
State.
12981305

1306+
-spec set_leader_pid(Pid, QName) -> Ret when
1307+
Pid :: pid(),
1308+
QName :: rabbit_amqqueue:name(),
1309+
Ret :: ok | {error, timeout}.
1310+
12991311
set_leader_pid(Pid, QName) ->
13001312
%% TODO this should probably be a single khepri transaction for better performance.
13011313
Fun = fun (Q) ->

0 commit comments

Comments
 (0)