Skip to content

Commit 4f076e6

Browse files
Merge pull request #13488 from rabbitmq/mergify/bp/v4.1.x/pr-13487
By @Ayanda-D: new CLI health check that detects QQs without an elected reachable leader #13433 (backport #13487)
2 parents 1e34e26 + e1d7481 commit 4f076e6

File tree

8 files changed

+433
-1
lines changed

8 files changed

+433
-1
lines changed

deps/rabbit/src/amqqueue.erl

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,12 @@
6666
pattern_match_on_type/1,
6767
pattern_match_on_durable/1,
6868
pattern_match_on_type_and_durable/2,
69+
pattern_match_on_type_and_vhost/2,
6970
reset_decorators/1,
7071
set_immutable/1,
7172
qnode/1,
7273
to_printable/1,
74+
to_printable/2,
7375
macros/0]).
7476

7577
-define(record_version, amqqueue_v2).
@@ -531,6 +533,12 @@ pattern_match_on_durable(IsDurable) ->
531533
pattern_match_on_type_and_durable(Type, IsDurable) ->
532534
#amqqueue{type = Type, durable = IsDurable, _ = '_'}.
533535

536+
-spec pattern_match_on_type_and_vhost(atom(), binary()) ->
537+
amqqueue_pattern().
538+
539+
pattern_match_on_type_and_vhost(Type, VHost) ->
540+
#amqqueue{type = Type, vhost = VHost, _ = '_'}.
541+
534542
-spec reset_decorators(amqqueue()) -> amqqueue().
535543

536544
reset_decorators(#amqqueue{} = Queue) ->
@@ -564,6 +572,14 @@ to_printable(#amqqueue{name = QName = #resource{name = Name},
564572
<<"virtual_host">> => VHost,
565573
<<"type">> => Type}.
566574

575+
-spec to_printable(rabbit_types:r(queue), atom() | binary()) -> #{binary() => any()}.
576+
to_printable(QName = #resource{name = Name, virtual_host = VHost}, Type) ->
577+
_ = rabbit_queue_type:discover(Type),
578+
#{<<"readable_name">> => rabbit_data_coercion:to_binary(rabbit_misc:rs(QName)),
579+
<<"name">> => Name,
580+
<<"virtual_host">> => VHost,
581+
<<"type">> => Type}.
582+
567583
% private
568584

569585
macros() ->

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
get_all/0,
2222
get_all/1,
2323
get_all_by_type/1,
24+
get_all_by_type_and_vhost/2,
2425
get_all_by_type_and_node/3,
2526
list/0,
2627
count/0,
@@ -829,6 +830,28 @@ get_all_by_type(Type) ->
829830
khepri => fun() -> get_all_by_pattern_in_khepri(Pattern) end
830831
}).
831832

833+
%% -------------------------------------------------------------------
834+
%% get_all_by_type_and_vhost().
835+
%% -------------------------------------------------------------------
836+
837+
-spec get_all_by_type_and_vhost(Type, VHost) -> [Queue] when
838+
Type :: atom(),
839+
VHost :: binary(),
840+
Queue :: amqqueue:amqqueue().
841+
842+
%% @doc Gets all queues belonging to the given type and vhost
843+
%%
844+
%% @returns a list of queue records.
845+
%%
846+
%% @private
847+
848+
get_all_by_type_and_vhost(Type, VHost) ->
849+
Pattern = amqqueue:pattern_match_on_type_and_vhost(Type, VHost),
850+
rabbit_khepri:handle_fallback(
851+
#{mnesia => fun() -> get_all_by_pattern_in_mnesia(Pattern) end,
852+
khepri => fun() -> get_all_by_pattern_in_khepri(Pattern) end
853+
}).
854+
832855
get_all_by_pattern_in_mnesia(Pattern) ->
833856
rabbit_db:list_in_mnesia(?MNESIA_TABLE, Pattern).
834857

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@
8282
file_handle_other_reservation/0,
8383
file_handle_release_reservation/0]).
8484

85+
-export([leader_health_check/2,
86+
run_leader_health_check/4]).
87+
8588
-ifdef(TEST).
8689
-export([filter_promotable/2,
8790
ra_machine_config/1]).
@@ -144,6 +147,8 @@
144147
-define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096
145148
% -define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra
146149
-define(MIN_CHECKPOINT_INTERVAL, 8192). %% the ra default is 16384
150+
-define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000).
151+
-define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000).
147152

148153
%%----------- QQ policies ---------------------------------------------------
149154

