Skip to content

Commit 1d0e840

Browse files
the-mikedavismergify[bot]
authored andcommitted
Handle database timeouts when declaring queues
This fixes a case-clause crash in the logs in `cluster_minority_SUITE`. When the database is not available `rabbit_amqqueue:declare/6,7` should return a `protocol_error` record with an error message rather than a hard crash. Also included in this change is the necessary changes to typespecs: `rabbit_db_queue:create_or_get/1` is the first function to return a possible `{error,timeout}`. That bubbles up through `rabbit_amqqueue:internal_declare/3` and must be handled in each `rabbit_queue_type:declare/2` callback. (cherry picked from commit 8889d40)
1 parent 246e8fa commit 1d0e840

File tree

6 files changed

+37
-7
lines changed

6 files changed

+37
-7
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,12 @@ get_queue_type(Args, DefaultQueueType) ->
252252
rabbit_queue_type:discover(V)
253253
end.
254254

255-
-spec internal_declare(amqqueue:amqqueue(), boolean()) ->
256-
{created | existing, amqqueue:amqqueue()} | queue_absent().
255+
-spec internal_declare(Queue, Recover) -> Ret when
256+
Queue :: amqqueue:amqqueue(),
257+
Recover :: boolean(),
258+
Ret :: {created | existing, amqqueue:amqqueue()} |
259+
queue_absent() |
260+
rabbit_khepri:timeout_error().
257261

258262
internal_declare(Q, Recover) ->
259263
do_internal_declare(Q, Recover).

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,12 @@ init_it2(Recover, From, State = #q{q = Q,
226226
false ->
227227
{stop, normal, {existing, Q1}, State}
228228
end;
229+
{error, timeout} ->
230+
Reason = {protocol_error, internal_error,
231+
"Could not declare ~ts on node '~ts' because the "
232+
"metadata store operation timed out",
233+
[rabbit_misc:rs(amqqueue:get_name(Q)), node()]},
234+
{stop, normal, Reason, State};
229235
Err ->
230236
{stop, normal, Err, State}
231237
end.

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -875,7 +875,10 @@ get_all_by_type_and_node_in_khepri(VHostName, Type, Node) ->
875875

876876
-spec create_or_get(Queue) -> Ret when
877877
Queue :: amqqueue:amqqueue(),
878-
Ret :: {created, Queue} | {existing, Queue} | {absent, Queue, nodedown}.
878+
Ret :: {created, Queue} |
879+
{existing, Queue} |
880+
{absent, Queue, nodedown} |
881+
rabbit_khepri:timeout_error().
879882
%% @doc Writes a queue record if it doesn't exist already or returns the existing one
880883
%%
881884
%% @returns the existing record if there is one in the database already, or the newly

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,12 @@ start_cluster(Q) ->
295295
declare_queue_error(Error, NewQ, LeaderNode, ActingUser)
296296
end;
297297
{existing, _} = Ex ->
298-
Ex
298+
Ex;
299+
{error, timeout} ->
300+
{protocol_error, internal_error,
301+
"Could not declare quorum ~ts on node '~ts' because the metadata "
302+
"store operation timed out",
303+
[rabbit_misc:rs(QName), node()]}
299304
end.
300305

301306
declare_queue_error(Error, Queue, Leader, ActingUser) ->

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,12 @@ create_stream(Q0) ->
195195
{existing, Q} ->
196196
{existing, Q};
197197
{absent, Q, Reason} ->
198-
{absent, Q, Reason}
198+
{absent, Q, Reason};
199+
{error, timeout} ->
200+
{protocol_error, internal_error,
201+
"Could not declare ~ts on node '~ts' because the metadata store "
202+
"operation timed out",
203+
[rabbit_misc:rs(QName), node()]}
199204
end.
200205

201206
-spec delete(amqqueue:amqqueue(), boolean(),

deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,10 @@ is_stateful() ->
7070

7171
-spec declare(amqqueue:amqqueue(), node()) ->
7272
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
73-
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()}.
73+
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()} |
74+
{protocol_error, internal_error, string(), [string()]}.
7475
declare(Q0, _Node) ->
76+
QName = amqqueue:get_name(Q0),
7577
Q1 = case amqqueue:get_pid(Q0) of
7678
none ->
7779
%% declaring process becomes the queue
@@ -86,14 +88,19 @@ declare(Q0, _Node) ->
8688
Opts = amqqueue:get_options(Q),
8789
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
8890
rabbit_event:notify(queue_created,
89-
[{name, amqqueue:get_name(Q)},
91+
[{name, QName},
9092
{durable, true},
9193
{auto_delete, false},
9294
{exclusive, true},
9395
{type, amqqueue:get_type(Q)},
9496
{arguments, amqqueue:get_arguments(Q)},
9597
{user_who_performed_action, ActingUser}]),
9698
{new, Q};
99+
{error, timeout} ->
100+
{protocol_error, internal_error,
101+
"Could not declare ~ts because the metadata store operation "
102+
"timed out",
103+
[rabbit_misc:rs(QName)]};
97104
Other ->
98105
Other
99106
end.

0 commit comments

Comments
 (0)