Skip to content

CLI: new health check that detects QQs without an elected leader #13433

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c26edbe
Implement rabbitmq-queues leader_health_check command for quorum queues
Ayanda-D Jun 27, 2024
6cc03b0
Tests for rabbitmq-queues leader_health_check command
Ayanda-D Jun 27, 2024
76d66a1
Ensure calling ParentPID in leader health check execution and
Ayanda-D Jun 28, 2024
857e2a7
Extend core leader health check tests and update badrpc error handlin…
Ayanda-D Jun 28, 2024
6cf9339
Refactor leader_health_check command validators and ignore vhost arg
Ayanda-D Jun 28, 2024
96b8bce
Update leader_health_check_command description and banner
Ayanda-D Jul 3, 2024
239a69b
Improve output formatting for healthy leaders and support
Ayanda-D Jul 4, 2024
48ba3e1
Support global flag to run leader health check for
Ayanda-D Jul 11, 2024
7873737
Return immediately for leader health checks on empty vhosts
Ayanda-D Jul 18, 2024
b7dec89
Rename leader health check timeout refs
Ayanda-D Jul 18, 2024
c7da4d5
Update banner message for global leader health check
Ayanda-D Aug 2, 2024
1736845
QQ leader-health-check: check_process_limit_safety before spawning le…
Ayanda-D Nov 11, 2024
1084179
Log leader health check result in broker logs (if any leaderless queues)
Ayanda-D Jan 10, 2025
68739a6
Ensure check_passed result for leader health internal calls
Ayanda-D Jan 13, 2025
5f5e992
Extend CLI format output to process check_passed payload
Ayanda-D Jan 23, 2025
ebffd7d
Format leader healthcheck result log and function exports
Ayanda-D Feb 21, 2025
663fc98
Change leader_health_check command scope from queues to diagnostics
Ayanda-D Feb 26, 2025
df82f12
Update (c) line year
Ayanda-D Feb 26, 2025
b2acbae
Rename command to check_for_quorum_queues_without_an_elected_leader
Ayanda-D Feb 26, 2025
7a8e166
Use rabbit_db_queue for qq leader health check lookups
Ayanda-D Feb 26, 2025
9bdb81f
Update tests: quorum_queue_SUITE and rabbit_db_queue_SUITE
Ayanda-D Feb 26, 2025
6158568
Fix typo (cli test module)
Ayanda-D Feb 27, 2025
ea07938
Small refactor - simpler final leader health check result return on f…
Ayanda-D Feb 28, 2025
a45aa81
Clear dialyzer warning & fix type spec
Ayanda-D Mar 3, 2025
bb43c0b
Ignore result without strict match to avoid dialyzer warning
Ayanda-D Mar 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions deps/rabbit/src/amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@
pattern_match_on_type/1,
pattern_match_on_durable/1,
pattern_match_on_type_and_durable/2,
pattern_match_on_type_and_vhost/2,
reset_decorators/1,
set_immutable/1,
qnode/1,
to_printable/1,
to_printable/2,
macros/0]).

-define(record_version, amqqueue_v2).
Expand Down Expand Up @@ -531,6 +533,12 @@ pattern_match_on_durable(IsDurable) ->
%% Builds an #amqqueue{} match pattern that pins the `type' and `durable'
%% fields to the given values and leaves every other field as the '_'
%% wildcard.
pattern_match_on_type_and_durable(Type, IsDurable) ->
    #amqqueue{durable = IsDurable,
              type = Type,
              _ = '_'}.

%% Builds an #amqqueue{} match pattern that pins the `type' and `vhost'
%% fields to the given values and leaves every other field as the '_'
%% wildcard.
-spec pattern_match_on_type_and_vhost(atom(), binary()) -> amqqueue_pattern().

pattern_match_on_type_and_vhost(Type, VHost) ->
    #amqqueue{vhost = VHost,
              type = Type,
              _ = '_'}.

-spec reset_decorators(amqqueue()) -> amqqueue().