@@ -2145,3 +2150,75 @@ file_handle_other_reservation() ->
21452150
file_handle_release_reservation() ->
21462151
ok.
21472152

2153+
leader_health_check(QueueNameOrRegEx, VHost) ->
2154+
%% Set a process limit threshold to 20% of ErlangVM process limit, beyond which
2155+
%% we cannot spawn any new processes for executing QQ leader health checks.
2156+
ProcessLimitThreshold = round(0.2 * erlang:system_info(process_limit)),
2157+
2158+
leader_health_check(QueueNameOrRegEx, VHost, ProcessLimitThreshold).
2159+
2160+
leader_health_check(QueueNameOrRegEx, VHost, ProcessLimitThreshold) ->
2161+
Qs =
2162+
case VHost of
2163+
across_all_vhosts ->
2164+
rabbit_db_queue:get_all_by_type(?MODULE);
2165+
VHost when is_binary(VHost) ->
2166+
rabbit_db_queue:get_all_by_type_and_vhost(?MODULE, VHost)
2167+
end,
2168+
check_process_limit_safety(length(Qs), ProcessLimitThreshold),
2169+
ParentPID = self(),
2170+
HealthCheckRef = make_ref(),
2171+
HealthCheckPids =
2172+
lists:flatten(
2173+
[begin
2174+
{resource, _VHostN, queue, QueueName} = QResource = amqqueue:get_name(Q),
2175+
case re:run(QueueName, QueueNameOrRegEx, [{capture, none}]) of
2176+
match ->
2177+
{ClusterName, _} = rabbit_amqqueue:pid_of(Q),
2178+
_Pid = spawn(fun() -> run_leader_health_check(ClusterName, QResource, HealthCheckRef, ParentPID) end);
2179+
_ ->
2180+
[]
2181+
end
2182+
end || Q <- Qs, amqqueue:get_type(Q) == ?MODULE]),
2183+
Result = wait_for_leader_health_checks(HealthCheckRef, length(HealthCheckPids), []),
2184+
_ = spawn(fun() -> maybe_log_leader_health_check_result(Result) end),
2185+
Result.
2186+
2187+
run_leader_health_check(ClusterName, QResource, HealthCheckRef, From) ->
2188+
Leader = ra_leaderboard:lookup_leader(ClusterName),
2189+
2190+
%% Ignoring result here is required to clear a diayzer warning.
2191+
_ =
2192+
case ra_server_proc:ping(Leader, ?LEADER_HEALTH_CHECK_TIMEOUT) of
2193+
{pong,leader} ->
2194+
From ! {ok, HealthCheckRef, QResource};
2195+
_ ->
2196+
From ! {error, HealthCheckRef, QResource}
2197+
end,
2198+
ok.
2199+
2200+
wait_for_leader_health_checks(_Ref, 0, UnhealthyAcc) -> UnhealthyAcc;
2201+
wait_for_leader_health_checks(Ref, N, UnhealthyAcc) ->
2202+
receive
2203+
{ok, Ref, _QResource} ->
2204+
wait_for_leader_health_checks(Ref, N - 1, UnhealthyAcc);
2205+
{error, Ref, QResource} ->
2206+
wait_for_leader_health_checks(Ref, N - 1, [amqqueue:to_printable(QResource, ?MODULE) | UnhealthyAcc])
2207+
after
2208+
?GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT ->
2209+
UnhealthyAcc
2210+
end.
2211+
2212+
check_process_limit_safety(QCount, ProcessLimitThreshold) ->
2213+
case (erlang:system_info(process_count) + QCount) >= ProcessLimitThreshold of
2214+
true ->
2215+
rabbit_log:warning("Leader health check not permitted, process limit threshold will be exceeded."),
2216+
throw({error, leader_health_check_process_limit_exceeded});
2217+
false ->
2218+
ok
2219+
end.
2220+
2221+
maybe_log_leader_health_check_result([]) -> ok;
2222+
maybe_log_leader_health_check_result(Result) ->
2223+
Qs = lists:map(fun(R) -> catch maps:get(<<"readable_name">>, R) end, Result),
2224+
rabbit_log:warning("Leader health check result (unhealthy leaders detected): ~tp", [Qs]).

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,8 @@ all_tests() ->
192192
priority_queue_2_1_ratio,
193193
requeue_multiple_true,
194194
requeue_multiple_false,
195-
subscribe_from_each
195+
subscribe_from_each,
196+
leader_health_check
196197
].
197198

198199
memory_tests() ->
@@ -4145,6 +4146,129 @@ amqpl_headers(Config) ->
41454146
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
41464147
multiple = true}).
41474148

