Skip to content

Commit c26edbe

Browse files
committed
Implement rabbitmq-queues leader_health_check command for quorum queues
1 parent 9dd6fa7 commit c26edbe

File tree

2 files changed

+142
-0
lines changed

2 files changed

+142
-0
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@
8282
file_handle_other_reservation/0,
8383
file_handle_release_reservation/0]).
8484

85+
-export([leader_health_check/2, run_leader_health_check/4]).
86+
8587
-ifdef(TEST).
8688
-export([filter_promotable/2,
8789
ra_machine_config/1]).
@@ -144,6 +146,8 @@
144146
-define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096
145147
% -define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra
146148
-define(MIN_CHECKPOINT_INTERVAL, 8192). %% the ra default is 16384
149+
-define(QQ_LEADER_HEALTH_CHECK_TIMEOUT, 1_000).
150+
-define(QQ_GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000).
147151

148152
%%----------- QQ policies ---------------------------------------------------
149153

@@ -2145,3 +2149,63 @@ file_handle_other_reservation() ->
21452149
file_handle_release_reservation() ->
21462150
ok.
21472151

2152+
%% Runs a leader health check for the quorum queues of VHost whose name
%% matches QueueNameOrRegEx (anything re:run/3 accepts as a pattern).
%%
%% One worker process is spawned per matching queue; each worker reports its
%% result back to the calling process, tagged with a reference unique to this
%% invocation. The return value is whatever wait_for_leader_health_checks/3
%% accumulates from those replies.
leader_health_check(QueueNameOrRegEx, VHost) ->
    %% Set a process limit threshold to 40% of ErlangVM process limit, beyond which
    %% we cannot spawn any new processes for executing QQ leader health checks.
    %% Queues encountered while the node is above the threshold are skipped.
    ProcessLimitThreshold = round(0.4 * erlang:system_info(process_limit)),

    HealthCheckRef = make_ref(),
    %% The caller's pid must be captured before spawning: self() evaluated
    %% inside the spawned fun is the worker's own pid, so the replies would be
    %% sent to the worker itself and never reach this process.
    Caller = self(),
    HealthCheckPids =
        lists:flatten(
          [begin
               {resource, VHost, queue, QueueName} = QResource = amqqueue:get_name(Q),
               case check_process_limit_safety(ProcessLimitThreshold) of
                   true ->
                       case re:run(QueueName, QueueNameOrRegEx, [{capture, none}]) of
                           match ->
                               {ClusterName, _} = rabbit_amqqueue:pid_of(Q),
                               spawn(fun() ->
                                             run_leader_health_check(ClusterName, QResource,
                                                                     HealthCheckRef, Caller)
                                     end);
                           _ ->
                               []
                       end;
                   false ->
                       []
               end
           end || Q <- rabbit_amqqueue:list(VHost), amqqueue:get_type(Q) == ?MODULE]),
    %% Skipped queues contributed [] above, so after flattening the list length
    %% equals the number of workers actually started.
    wait_for_leader_health_checks(HealthCheckRef, length(HealthCheckPids), []).
2176+
2177+
%% Pings the leader recorded for ClusterName and reports the outcome to From.
%%
%% Sends {ok, HealthCheckRef, QResource} when the ping answers {pong, leader},
%% and {error, HealthCheckRef, QResource} for any other ping result.
%% Always returns ok.
run_leader_health_check(ClusterName, QResource, HealthCheckRef, From) ->
    Leader = ra_leaderboard:lookup_leader(ClusterName),
    Outcome =
        case ra_server_proc:ping(Leader, ?QQ_LEADER_HEALTH_CHECK_TIMEOUT) of
            {pong, leader} -> ok;
            _ -> error
        end,
    From ! {Outcome, HealthCheckRef, QResource},
    ok.
2186+
2187+
%% Collects N health check replies tagged with Ref and returns the accumulated
%% list of CLI-formatted descriptions of the queues that reported an error.
%%
%% Returns immediately once no replies are outstanding, which includes the
%% case where no health check was started at all (N = 0). Each individual
%% wait is bounded by ?QQ_GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT; if it expires,
%% the results gathered so far are returned and checks that have not replied
%% are not included.
wait_for_leader_health_checks(_Ref, 0, UnhealthyAcc) ->
    UnhealthyAcc;