reset_decorators(#amqqueue{} = Queue) ->
Expand Down Expand Up @@ -564,6 +572,14 @@ to_printable(#amqqueue{name = QName = #resource{name = Name},
<<"virtual_host">> => VHost,
<<"type">> => Type}.

%% Builds the printable map for a queue from its resource name and type,
%% without needing the full #amqqueue{} record. The map has the binary keys
%% "readable_name", "name", "virtual_host" and "type".
-spec to_printable(rabbit_types:r(queue), atom() | binary()) -> #{binary() => any()}.
to_printable(#resource{name = Name, virtual_host = VHost} = QName, Type) ->
    %% The result of discover/1 is discarded.
    %% NOTE(review): presumably called so that an unknown Type fails here - confirm.
    _ = rabbit_queue_type:discover(Type),
    ReadableName = rabbit_data_coercion:to_binary(rabbit_misc:rs(QName)),
    #{<<"name">> => Name,
      <<"readable_name">> => ReadableName,
      <<"type">> => Type,
      <<"virtual_host">> => VHost}.

% private

macros() ->
Expand Down
23 changes: 23 additions & 0 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
get_all/0,
get_all/1,
get_all_by_type/1,
get_all_by_type_and_vhost/2,
get_all_by_type_and_node/3,
list/0,
count/0,
Expand Down Expand Up @@ -829,6 +830,28 @@ get_all_by_type(Type) ->
khepri => fun() -> get_all_by_pattern_in_khepri(Pattern) end
}).

%% -------------------------------------------------------------------
%% get_all_by_type_and_vhost().
%% -------------------------------------------------------------------

-spec get_all_by_type_and_vhost(Type, VHost) -> [Queue] when
      Type :: atom(),
      VHost :: binary(),
      Queue :: amqqueue:amqqueue().

%% @doc Returns every queue record whose type and vhost both equal the
%% given values.
%%
%% The same match pattern is handed to the Mnesia and to the Khepri lookup;
%% `rabbit_khepri:handle_fallback/1' decides which of the two funs runs.
%%
%% @returns a list of queue records.
%%
%% @private

get_all_by_type_and_vhost(Type, VHost) ->
    QueuePattern = amqqueue:pattern_match_on_type_and_vhost(Type, VHost),
    InMnesia = fun() -> get_all_by_pattern_in_mnesia(QueuePattern) end,
    InKhepri = fun() -> get_all_by_pattern_in_khepri(QueuePattern) end,
    rabbit_khepri:handle_fallback(#{mnesia => InMnesia,
                                    khepri => InKhepri}).

%% Mnesia side of the pattern lookups: lists the entries of ?MNESIA_TABLE
%% that match the given #amqqueue{} pattern.
get_all_by_pattern_in_mnesia(QueuePattern) ->
    rabbit_db:list_in_mnesia(?MNESIA_TABLE, QueuePattern).

Expand Down
77 changes: 77 additions & 0 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@
file_handle_other_reservation/0,
file_handle_release_reservation/0]).

-export([leader_health_check/2,
run_leader_health_check/4]).

-ifdef(TEST).
-export([filter_promotable/2,
ra_machine_config/1]).
Expand Down Expand Up @@ -144,6 +147,8 @@
-define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096
% -define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra
-define(MIN_CHECKPOINT_INTERVAL, 8192). %% the ra default is 16384
-define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000).
-define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000).

%%----------- QQ policies ---------------------------------------------------

Expand Down Expand Up @@ -2145,3 +2150,75 @@ file_handle_other_reservation() ->
%% Does nothing and always returns `ok'.
file_handle_release_reservation() -> ok.

