Skip to content

Commit 1cd66e8

Browse files
the-mikedavismergify[bot]
authored andcommitted
Handle database timeouts in rabbit_amqqueue:store_queue/1
(cherry picked from commit 8eef209)
1 parent 1d0e840 commit 1cd66e8

File tree

5 files changed

+47
-24
lines changed

5 files changed

+47
-24
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -263,15 +263,19 @@ internal_declare(Q, Recover) ->
263263
do_internal_declare(Q, Recover).
264264

265265
do_internal_declare(Q0, true) ->
266-
%% TODO Why do we return the old state instead of the actual one?
267-
%% I'm leaving it like it was before the khepri refactor, because
268-
%% rabbit_amqqueue_process:init_it2 compares the result of this declare to decide
269-
%% if continue or stop. If we return the actual one, it fails and the queue stops
270-
%% silently during init.
271-
%% Maybe we should review this bit of code at some point.
272266
Q = amqqueue:set_state(Q0, live),
273-
ok = store_queue(Q),
274-
{created, Q0};
267+
case store_queue(Q) of
268+
ok ->
269+
%% TODO Why do we return the old state instead of the actual one?
270+
%% I'm leaving it like it was before the khepri refactor, because
271+
%% rabbit_amqqueue_process:init_it2 compares the result of this
272+
%% declare to decide if continue or stop. If we return the actual
273+
%% one, it fails and the queue stops silently during init.
274+
%% Maybe we should review this bit of code at some point.
275+
{created, Q0};
276+
{error, timeout} = Err ->
277+
Err
278+
end;
275279
do_internal_declare(Q0, false) ->
276280
Q = rabbit_policy:set(amqqueue:set_state(Q0, live)),
277281
Queue = rabbit_queue_decorator:set(Q),
@@ -284,12 +288,18 @@ do_internal_declare(Q0, false) ->
284288
update(Name, Fun) ->
285289
rabbit_db_queue:update(Name, Fun).
286290

287-
%% only really used for quorum queues to ensure the rabbit_queue record
291+
-spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when
292+
Queue :: amqqueue:amqqueue(),
293+
Ret :: ok | {error, timeout}.
294+
295+
%% only really used for stream queues to ensure the rabbit_queue record
288296
%% is initialised
289297
ensure_rabbit_queue_record_is_initialized(Q) ->
290298
store_queue(Q).
291299

292-
-spec store_queue(amqqueue:amqqueue()) -> 'ok'.
300+
-spec store_queue(Queue) -> Ret when
301+
Queue :: amqqueue:amqqueue(),
302+
Ret :: ok | {error, timeout}.
293303

294304
store_queue(Q0) ->
295305
Q = rabbit_queue_decorator:set(Q0),

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ terminate(normal, State) -> %% delete case
317317
terminate(_Reason, State = #q{q = Q}) ->
318318
terminate_shutdown(fun (BQS) ->
319319
Q2 = amqqueue:set_state(Q, crashed),
320-
rabbit_amqqueue:store_queue(Q2),
320+
_ = rabbit_amqqueue:store_queue(Q2),
321321
BQS
322322
end, State).
323323

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -927,8 +927,9 @@ create_or_get_in_khepri(Q) ->
927927
%% set().
928928
%% -------------------------------------------------------------------
929929

930-
-spec set(Queue) -> ok when
931-
Queue :: amqqueue:amqqueue().
930+
-spec set(Queue) -> Ret when
931+
Queue :: amqqueue:amqqueue(),
932+
Ret :: ok | rabbit_khepri:timeout_error().
932933
%% @doc Writes a queue record. If the queue is durable, it writes both instances:
933934
%% durable and transient. For the durable one, it resets decorators.
934935
%% The transient one is left as it is.

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1231,7 +1231,7 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
12311231
#{name := S} when S == StreamId ->
12321232
rabbit_log:debug("~ts: initializing queue record for stream id ~ts",
12331233
[?MODULE, StreamId]),
1234-
_ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
1234+
ok = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
12351235
ok;
12361236
_ ->
12371237
ok

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -177,16 +177,23 @@ create_stream(Q0) ->
177177
case rabbit_stream_coordinator:new_stream(Q, Leader) of
178178
{ok, {ok, LeaderPid}, _} ->
179179
%% update record with leader pid
180-
set_leader_pid(LeaderPid, amqqueue:get_name(Q)),
181-
rabbit_event:notify(queue_created,
182-
[{name, QName},
183-
{durable, true},
184-
{auto_delete, false},
185-
{arguments, Arguments},
186-
{type, amqqueue:get_type(Q1)},
187-
{user_who_performed_action,
188-
ActingUser}]),
189-
{new, Q};
180+
case set_leader_pid(LeaderPid, amqqueue:get_name(Q)) of
181+
ok ->
182+
rabbit_event:notify(queue_created,
183+
[{name, QName},
184+
{durable, true},
185+
{auto_delete, false},
186+
{arguments, Arguments},
187+
{type, amqqueue:get_type(Q1)},
188+
{user_who_performed_action,
189+
ActingUser}]),
190+
{new, Q};
191+
{error, timeout} ->
192+
{protocol_error, internal_error,
193+
"Could not set leader PID for ~ts on node '~ts' "
194+
"because the metadata store operation timed out",
195+
[rabbit_misc:rs(QName), node()]}
196+
end;
190197
Error ->
191198
_ = rabbit_amqqueue:internal_delete(Q, ActingUser),
192199
{protocol_error, internal_error, "Cannot declare ~ts on node '~ts': ~255p",
@@ -1296,6 +1303,11 @@ resend_all(#stream_client{leader = LeaderPid,
12961303
end || {Seq, Msg} <- Msgs],
12971304
State.
12981305

1306+
-spec set_leader_pid(Pid, QName) -> Ret when
1307+
Pid :: pid(),
1308+
QName :: rabbit_amqqueue:name(),
1309+
Ret :: ok | {error, timeout}.
1310+
12991311
set_leader_pid(Pid, QName) ->
13001312
%% TODO this should probably be a single khepri transaction for better performance.
13011313
Fun = fun (Q) ->

0 commit comments

Comments
 (0)