Skip to content

Commit 76d66a1

Browse files
committed
Ensure calling ParentPID in leader health check execution and
reuse and extend formatting API, with amqqueue:to_printable/2
1 parent 6cc03b0 commit 76d66a1

File tree

2 files changed

+15
-12
lines changed

2 files changed

+15
-12
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
set_immutable/1,
7171
qnode/1,
7272
to_printable/1,
73+
to_printable/2,
7374
macros/0]).
7475

7576
-define(record_version, amqqueue_v2).
@@ -564,6 +565,14 @@ to_printable(#amqqueue{name = QName = #resource{name = Name},
564565
<<"virtual_host">> => VHost,
565566
<<"type">> => Type}.
566567

568+
-spec to_printable(rabbit_types:r(), atom() | binary()) -> #{binary() => any()}.
569+
to_printable(QName = #resource{name = Name, virtual_host = VHost}, Type) ->
570+
_ = rabbit_queue_type:discover(Type),
571+
#{<<"readable_name">> => rabbit_data_coercion:to_binary(rabbit_misc:rs(QName)),
572+
<<"name">> => Name,
573+
<<"virtual_host">> => VHost,
574+
<<"type">> => Type}.
575+
567576
% private
568577

569578
macros() ->

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2154,6 +2154,7 @@ leader_health_check(QueueNameOrRegEx, VHost) ->
21542154
%% we cannot spawn any new processes for executing QQ leader health checks.
21552155
ProcessLimitThreshold = round(0.4 * erlang:system_info(process_limit)),
21562156

2157+
ParentPID = self(),
21572158
HealthCheckRef = make_ref(),
21582159
HealthCheckPids =
21592160
lists:flatten(
@@ -2164,12 +2165,13 @@ leader_health_check(QueueNameOrRegEx, VHost) ->
21642165
case re:run(QueueName, QueueNameOrRegEx, [{capture, none}]) of
21652166
match ->
21662167
{ClusterName, _} = rabbit_amqqueue:pid_of(Q),
2167-
_Pid = spawn(fun() -> run_leader_health_check(ClusterName, QResource, HealthCheckRef, self()) end);
2168+
_Pid = spawn(fun() -> run_leader_health_check(ClusterName, QResource, HealthCheckRef, ParentPID) end);
21682169
_ ->
21692170
[]
21702171
end;
21712172
false ->
2172-
[]
2173+
rabbit_log:warning("Leader health check failed from exceeded process limit threshold"),
2174+
throw({error, leader_health_check_process_limit_exceeded})
21732175
end
21742176
end || Q <- rabbit_amqqueue:list(VHost), amqqueue:get_type(Q) == ?MODULE]),
21752177
wait_for_leader_health_checks(HealthCheckRef, length(HealthCheckPids), []).
@@ -2189,23 +2191,15 @@ wait_for_leader_health_checks(Ref, N, UnhealthyAcc) ->
21892191
{ok, Ref, _QResource} when N == 1 ->
21902192
UnhealthyAcc;
21912193
{error, Ref, QResource} when N == 1 ->
2192-
[cli_format(QResource) | UnhealthyAcc];
2194+
[amqqueue:to_printable(QResource, ?MODULE) | UnhealthyAcc];
21932195
{ok, Ref, _QResource} ->
21942196
wait_for_leader_health_checks(Ref, N - 1, UnhealthyAcc);
21952197
{error, Ref, QResource} ->
2196-
wait_for_leader_health_checks(Ref, N - 1, [cli_format(QResource) | UnhealthyAcc])
2198+
wait_for_leader_health_checks(Ref, N - 1, [amqqueue:to_printable(QResource, ?MODULE) | UnhealthyAcc])
21972199
after
21982200
?QQ_GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT ->
21992201
UnhealthyAcc
22002202
end.
22012203

22022204
check_process_limit_safety(ProcessLimitThreshold) ->
22032205
erlang:system_info(process_count) < ProcessLimitThreshold.
2204-
2205-
cli_format(QResource = {resource, VHost, queue, QName}) ->
2206-
#{
2207-
<<"readable_name">> => rabbit_data_coercion:to_binary(rabbit_misc:rs(QResource)),
2208-
<<"name">> => QName,
2209-
<<"virtual_host">> => VHost,
2210-
<<"type">> => <<"quorum">>
2211-
}.

0 commit comments

Comments
 (0)