Skip to content

By @Ayanda-D: new CLI health check that detects QQs without an elected reachable leader #13433 (backport #13487) (backport #13488) #13489

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions deps/rabbit/src/amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@
pattern_match_on_type/1,
pattern_match_on_durable/1,
pattern_match_on_type_and_durable/2,
pattern_match_on_type_and_vhost/2,
reset_decorators/1,
set_immutable/1,
qnode/1,
to_printable/1,
to_printable/2,
macros/0]).

-define(record_version, amqqueue_v2).
Expand Down Expand Up @@ -531,6 +533,12 @@ pattern_match_on_durable(IsDurable) ->
%% Queue record pattern with the type and durable fields pinned to the
%% given values and every remaining field set to the '_' wildcard.
pattern_match_on_type_and_durable(Type, IsDurable) ->
    #amqqueue{durable = IsDurable,
              type = Type,
              _ = '_'}.

%% Queue record pattern with the type and vhost fields pinned to the
%% given values and every remaining field set to the '_' wildcard.
-spec pattern_match_on_type_and_vhost(atom(), binary()) ->
    amqqueue_pattern().

pattern_match_on_type_and_vhost(Type, VHost) ->
    #amqqueue{vhost = VHost,
              type = Type,
              _ = '_'}.

-spec reset_decorators(amqqueue()) -> amqqueue().

reset_decorators(#amqqueue{} = Queue) ->
Expand Down Expand Up @@ -564,6 +572,14 @@ to_printable(#amqqueue{name = QName = #resource{name = Name},
<<"virtual_host">> => VHost,
<<"type">> => Type}.

%% Builds the printable map (readable name, name, virtual host, type) for a
%% queue that is known only by its resource name and its type.
-spec to_printable(rabbit_types:r(queue), atom() | binary()) -> #{binary() => any()}.
to_printable(#resource{name = Name, virtual_host = VHost} = QName, Type) ->
    %% The result is discarded on purpose.
    %% NOTE(review): presumably called so that an unknown queue type fails
    %% here rather than ending up in the output - confirm against
    %% rabbit_queue_type:discover/1.
    _ = rabbit_queue_type:discover(Type),
    ReadableName = rabbit_data_coercion:to_binary(rabbit_misc:rs(QName)),
    #{<<"type">> => Type,
      <<"virtual_host">> => VHost,
      <<"name">> => Name,
      <<"readable_name">> => ReadableName}.

% private

macros() ->
Expand Down
23 changes: 23 additions & 0 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
get_all/0,
get_all/1,
get_all_by_type/1,
get_all_by_type_and_vhost/2,
get_all_by_type_and_node/3,
list/0,
count/0,
Expand Down Expand Up @@ -829,6 +830,28 @@ get_all_by_type(Type) ->
khepri => fun() -> get_all_by_pattern_in_khepri(Pattern) end
}).

%% -------------------------------------------------------------------
%% get_all_by_type_and_vhost().
%% -------------------------------------------------------------------

-spec get_all_by_type_and_vhost(Type, VHost) -> [Queue] when
      Type :: atom(),
      VHost :: binary(),
      Queue :: amqqueue:amqqueue().

%% @doc Gets all queues of the given type that belong to the given vhost.
%%
%% The lookup uses the pattern built by
%% `amqqueue:pattern_match_on_type_and_vhost/2' and is dispatched to the
%% Mnesia or Khepri implementation through `rabbit_khepri:handle_fallback/1'.
%%
%% @returns a list of queue records.
%%
%% @private

get_all_by_type_and_vhost(Type, VHost) ->
    QueuePattern = amqqueue:pattern_match_on_type_and_vhost(Type, VHost),
    InMnesia = fun() -> get_all_by_pattern_in_mnesia(QueuePattern) end,
    InKhepri = fun() -> get_all_by_pattern_in_khepri(QueuePattern) end,
    rabbit_khepri:handle_fallback(#{khepri => InKhepri,
                                    mnesia => InMnesia}).

%% Mnesia side of the pattern-based lookups: hands ?MNESIA_TABLE and the
%% given record pattern to rabbit_db:list_in_mnesia/2.
get_all_by_pattern_in_mnesia(QueuePattern) ->
    rabbit_db:list_in_mnesia(?MNESIA_TABLE, QueuePattern).

Expand Down
77 changes: 77 additions & 0 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@
file_handle_other_reservation/0,
file_handle_release_reservation/0]).