%% Entry point for the quorum queue leader health check.
%%
%% QueueNameOrRegEx is matched against queue names with re:run/3; VHost is
%% either a vhost name (binary) or the atom `across_all_vhosts'.
%% Returns the list of printable maps of the queues whose check failed.
leader_health_check(QueueNameOrRegEx, VHost) ->
    %% Health-check workers may only be spawned while the node stays below
    %% 20% of the Erlang VM process limit; beyond that the check is refused.
    VMProcessLimit = erlang:system_info(process_limit),
    leader_health_check(QueueNameOrRegEx, VHost, round(0.2 * VMProcessLimit)).

%% Runs the leader health check with an explicit process-count threshold.
%%
%% Steps:
%%   1. load the quorum queues of one vhost, or of all vhosts when VHost is
%%      the atom `across_all_vhosts';
%%   2. keep only the queues whose name matches QueueNameOrRegEx;
%%   3. refuse to continue (throw) if spawning one worker per matching queue
%%      would reach ProcessLimitThreshold;
%%   4. spawn one worker per matching queue and collect their replies;
%%   5. log the unhealthy queues (if any) from a separate process.
%%
%% Returns the list of printable maps of the queues whose check failed.
%% Any other VHost value fails with a case_clause error.
leader_health_check(QueueNameOrRegEx, VHost, ProcessLimitThreshold) ->
    Qs =
        case VHost of
            across_all_vhosts ->
                rabbit_db_queue:get_all_by_type(?MODULE);
            _ when is_binary(VHost) ->
                rabbit_db_queue:get_all_by_type_and_vhost(?MODULE, VHost)
        end,
    %% Filter first: only matching queues get a worker, so only they count
    %% towards the process-limit guard. Counting every queue in scope would
    %% refuse a single-queue check on a node that hosts many quorum queues.
    Candidates = [Q || Q <- Qs,
                       amqqueue:get_type(Q) =:= ?MODULE,
                       is_leader_health_check_candidate(Q, QueueNameOrRegEx)],
    check_process_limit_safety(length(Candidates), ProcessLimitThreshold),
    ParentPID = self(),
    HealthCheckRef = make_ref(),
    HealthCheckPids =
        [begin
             QResource = amqqueue:get_name(Q),
             {ClusterName, _} = rabbit_amqqueue:pid_of(Q),
             spawn(fun() ->
                           run_leader_health_check(ClusterName, QResource,
                                                   HealthCheckRef, ParentPID)
                   end)
         end || Q <- Candidates],
    Result = wait_for_leader_health_checks(HealthCheckRef, length(HealthCheckPids), []),
    %% Logging runs in its own process so the caller gets the result without
    %% waiting for it.
    _ = spawn(fun() -> maybe_log_leader_health_check_result(Result) end),
    Result.

%% True when the queue's name matches the given name or regular expression.
%% The resource must be of kind `queue'; anything else fails the match.
is_leader_health_check_candidate(Q, QueueNameOrRegEx) ->
    {resource, _VHostN, queue, QueueName} = amqqueue:get_name(Q),
    re:run(QueueName, QueueNameOrRegEx, [{capture, none}]) =:= match.

%% Worker body for a single queue: looks up the leader recorded for
%% ClusterName in ra_leaderboard, pings it with a timeout of
%% ?LEADER_HEALTH_CHECK_TIMEOUT and reports the outcome to From.
%%
%% From receives {ok, HealthCheckRef, QResource} when the ping answers
%% {pong, leader}, and {error, HealthCheckRef, QResource} for any other
%% answer. Always returns `ok'.
run_leader_health_check(ClusterName, QResource, HealthCheckRef, From) ->
    Leader = ra_leaderboard:lookup_leader(ClusterName),
    Outcome =
        case ra_server_proc:ping(Leader, ?LEADER_HEALTH_CHECK_TIMEOUT) of
            {pong, leader} -> ok;
            _ -> error
        end,
    %% The value of the send expression is explicitly ignored; this is
    %% required to clear a dialyzer warning.
    _ = From ! {Outcome, HealthCheckRef, QResource},
    ok.

%% Collects worker replies tagged with Ref until Pending of them have
%% arrived. `error' replies add the queue's printable map to the
%% accumulator; `ok' replies are only counted.
%%
%% Each wait for the next reply is bounded by
%% ?GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT; when it expires, whatever has been
%% accumulated so far is returned.
wait_for_leader_health_checks(_Ref, 0, UnhealthyAcc) ->
    UnhealthyAcc;