4149+
leader_health_check(Config) ->
4150+
VHost1 = <<"vhost1">>,
4151+
VHost2 = <<"vhost2">>,
4152+
4153+
set_up_vhost(Config, VHost1),
4154+
set_up_vhost(Config, VHost2),
4155+
4156+
%% check empty vhost
4157+
?assertEqual([],
4158+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4159+
[<<".*">>, VHost1])),
4160+
?assertEqual([],
4161+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4162+
[<<".*">>, across_all_vhosts])),
4163+
4164+
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost1),
4165+
{ok, Ch1} = amqp_connection:open_channel(Conn1),
4166+
4167+
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost2),
4168+
{ok, Ch2} = amqp_connection:open_channel(Conn2),
4169+
4170+
Qs1 = [<<"Q.1">>, <<"Q.2">>, <<"Q.3">>],
4171+
Qs2 = [<<"Q.4">>, <<"Q.5">>, <<"Q.6">>],
4172+
4173+
%% in vhost1
4174+
[?assertEqual({'queue.declare_ok', Q, 0, 0},
4175+
declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}]))
4176+
|| Q <- Qs1],
4177+
4178+
%% in vhost2
4179+
[?assertEqual({'queue.declare_ok', Q, 0, 0},
4180+
declare(Ch2, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}]))
4181+
|| Q <- Qs2],
4182+
4183+
%% test sucessful health checks in vhost1, vhost2, across_all_vhosts
4184+
?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4185+
[<<".*">>, VHost1])),
4186+
?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4187+
[<<"Q.*">>, VHost1])),
4188+
[?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4189+
[Q, VHost1])) || Q <- Qs1],
4190+
4191+
?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4192+
[<<".*">>, VHost2])),
4193+
?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4194+
[<<"Q.*">>, VHost2])),
4195+
[?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4196+
[Q, VHost2])) || Q <- Qs2],
4197+
4198+
?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4199+
[<<".*">>, across_all_vhosts])),
4200+
?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4201+
[<<"Q.*">>, across_all_vhosts])),
4202+
4203+
%% clear leaderboard
4204+
Qs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),
4205+
4206+
[{_Q1_ClusterName, _Q1Res},
4207+
{_Q2_ClusterName, _Q2Res},
4208+
{_Q3_ClusterName, _Q3Res},
4209+
{_Q4_ClusterName, _Q4Res},
4210+
{_Q5_ClusterName, _Q5Res},
4211+
{_Q6_ClusterName, _Q6Res}] = QQ_Clusters =
4212+
lists:usort(
4213+
[begin
4214+
{ClusterName, _} = amqqueue:get_pid(Q),
4215+
{ClusterName, amqqueue:get_name(Q)}
4216+
end
4217+
|| Q <- Qs, amqqueue:get_type(Q) == rabbit_quorum_queue]),
4218+
4219+
[Q1Data, Q2Data, Q3Data, Q4Data, Q5Data, Q6Data] = QQ_Data =
4220+
[begin
4221+
rabbit_ct_broker_helpers:rpc(Config, 0, ra_leaderboard, clear, [Q_ClusterName]),
4222+
_QData = amqqueue:to_printable(Q_Res, rabbit_quorum_queue)
4223+
end
4224+
|| {Q_ClusterName, Q_Res} <- QQ_Clusters],
4225+
4226+
%% test failed health checks in vhost1, vhost2, across_all_vhosts
4227+
?assertEqual([Q1Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4228+
[<<"Q.1">>, VHost1])),
4229+
?assertEqual([Q2Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4230+
[<<"Q.2">>, VHost1])),
4231+
?assertEqual([Q3Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4232+
[<<"Q.3">>, VHost1])),
4233+
?assertEqual([Q1Data, Q2Data, Q3Data],
4234+
lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4235+
[<<".*">>, VHost1]))),
4236+
?assertEqual([Q1Data, Q2Data, Q3Data],
4237+
lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4238+
[<<"Q.*">>, VHost1]))),
4239+
4240+
?assertEqual([Q4Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4241+
[<<"Q.4">>, VHost2])),
4242+
?assertEqual([Q5Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4243+
[<<"Q.5">>, VHost2])),
4244+
?assertEqual([Q6Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4245+
[<<"Q.6">>, VHost2])),
4246+
?assertEqual([Q4Data, Q5Data, Q6Data],
4247+
lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4248+
[<<".*">>, VHost2]))),
4249+
?assertEqual([Q4Data, Q5Data, Q6Data],
4250+
lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4251+
[<<"Q.*">>, VHost2]))),
4252+
4253+
?assertEqual(QQ_Data,
4254+
lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4255+
[<<"Q.*">>, across_all_vhosts]))),
4256+
?assertEqual(QQ_Data,
4257+
lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4258+
[<<"Q.*">>, across_all_vhosts]))),
4259+
4260+
%% cleanup
4261+
[?assertMatch(#'queue.delete_ok'{},
4262+
amqp_channel:call(Ch1, #'queue.delete'{queue = Q}))
4263+
|| Q <- Qs1],
4264+
[?assertMatch(#'queue.delete_ok'{},
4265+
amqp_channel:call(Ch1, #'queue.delete'{queue = Q}))
4266+
|| Q <- Qs2],
4267+
4268+
amqp_connection:close(Conn1),
4269+
amqp_connection:close(Conn2).
4270+
4271+
41484272
leader_locator_client_local(Config) ->
41494273
[Server1 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
41504274
Q = ?config(queue_name, Config),
@@ -4465,6 +4589,11 @@ declare_passive(Ch, Q, Args) ->
44654589
auto_delete = false,
44664590
passive = true,
44674591
arguments = Args}).
4592+
4593+
set_up_vhost(Config, VHost) ->
4594+
rabbit_ct_broker_helpers:add_vhost(Config, VHost),
4595+
rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost).
4596+
44684597
assert_queue_type(Server, Q, Expected) ->
44694598
assert_queue_type(Server, <<"/">>, Q, Expected).
44704599

deps/rabbit/test/rabbit_db_queue_SUITE.erl

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ all_tests() ->
3535
get_all,
3636
get_all_by_vhost,
3737
get_all_by_type,
38+
get_all_by_type_and_vhost,
3839
get_all_by_type_and_node,
3940
list,
4041
count,
@@ -198,6 +199,30 @@ get_all_by_type1(_Config) ->
198199
?assertEqual([Q4], rabbit_db_queue:get_all_by_type(rabbit_stream_queue)),
199200
passed.
200201

202+
get_all_by_type_and_vhost(Config) ->
203+
passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_all_by_type_and_vhost1, [Config]).
204+
205+
get_all_by_type_and_vhost1(_Config) ->
206+
VHost1 = <<"carrots">>,
207+
VHost2 = <<"cabage">>,
208+
QName = rabbit_misc:r(VHost1, queue, <<"test-queue">>),
209+
QName2 = rabbit_misc:r(VHost2, queue, <<"test-queue2">>),
210+
QName3 = rabbit_misc:r(VHost2, queue, <<"test-queue3">>),
211+
QName4 = rabbit_misc:r(VHost1, queue, <<"test-queue4">>),
212+
Q = new_queue(QName, rabbit_classic_queue),
213+
Q2 = new_queue(QName2, rabbit_quorum_queue),
214+
Q3 = new_queue(QName3, rabbit_quorum_queue),
215+
Q4 = new_queue(QName4, rabbit_stream_queue),
216+
Quorum = lists:sort([Q2, Q3]),
217+
?assertEqual([], rabbit_db_queue:get_all_by_type_and_vhost(rabbit_classic_queue, VHost1)),
218+
?assertEqual([], lists:sort(rabbit_db_queue:get_all_by_type_and_vhost(rabbit_quorum_queue, VHost2))),
219+
?assertEqual([], rabbit_db_queue:get_all_by_type_and_vhost(rabbit_stream_queue, VHost1)),
220+
set_list([Q, Q2, Q3, Q4]),
221+
?assertEqual([Q], rabbit_db_queue:get_all_by_type_and_vhost(rabbit_classic_queue, VHost1)),
222+
?assertEqual(Quorum, lists:sort(rabbit_db_queue:get_all_by_type_and_vhost(rabbit_quorum_queue, VHost2))),
223+
?assertEqual([Q4], rabbit_db_queue:get_all_by_type_and_vhost(rabbit_stream_queue, VHost1)),
224+
passed.
225+
201226
get_all_by_type_and_node(Config) ->
202227
passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_all_by_type_and_node1, [Config]).
203228

deps/rabbitmq_cli/lib/rabbitmq/cli/core/output.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ defmodule RabbitMQ.CLI.Core.Output do
1818
:ok
1919
end
2020

21+
def format_output({:ok, :check_passed, output}, formatter, options) do
22+
{:ok, formatter.format_output(output, options)}
23+
end
24+
2125
def format_output({:ok, output}, formatter, options) do
2226
{:ok, formatter.format_output(output, options)}
2327
end

0 commit comments

Comments
 (0)