-export([leader_health_check/2,
run_leader_health_check/4]).

-ifdef(TEST).
-export([filter_promotable/2,
ra_machine_config/1]).
Expand Down Expand Up @@ -144,6 +147,8 @@
-define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096
% -define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra
-define(MIN_CHECKPOINT_INTERVAL, 8192). %% the ra default is 16384
%% Timeout handed to ra_server_proc:ping/2 for each queue leader that is
%% checked (presumably milliseconds - confirm against ra_server_proc).
-define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000).
%% Timeout (ms) of the receive in wait_for_leader_health_checks/3. Note that
%% it is applied to every receive, i.e. it restarts after each reply; when it
%% expires the results collected so far are returned.
-define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000).

%%----------- QQ policies ---------------------------------------------------

Expand Down Expand Up @@ -2120,3 +2125,75 @@ file_handle_other_reservation() ->
%% No-op: takes no arguments and always returns ok.
file_handle_release_reservation() ->
ok.

%% Entry point of the quorum queue leader health check.
%%
%% Delegates to leader_health_check/3 with a process budget of 20% of the
%% Erlang VM process limit: beyond that threshold no new processes may be
%% spawned for executing leader health checks.
leader_health_check(QueueNameOrRegEx, VHost) ->
    VMProcessLimit = erlang:system_info(process_limit),
    leader_health_check(QueueNameOrRegEx, VHost, round(VMProcessLimit * 0.2)).

%% Pings the leader of every quorum queue selected by VHost and
%% QueueNameOrRegEx and returns the printable descriptions of the queues
%% whose check reported an error.
%%
%% VHost is either the atom across_all_vhosts or a vhost name (binary).
%% QueueNameOrRegEx is matched against each queue name with re:run/3.
%% One process is spawned per matching queue; before spawning anything,
%% check_process_limit_safety/2 throws if the number of candidate queues
%% would push the node past ProcessLimitThreshold.
%% A non-empty result is additionally logged from a separate process.
leader_health_check(QueueNameOrRegEx, VHost, ProcessLimitThreshold) ->
    Queues =
        case VHost of
            across_all_vhosts ->
                rabbit_db_queue:get_all_by_type(?MODULE);
            VHost when is_binary(VHost) ->
                rabbit_db_queue:get_all_by_type_and_vhost(?MODULE, VHost)
        end,
    check_process_limit_safety(length(Queues), ProcessLimitThreshold),
    Caller = self(),
    CheckRef = make_ref(),
    StartCheck =
        fun(Q) ->
                case amqqueue:get_type(Q) == ?MODULE of
                    false ->
                        false;
                    true ->
                        {resource, _VHostN, queue, QueueName} = QResource =
                            amqqueue:get_name(Q),
                        case re:run(QueueName, QueueNameOrRegEx, [{capture, none}]) of
                            match ->
                                {ClusterName, _} = rabbit_amqqueue:pid_of(Q),
                                Worker = spawn(fun() ->
                                                       run_leader_health_check(
                                                         ClusterName, QResource,
                                                         CheckRef, Caller)
                                               end),
                                {true, Worker};
                            _ ->
                                false
                        end
                end
        end,
    Workers = lists:filtermap(StartCheck, Queues),
    Unhealthy = wait_for_leader_health_checks(CheckRef, length(Workers), []),
    %% Logging happens off the caller's process so the result is returned
    %% without waiting for the log call.
    _ = spawn(fun() -> maybe_log_leader_health_check_result(Unhealthy) end),
    Unhealthy.

%% Body of a single health check worker.
%%
%% Looks up the leader recorded for ClusterName in the ra leaderboard, pings
%% it, and reports the outcome to From as {ok | error, HealthCheckRef,
%% QResource}: ok only when the ping answers {pong, leader}, error for any
%% other reply. Always returns ok.
run_leader_health_check(ClusterName, QResource, HealthCheckRef, From) ->
    Leader = ra_leaderboard:lookup_leader(ClusterName),
    Outcome =
        case ra_server_proc:ping(Leader, ?LEADER_HEALTH_CHECK_TIMEOUT) of
            {pong, leader} -> ok;
            _ -> error
        end,
    %% Ignoring the result of the send is required to clear a dialyzer warning.
    _ = (From ! {Outcome, HealthCheckRef, QResource}),
    ok.