wait_for_leader_health_checks(Ref, N, UnhealthyAcc) ->
    receive
        {ok, Ref, _QResource} ->
            wait_for_leader_health_checks(Ref, N - 1, UnhealthyAcc);
        {error, Ref, QResource} ->
            wait_for_leader_health_checks(Ref, N - 1, [cli_format(QResource) | UnhealthyAcc])
    after
        ?QQ_GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT ->
            UnhealthyAcc
    end.
2201+
2202+
%% Returns true while the number of processes currently alive on this node is
%% strictly below ProcessLimitThreshold, false otherwise.
check_process_limit_safety(ProcessLimitThreshold) ->
    ProcessCount = erlang:system_info(process_count),
    ProcessCount < ProcessLimitThreshold.
2204+
2205+
%% Builds the map describing a queue resource that is returned to the CLI:
%% a readable name derived from rabbit_misc:rs/1, the queue name, the virtual
%% host and the fixed queue type <<"quorum">>.
cli_format({resource, VHost, queue, QName} = QResource) ->
    ReadableName = rabbit_data_coercion:to_binary(rabbit_misc:rs(QResource)),
    #{<<"readable_name">> => ReadableName,
      <<"name">> => QName,
      <<"virtual_host">> => VHost,
      <<"type">> => <<"quorum">>}.
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
## This Source Code Form is subject to the terms of the Mozilla Public
2+
## License, v. 2.0. If a copy of the MPL was not distributed with this
3+
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
##
5+
## Copyright (c) 2007-2024 VMware, Inc. or its affiliates. All rights reserved.
6+
7+
# CLI command `leader_health_check`: asks the target node to run
# :rabbit_quorum_queue.leader_health_check/2 for the queues matching a
# pattern in a virtual host and reports the queues returned as unhealthy.
defmodule RabbitMQ.CLI.Queues.Commands.LeaderHealthCheckCommand do
  alias RabbitMQ.CLI.Core.DocGuide

  @behaviour RabbitMQ.CLI.CommandBehaviour

  import RabbitMQ.CLI.Core.Platform, only: [line_separator: 0]

  def scopes(), do: [:queues]

  use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout

  use RabbitMQ.CLI.Core.MergesDefaultVirtualHost

  # Exactly one positional argument (the pattern) is required.
  def validate(args, _) when length(args) < 1 do
    {:validation_failure, :not_enough_args}
  end

  def validate(args, _) when length(args) > 1 do
    {:validation_failure, :too_many_args}
  end

  # The valid case needs an explicit clause; without it a correct invocation
  # fails with a FunctionClauseError instead of passing validation.
  def validate([_pattern], _) do
    :ok
  end

  use RabbitMQ.CLI.Core.RequiresRabbitAppRunning

  # Returns :ok when the node reports an empty list of unhealthy queues,
  # otherwise wraps whatever the RPC returned in an :error tuple.
  def run([pattern] = _args, %{node: node_name, vhost: vhost}) do
    case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :leader_health_check, [pattern, vhost]) do
      [] ->
        :ok

      unhealthy_queues ->
        {:error, unhealthy_queues}
    end
  end

  # A passing check is a success, so it is reported with an :ok tuple.
  # The clauses below do not depend on the selected formatter, so the default
  # formatter is handled as well as "json".
  def output(:ok, _opts) do
    {:ok, :check_passed}
  end

  def output({:error, unhealthy_queues}, _opts) when is_list(unhealthy_queues) do
    lines = queue_lines(unhealthy_queues)

    {:error, :check_failed, Enum.join(lines, line_separator())}
  end

  def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable

  def usage() do
    "leader_health_check [--vhost <vhost>] <pattern>"
  end

  def usage_additional do
    [
      ["<pattern>", "regular expression pattern used to match quorum queues"]
    ]
  end

  def help_section(), do: :observability_and_health_checks

  def usage_doc_guides() do
    [
      DocGuide.quorum_queues()
    ]
  end

  def description(), do: "Checks availability and health of quorum queue leader"

  def banner([name], %{vhost: vhost}),
    do: "Checking availability and health status of queue(s) matching #{name} in vhost #{vhost} ..."

  # One human-readable line per unhealthy queue, built from the
  # "readable_name" key of each returned queue description.
  def queue_lines(qs) do
    for q <- qs, do: "Leader for #{q["readable_name"]} is unhealthy and unavailable"
  end
end

0 commit comments

Comments
 (0)