wait_for_leader_health_checks(Ref, Pending, UnhealthyAcc) ->
    receive
        {Outcome, Ref, QResource} when Outcome =:= ok; Outcome =:= error ->
            NextAcc =
                case Outcome of
                    ok ->
                        UnhealthyAcc;
                    error ->
                        [amqqueue:to_printable(QResource, ?MODULE) | UnhealthyAcc]
                end,
            wait_for_leader_health_checks(Ref, Pending - 1, NextAcc)
    after
        ?GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT ->
            UnhealthyAcc
    end.

%% Guard run before spawning health-check workers.
%%
%% Returns `ok' while the current process count plus QCount stays below
%% ProcessLimitThreshold. Otherwise logs a warning and throws
%% {error, leader_health_check_process_limit_exceeded}.
check_process_limit_safety(QCount, ProcessLimitThreshold) ->
    ProjectedProcessCount = erlang:system_info(process_count) + QCount,
    if
        ProjectedProcessCount >= ProcessLimitThreshold ->
            rabbit_log:warning("Leader health check not permitted, process limit threshold will be exceeded."),
            throw({error, leader_health_check_process_limit_exceeded});
        true ->
            ok
    end.

%% Logs a warning listing the readable names of the queues whose leader
%% health check failed. An empty result logs nothing and returns `ok'.
maybe_log_leader_health_check_result([]) ->
    ok;
maybe_log_leader_health_check_result(Result) ->
    Qs = [leader_health_check_readable_name(R) || R <- Result],
    rabbit_log:warning("Leader health check result (unhealthy leaders detected): ~tp", [Qs]).