%% Collects the replies of the health check workers tagged with Ref.
%%
%% The second argument is the number of replies still expected. Every
%% {error, Ref, QResource} reply adds the printable form of the queue to the
%% accumulator; {ok, Ref, _} replies are only counted. Returns the
%% accumulator once all replies arrived, or as-is if no reply shows up
%% within ?GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT of the current receive.
wait_for_leader_health_checks(_Ref, 0, UnhealthyAcc) ->
    UnhealthyAcc;
wait_for_leader_health_checks(Ref, Pending, UnhealthyAcc) ->
    receive
        {error, Ref, QResource} ->
            Entry = amqqueue:to_printable(QResource, ?MODULE),
            wait_for_leader_health_checks(Ref, Pending - 1, [Entry | UnhealthyAcc]);
        {ok, Ref, _QResource} ->
            wait_for_leader_health_checks(Ref, Pending - 1, UnhealthyAcc)
    after ?GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT ->
            UnhealthyAcc
    end.

%% Guards against exhausting the VM process table.
%%
%% Returns ok when the current process count plus QCount stays below
%% ProcessLimitThreshold. Otherwise logs a warning and throws
%% {error, leader_health_check_process_limit_exceeded}.
check_process_limit_safety(QCount, ProcessLimitThreshold) ->
    ProjectedCount = erlang:system_info(process_count) + QCount,
    if
        ProjectedCount >= ProcessLimitThreshold ->
            rabbit_log:warning("Leader health check not permitted, process limit threshold will be exceeded."),
            throw({error, leader_health_check_process_limit_exceeded});
        true ->
            ok
    end.

%% Logs a warning listing the readable names of the queues found unhealthy.
%%
%% Result is the list of printable queue maps returned by the leader health
%% check. An empty list logs nothing. Returns ok for an empty list,
%% otherwise whatever rabbit_log:warning/2 returns.
maybe_log_leader_health_check_result([]) ->
    ok;
maybe_log_leader_health_check_result(Result) ->
    %% The map pattern in the generator replaces an old-style
    %% `catch maps:get(...)`: an entry without a <<"readable_name">> key is
    %% skipped instead of putting an {'EXIT', ...} term into the log line.
    Qs = [ReadableName || #{<<"readable_name">> := ReadableName} <- Result],
    rabbit_log:warning("Leader health check result (unhealthy leaders detected): ~tp", [Qs]).
131 changes: 130 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ all_tests() ->
priority_queue_fifo,
priority_queue_2_1_ratio,
requeue_multiple_true,
requeue_multiple_false
requeue_multiple_false,
leader_health_check
].

memory_tests() ->
Expand Down Expand Up @@ -4106,6 +4107,129 @@ amqpl_headers(Config) ->
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = true}).