%% Picks the readable queue name out of a printable map. An entry without
%% that key is logged as-is. This keeps the logging best-effort without an
%% old-style `catch', which would have put an {'EXIT', ...} tuple and its
%% stack trace into the log line.
leader_health_check_readable_name(#{<<"readable_name">> := ReadableName}) ->
    ReadableName;
leader_health_check_readable_name(Other) ->
    Other.
131 changes: 130 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ all_tests() ->
priority_queue_2_1_ratio,
requeue_multiple_true,
requeue_multiple_false,
subscribe_from_each
subscribe_from_each,
leader_health_check
].

memory_tests() ->
Expand Down Expand Up @@ -4145,6 +4146,129 @@ amqpl_headers(Config) ->
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = true}).

%% Exercises rabbit_quorum_queue:leader_health_check/2 on node 0 with three
%% quorum queues in each of two vhosts:
%%   * with no queues declared, and with leaders known, nothing is reported;
%%   * after the ra_leaderboard entry of every queue has been cleared, each
%%     queue is reported, whether addressed by exact name, by regex, per
%%     vhost or across all vhosts.
leader_health_check(Config) ->
    VHost1 = <<"vhost1">>,
    VHost2 = <<"vhost2">>,

    set_up_vhost(Config, VHost1),
    set_up_vhost(Config, VHost2),

    %% Runs the health check on node 0. The second argument is a vhost name
    %% or the atom across_all_vhosts.
    Check = fun(NameOrRegEx, VHostOrAll) ->
                    rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
                                                 leader_health_check,
                                                 [NameOrRegEx, VHostOrAll])
            end,

    %% check empty vhost
    ?assertEqual([], Check(<<".*">>, VHost1)),
    ?assertEqual([], Check(<<".*">>, across_all_vhosts)),

    Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost1),
    {ok, Ch1} = amqp_connection:open_channel(Conn1),

    Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost2),
    {ok, Ch2} = amqp_connection:open_channel(Conn2),

    Qs1 = [<<"Q.1">>, <<"Q.2">>, <<"Q.3">>],
    Qs2 = [<<"Q.4">>, <<"Q.5">>, <<"Q.6">>],

    %% in vhost1
    [?assertEqual({'queue.declare_ok', Q, 0, 0},
                  declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}]))
     || Q <- Qs1],

    %% in vhost2
    [?assertEqual({'queue.declare_ok', Q, 0, 0},
                  declare(Ch2, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}]))
     || Q <- Qs2],

    %% test successful health checks in vhost1, vhost2, across_all_vhosts
    ?assertEqual([], Check(<<".*">>, VHost1)),
    ?assertEqual([], Check(<<"Q.*">>, VHost1)),
    [?assertEqual([], Check(Q, VHost1)) || Q <- Qs1],

    ?assertEqual([], Check(<<".*">>, VHost2)),
    ?assertEqual([], Check(<<"Q.*">>, VHost2)),
    [?assertEqual([], Check(Q, VHost2)) || Q <- Qs2],

    ?assertEqual([], Check(<<".*">>, across_all_vhosts)),
    ?assertEqual([], Check(<<"Q.*">>, across_all_vhosts)),

    %% clear leaderboard
    Qs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),

    QQ_Clusters =
        lists:usort(
          [begin
               {ClusterName, _} = amqqueue:get_pid(Q),
               {ClusterName, amqqueue:get_name(Q)}
           end
           || Q <- Qs, amqqueue:get_type(Q) == rabbit_quorum_queue]),
    %% Exactly the six queues declared above must exist.
    ?assertEqual(6, length(QQ_Clusters)),

    [Q1Data, Q2Data, Q3Data, Q4Data, Q5Data, Q6Data] = QQ_Data =
        [begin
             rabbit_ct_broker_helpers:rpc(Config, 0, ra_leaderboard, clear, [Q_ClusterName]),
             amqqueue:to_printable(Q_Res, rabbit_quorum_queue)
         end
         || {Q_ClusterName, Q_Res} <- QQ_Clusters],

    %% test failed health checks in vhost1, vhost2, across_all_vhosts
    ?assertEqual([Q1Data], Check(<<"Q.1">>, VHost1)),
    ?assertEqual([Q2Data], Check(<<"Q.2">>, VHost1)),
    ?assertEqual([Q3Data], Check(<<"Q.3">>, VHost1)),
    ?assertEqual([Q1Data, Q2Data, Q3Data], lists:usort(Check(<<".*">>, VHost1))),
    ?assertEqual([Q1Data, Q2Data, Q3Data], lists:usort(Check(<<"Q.*">>, VHost1))),

    ?assertEqual([Q4Data], Check(<<"Q.4">>, VHost2)),
    ?assertEqual([Q5Data], Check(<<"Q.5">>, VHost2)),
    ?assertEqual([Q6Data], Check(<<"Q.6">>, VHost2)),
    ?assertEqual([Q4Data, Q5Data, Q6Data], lists:usort(Check(<<".*">>, VHost2))),
    ?assertEqual([Q4Data, Q5Data, Q6Data], lists:usort(Check(<<"Q.*">>, VHost2))),

    %% Both patterns are checked across all vhosts, mirroring the
    %% successful-case assertions above.
    ?assertEqual(QQ_Data, lists:usort(Check(<<".*">>, across_all_vhosts))),
    ?assertEqual(QQ_Data, lists:usort(Check(<<"Q.*">>, across_all_vhosts))),

    %% cleanup: each queue is deleted through the channel of the vhost it was
    %% declared in, otherwise the vhost2 queues would be left behind.
    [?assertMatch(#'queue.delete_ok'{},
                  amqp_channel:call(Ch1, #'queue.delete'{queue = Q}))
     || Q <- Qs1],
    [?assertMatch(#'queue.delete_ok'{},
                  amqp_channel:call(Ch2, #'queue.delete'{queue = Q}))
     || Q <- Qs2],

    amqp_connection:close(Conn1),
    amqp_connection:close(Conn2).


leader_locator_client_local(Config) ->
[Server1 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Q = ?config(queue_name, Config),
Expand Down Expand Up @@ -4465,6 +4589,11 @@ declare_passive(Ch, Q, Args) ->
auto_delete = false,
passive = true,
arguments = Args}).

%% Creates the vhost and then gives the <<"guest">> user full permissions
%% on it.
set_up_vhost(Config, VHostName) ->
    rabbit_ct_broker_helpers:add_vhost(Config, VHostName),
    rabbit_ct_broker_helpers:set_full_permissions(Config,
                                                  <<"guest">>,
                                                  VHostName).

%% Convenience form of assert_queue_type/4 that uses the vhost <<"/">>.
assert_queue_type(Server, QName, ExpectedType) ->
    assert_queue_type(Server, <<"/">>, QName, ExpectedType).

Expand Down
25 changes: 25 additions & 0 deletions deps/rabbit/test/rabbit_db_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ all_tests() ->
get_all,
get_all_by_vhost,
get_all_by_type,
get_all_by_type_and_vhost,
get_all_by_type_and_node,
list,
count,
Expand Down Expand Up @@ -198,6 +199,30 @@ get_all_by_type1(_Config) ->
?assertEqual([Q4], rabbit_db_queue:get_all_by_type(rabbit_stream_queue)),
passed.

%% Runs get_all_by_type_and_vhost1/1 on broker node 0 via RPC and requires
%% it to return `passed'.
get_all_by_type_and_vhost(Config) ->
    passed = rabbit_ct_broker_helpers:rpc(
               Config, 0,
               ?MODULE, get_all_by_type_and_vhost1, [Config]).