%% Exercises rabbit_quorum_queue:leader_health_check/2 on node 0.
%%
%% Steps:
%%  1. with no quorum queues declared, the check returns [] for a single
%%     vhost and across all vhosts;
%%  2. three quorum queues are declared in each of two vhosts and the check
%%     still returns [] for every pattern/vhost combination;
%%  3. the ra leaderboard entry of every quorum queue is cleared, after
%%     which the check must report exactly the affected queues;
%%  4. the queues are deleted and the connections closed.
leader_health_check(Config) ->
    VHost1 = <<"vhost1">>,
    VHost2 = <<"vhost2">>,

    set_up_vhost(Config, VHost1),
    set_up_vhost(Config, VHost2),

    %% Runs the health check on node 0 for the given pattern and vhost
    %% selector (a vhost name or across_all_vhosts).
    Check = fun(Pattern, VHostOrAll) ->
                    rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
                                                 leader_health_check,
                                                 [Pattern, VHostOrAll])
            end,

    %% check empty vhost
    ?assertEqual([], Check(<<".*">>, VHost1)),
    ?assertEqual([], Check(<<".*">>, across_all_vhosts)),

    Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost1),
    {ok, Ch1} = amqp_connection:open_channel(Conn1),

    Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost2),
    {ok, Ch2} = amqp_connection:open_channel(Conn2),

    Qs1 = [<<"Q.1">>, <<"Q.2">>, <<"Q.3">>],
    Qs2 = [<<"Q.4">>, <<"Q.5">>, <<"Q.6">>],

    %% in vhost1
    [?assertEqual({'queue.declare_ok', Q, 0, 0},
                  declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}]))
     || Q <- Qs1],

    %% in vhost2
    [?assertEqual({'queue.declare_ok', Q, 0, 0},
                  declare(Ch2, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}]))
     || Q <- Qs2],

    %% test successful health checks in vhost1, vhost2, across_all_vhosts
    ?assertEqual([], Check(<<".*">>, VHost1)),
    ?assertEqual([], Check(<<"Q.*">>, VHost1)),
    [?assertEqual([], Check(Q, VHost1)) || Q <- Qs1],

    ?assertEqual([], Check(<<".*">>, VHost2)),
    ?assertEqual([], Check(<<"Q.*">>, VHost2)),
    [?assertEqual([], Check(Q, VHost2)) || Q <- Qs2],

    ?assertEqual([], Check(<<".*">>, across_all_vhosts)),
    ?assertEqual([], Check(<<"Q.*">>, across_all_vhosts)),

    %% clear leaderboard
    Qs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),

    %% The match also asserts that exactly six quorum queues exist.
    [{_Q1_ClusterName, _Q1Res},
     {_Q2_ClusterName, _Q2Res},
     {_Q3_ClusterName, _Q3Res},
     {_Q4_ClusterName, _Q4Res},
     {_Q5_ClusterName, _Q5Res},
     {_Q6_ClusterName, _Q6Res}] = QQ_Clusters =
        lists:usort(
          [begin
               {ClusterName, _} = amqqueue:get_pid(Q),
               {ClusterName, amqqueue:get_name(Q)}
           end
           || Q <- Qs, amqqueue:get_type(Q) == rabbit_quorum_queue]),

    [Q1Data, Q2Data, Q3Data, Q4Data, Q5Data, Q6Data] = QQ_Data =
        [begin
             rabbit_ct_broker_helpers:rpc(Config, 0, ra_leaderboard, clear, [Q_ClusterName]),
             _QData = amqqueue:to_printable(Q_Res, rabbit_quorum_queue)
         end
         || {Q_ClusterName, Q_Res} <- QQ_Clusters],

    %% test failed health checks in vhost1, vhost2, across_all_vhosts
    ?assertEqual([Q1Data], Check(<<"Q.1">>, VHost1)),
    ?assertEqual([Q2Data], Check(<<"Q.2">>, VHost1)),
    ?assertEqual([Q3Data], Check(<<"Q.3">>, VHost1)),
    ?assertEqual([Q1Data, Q2Data, Q3Data],
                 lists:usort(Check(<<".*">>, VHost1))),
    ?assertEqual([Q1Data, Q2Data, Q3Data],
                 lists:usort(Check(<<"Q.*">>, VHost1))),

    ?assertEqual([Q4Data], Check(<<"Q.4">>, VHost2)),
    ?assertEqual([Q5Data], Check(<<"Q.5">>, VHost2)),
    ?assertEqual([Q6Data], Check(<<"Q.6">>, VHost2)),
    ?assertEqual([Q4Data, Q5Data, Q6Data],
                 lists:usort(Check(<<".*">>, VHost2))),
    ?assertEqual([Q4Data, Q5Data, Q6Data],
                 lists:usort(Check(<<"Q.*">>, VHost2))),

    %% Both patterns are checked across all vhosts, mirroring the per-vhost
    %% sections above (previously <<"Q.*">> was asserted twice).
    ?assertEqual(QQ_Data,
                 lists:usort(Check(<<".*">>, across_all_vhosts))),
    ?assertEqual(QQ_Data,
                 lists:usort(Check(<<"Q.*">>, across_all_vhosts))),

    %% cleanup: each queue must be deleted through a channel opened on its
    %% own vhost, so the vhost2 queues go through Ch2.
    [?assertMatch(#'queue.delete_ok'{},
                  amqp_channel:call(Ch1, #'queue.delete'{queue = Q}))
     || Q <- Qs1],
    [?assertMatch(#'queue.delete_ok'{},
                  amqp_channel:call(Ch2, #'queue.delete'{queue = Q}))
     || Q <- Qs2],

    amqp_connection:close(Conn1),
    amqp_connection:close(Conn2).


leader_locator_client_local(Config) ->
[Server1 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Q = ?config(queue_name, Config),
Expand Down Expand Up @@ -4426,6 +4550,11 @@ declare_passive(Ch, Q, Args) ->
auto_delete = false,
passive = true,
arguments = Args}).

%% Test helper: adds the vhost through rabbit_ct_broker_helpers and then
%% calls set_full_permissions/3 on it for the <<"guest">> user.
set_up_vhost(Config, VHostName) ->
    rabbit_ct_broker_helpers:add_vhost(Config, VHostName),
    User = <<"guest">>,
    rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHostName).

%% Shorthand for assert_queue_type/4 with <<"/">> as the second argument
%% (presumably the default vhost - confirm against assert_queue_type/4).
assert_queue_type(Server, QName, ExpectedType) ->
    DefaultVHost = <<"/">>,
    assert_queue_type(Server, DefaultVHost, QName, ExpectedType).

Expand Down
25 changes: 25 additions & 0 deletions deps/rabbit/test/rabbit_db_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ all_tests() ->
get_all,
get_all_by_vhost,
get_all_by_type,
get_all_by_type_and_vhost,
get_all_by_type_and_node,
list,
count,
Expand Down Expand Up @@ -198,6 +199,30 @@ get_all_by_type1(_Config) ->
?assertEqual([Q4], rabbit_db_queue:get_all_by_type(rabbit_stream_queue)),
passed.

%% Runs get_all_by_type_and_vhost1/1 through rabbit_ct_broker_helpers:rpc/5
%% (node index 0) and requires it to return passed.
get_all_by_type_and_vhost(Config) ->
    Outcome = rabbit_ct_broker_helpers:rpc(
                Config, 0, ?MODULE, get_all_by_type_and_vhost1, [Config]),
    passed = Outcome.

%% Checks rabbit_db_queue:get_all_by_type_and_vhost/2 against four queues
%% spread over two vhosts and three queue types: every lookup must be empty
%% before the queues are stored via set_list/1, and must return exactly the
%% queues of the requested type and vhost afterwards.
get_all_by_type_and_vhost1(_Config) ->
    VHost1 = <<"carrots">>,
    VHost2 = <<"cabage">>,
    Classic = new_queue(rabbit_misc:r(VHost1, queue, <<"test-queue">>),
                        rabbit_classic_queue),
    QuorumA = new_queue(rabbit_misc:r(VHost2, queue, <<"test-queue2">>),
                        rabbit_quorum_queue),
    QuorumB = new_queue(rabbit_misc:r(VHost2, queue, <<"test-queue3">>),
                        rabbit_quorum_queue),
    Stream = new_queue(rabbit_misc:r(VHost1, queue, <<"test-queue4">>),
                       rabbit_stream_queue),
    SortedQuorum = lists:sort([QuorumA, QuorumB]),
    Lookup = fun rabbit_db_queue:get_all_by_type_and_vhost/2,

    %% nothing stored yet
    ?assertEqual([], Lookup(rabbit_classic_queue, VHost1)),
    ?assertEqual([], lists:sort(Lookup(rabbit_quorum_queue, VHost2))),
    ?assertEqual([], Lookup(rabbit_stream_queue, VHost1)),

    set_list([Classic, QuorumA, QuorumB, Stream]),

    %% each (type, vhost) pair now yields only its own queues
    ?assertEqual([Classic], Lookup(rabbit_classic_queue, VHost1)),
    ?assertEqual(SortedQuorum, lists:sort(Lookup(rabbit_quorum_queue, VHost2))),
    ?assertEqual([Stream], Lookup(rabbit_stream_queue, VHost1)),
    passed.

%% Runs get_all_by_type_and_node1/1 through rabbit_ct_broker_helpers:rpc/5
%% (node index 0) and requires it to return passed.
get_all_by_type_and_node(Config) ->
    Outcome = rabbit_ct_broker_helpers:rpc(
                Config, 0, ?MODULE, get_all_by_type_and_node1, [Config]),
    passed = Outcome.

Expand Down
4 changes: 4 additions & 0 deletions deps/rabbitmq_cli/lib/rabbitmq/cli/core/output.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ defmodule RabbitMQ.CLI.Core.Output do
:ok
end

# A passed check that carries output is rendered exactly like a plain
# {:ok, output} result: the payload goes through the formatter and is
# returned wrapped in an :ok tuple.
def format_output({:ok, :check_passed, output}, formatter, options) do
  formatted = formatter.format_output(output, options)
  {:ok, formatted}
end

# Successful result with output: format the payload and keep the :ok tag.
def format_output({:ok, output}, formatter, options) do
  formatted = formatter.format_output(output, options)
  {:ok, formatted}
end
Expand Down
Loading
Loading