%% Checks rabbit_db_queue:get_all_by_type_and_vhost/2: every lookup is empty
%% before any queue is stored, and after storing four queues of three types
%% in two vhosts each lookup returns only the queues of that type and vhost.
get_all_by_type_and_vhost1(_Config) ->
    CarrotsVHost = <<"carrots">>,
    CabageVHost = <<"cabage">>,
    ClassicName = rabbit_misc:r(CarrotsVHost, queue, <<"test-queue">>),
    QuorumNameA = rabbit_misc:r(CabageVHost, queue, <<"test-queue2">>),
    QuorumNameB = rabbit_misc:r(CabageVHost, queue, <<"test-queue3">>),
    StreamName = rabbit_misc:r(CarrotsVHost, queue, <<"test-queue4">>),
    ClassicQ = new_queue(ClassicName, rabbit_classic_queue),
    QuorumQA = new_queue(QuorumNameA, rabbit_quorum_queue),
    QuorumQB = new_queue(QuorumNameB, rabbit_quorum_queue),
    StreamQ = new_queue(StreamName, rabbit_stream_queue),
    ExpectedQuorum = lists:sort([QuorumQA, QuorumQB]),

    %% Nothing stored yet: every lookup is empty.
    ?assertEqual([], rabbit_db_queue:get_all_by_type_and_vhost(rabbit_classic_queue, CarrotsVHost)),
    ?assertEqual([], lists:sort(rabbit_db_queue:get_all_by_type_and_vhost(rabbit_quorum_queue, CabageVHost))),
    ?assertEqual([], rabbit_db_queue:get_all_by_type_and_vhost(rabbit_stream_queue, CarrotsVHost)),

    set_list([ClassicQ, QuorumQA, QuorumQB, StreamQ]),

    %% Each lookup now returns only the queues of its own type and vhost.
    ?assertEqual([ClassicQ], rabbit_db_queue:get_all_by_type_and_vhost(rabbit_classic_queue, CarrotsVHost)),
    ?assertEqual(ExpectedQuorum, lists:sort(rabbit_db_queue:get_all_by_type_and_vhost(rabbit_quorum_queue, CabageVHost))),
    ?assertEqual([StreamQ], rabbit_db_queue:get_all_by_type_and_vhost(rabbit_stream_queue, CarrotsVHost)),
    passed.

%% Runs get_all_by_type_and_node1/1 on broker node 0 via RPC and requires it
%% to return `passed'.
get_all_by_type_and_node(Config) ->
    passed = rabbit_ct_broker_helpers:rpc(
               Config, 0,
               ?MODULE, get_all_by_type_and_node1, [Config]).

Expand Down
4 changes: 4 additions & 0 deletions deps/rabbitmq_cli/lib/rabbitmq/cli/core/output.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ defmodule RabbitMQ.CLI.Core.Output do
:ok
end

# A passed check that carries a payload: the payload is run through the
# formatter and returned in an {:ok, _} tuple.
def format_output({:ok, :check_passed, payload}, formatter, options),
  do: {:ok, formatter.format_output(payload, options)}

# A successful result: the output is run through the formatter and returned
# in an {:ok, _} tuple.
def format_output({:ok, payload}, formatter, options),
  do: {:ok, formatter.format_output(payload, options)}
Expand Down
Loading
Loading