Skip to content

Commit 48ba3e1

Browse files
committed
Support global flag to run leader health check for
all queues in all vhosts on local node
1 parent 239a69b commit 48ba3e1

File tree

4 files changed

+121
-31
lines changed

4 files changed

+121
-31
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2154,12 +2154,22 @@ leader_health_check(QueueNameOrRegEx, VHost) ->
21542154
%% we cannot spawn any new processes for executing QQ leader health checks.
21552155
ProcessLimitThreshold = round(0.4 * erlang:system_info(process_limit)),
21562156

2157+
leader_health_check(QueueNameOrRegEx, VHost, ProcessLimitThreshold).
2158+
2159+
leader_health_check(QueueNameOrRegEx, VHost, ProcessLimitThreshold) ->
2160+
Qs =
2161+
case VHost of
2162+
global ->
2163+
rabbit_amqqueue:list();
2164+
VHost when is_binary(VHost) ->
2165+
rabbit_amqqueue:list(VHost)
2166+
end,
21572167
ParentPID = self(),
21582168
HealthCheckRef = make_ref(),
21592169
HealthCheckPids =
21602170
lists:flatten(
21612171
[begin
2162-
{resource, VHost, queue, QueueName} = QResource = amqqueue:get_name(Q),
2172+
{resource, _VHostN, queue, QueueName} = QResource = amqqueue:get_name(Q),
21632173
case check_process_limit_safety(ProcessLimitThreshold) of
21642174
true ->
21652175
case re:run(QueueName, QueueNameOrRegEx, [{capture, none}]) of
@@ -2173,7 +2183,7 @@ leader_health_check(QueueNameOrRegEx, VHost) ->
21732183
rabbit_log:warning("Leader health check failed from exceeded process limit threshold"),
21742184
throw({error, leader_health_check_process_limit_exceeded})
21752185
end
2176-
end || Q <- rabbit_amqqueue:list(VHost), amqqueue:get_type(Q) == ?MODULE]),
2186+
end || Q <- Qs, amqqueue:get_type(Q) == ?MODULE]),
21772187
wait_for_leader_health_checks(HealthCheckRef, length(HealthCheckPids), []).
21782188

21792189
run_leader_health_check(ClusterName, QResource, HealthCheckRef, From) ->

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 94 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4147,52 +4147,119 @@ amqpl_headers(Config) ->
41474147
multiple = true}).
41484148

41494149
leader_health_check(Config) ->
4150-
[Server | _] = _Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
4151-
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
4150+
VHost1 = <<"vhost1">>,
4151+
VHost2 = <<"vhost2">>,
4152+
4153+
set_up_vhost(Config, VHost1),
4154+
set_up_vhost(Config, VHost2),
4155+
4156+
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost1),
4157+
{ok, Ch1} = amqp_connection:open_channel(Conn1),
4158+
4159+
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost2),
4160+
{ok, Ch2} = amqp_connection:open_channel(Conn2),
4161+
4162+
Qs1 = [<<"Q.1">>, <<"Q.2">>, <<"Q.3">>],
4163+
Qs2 = [<<"Q.4">>, <<"Q.5">>, <<"Q.6">>],
4164+
4165+
%% in vhost1
41524166
[?assertEqual({'queue.declare_ok', Q, 0, 0},
4153-
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}]))
4154-
|| Q <- [<<"Q.1">>, <<"Q.2">>, <<"Q.3">>]],
4167+
declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}]))
4168+
|| Q <- Qs1],
4169+
4170+
%% in vhost2
4171+
[?assertEqual({'queue.declare_ok', Q, 0, 0},
4172+
declare(Ch2, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}]))
4173+
|| Q <- Qs2],
4174+
4175+
%% test sucessful health checks in vhost1, vhost2, global
41554176
?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4156-
[<<".*">>, <<"/">>])),
4177+
[<<".*">>, VHost1])),
41574178
?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4158-
[<<"Q.*">>, <<"/">>])),
4179+
[<<"Q.*">>, VHost1])),
4180+
[?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4181+
[Q, VHost1])) || Q <- Qs1],
4182+
41594183
?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4160-
[<<"Q.1">>, <<"/">>])),
4184+
[<<".*">>, VHost2])),
41614185
?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4162-
[<<"Q.2">>, <<"/">>])),
4186+
[<<"Q.*">>, VHost2])),
4187+
[?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4188+
[Q, VHost2])) || Q <- Qs2],
4189+
4190+
?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4191+
[<<".*">>, global])),
41634192
?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4164-
[<<"Q.3">>, <<"/">>])),
4193+
[<<"Q.*">>, global])),
41654194

4195+
%% clear leaderboard
41664196
Qs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),
41674197

4168-
[{Q1_ClusterName, Q1Res}, {Q2_ClusterName, Q2Res}, {Q3_ClusterName, Q3Res}] =
4198+
[{_Q1_ClusterName, _Q1Res},
4199+
{_Q2_ClusterName, _Q2Res},
4200+
{_Q3_ClusterName, _Q3Res},
4201+
{_Q4_ClusterName, _Q4Res},
4202+
{_Q5_ClusterName, _Q5Res},
4203+
{_Q6_ClusterName, _Q6Res}] = QQ_Clusters =
41694204
lists:usort(
41704205
[begin
41714206
{ClusterName, _} = amqqueue:get_pid(Q),
41724207
{ClusterName, amqqueue:get_name(Q)}
41734208
end
41744209
|| Q <- Qs, amqqueue:get_type(Q) == rabbit_quorum_queue]),
41754210

4176-
rabbit_ct_broker_helpers:rpc(Config, 0, ra_leaderboard, clear, [Q1_ClusterName]),
4177-
rabbit_ct_broker_helpers:rpc(Config, 0, ra_leaderboard, clear, [Q2_ClusterName]),
4178-
rabbit_ct_broker_helpers:rpc(Config, 0, ra_leaderboard, clear, [Q3_ClusterName]),
4179-
4180-
Q1Data = amqqueue:to_printable(Q1Res, rabbit_quorum_queue),
4181-
Q2Data = amqqueue:to_printable(Q2Res, rabbit_quorum_queue),
4182-
Q3Data = amqqueue:to_printable(Q3Res, rabbit_quorum_queue),
4211+
[Q1Data, Q2Data, Q3Data, Q4Data, Q5Data, Q6Data] = QQ_Data =
4212+
[begin
4213+
rabbit_ct_broker_helpers:rpc(Config, 0, ra_leaderboard, clear, [Q_ClusterName]),
4214+
_QData = amqqueue:to_printable(Q_Res, rabbit_quorum_queue)
4215+
end
4216+
|| {Q_ClusterName, Q_Res} <- QQ_Clusters],
41834217

4218+
%% test failed health checks in vhost1, vhost2, global
41844219
?assertEqual([Q1Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4185-
[<<"Q.1">>, <<"/">>])),
4220+
[<<"Q.1">>, VHost1])),
41864221
?assertEqual([Q2Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4187-
[<<"Q.2">>, <<"/">>])),
4222+
[<<"Q.2">>, VHost1])),
41884223
?assertEqual([Q3Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4189-
[<<"Q.3">>, <<"/">>])),
4224+
[<<"Q.3">>, VHost1])),
41904225
?assertEqual([Q1Data, Q2Data, Q3Data],
41914226
lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4192-
[<<".*">>, <<"/">>]))),
4227+
[<<".*">>, VHost1]))),
41934228
?assertEqual([Q1Data, Q2Data, Q3Data],
41944229
lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4195-
[<<"Q.*">>, <<"/">>]))).
4230+
[<<"Q.*">>, VHost1]))),
4231+
4232+
?assertEqual([Q4Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4233+
[<<"Q.4">>, VHost2])),
4234+
?assertEqual([Q5Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4235+
[<<"Q.5">>, VHost2])),
4236+
?assertEqual([Q6Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4237+
[<<"Q.6">>, VHost2])),
4238+
?assertEqual([Q4Data, Q5Data, Q6Data],
4239+
lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4240+
[<<".*">>, VHost2]))),
4241+
?assertEqual([Q4Data, Q5Data, Q6Data],
4242+
lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4243+
[<<"Q.*">>, VHost2]))),
4244+
4245+
?assertEqual(QQ_Data,
4246+
lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4247+
[<<"Q.*">>, global]))),
4248+
?assertEqual(QQ_Data,
4249+
lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check,
4250+
[<<"Q.*">>, global]))),
4251+
4252+
%% cleanup
4253+
[?assertMatch(#'queue.delete_ok'{},
4254+
amqp_channel:call(Ch1, #'queue.delete'{queue = Q}))
4255+
|| Q <- Qs1],
4256+
[?assertMatch(#'queue.delete_ok'{},
4257+
amqp_channel:call(Ch1, #'queue.delete'{queue = Q}))
4258+
|| Q <- Qs2],
4259+
4260+
amqp_connection:close(Conn1),
4261+
amqp_connection:close(Conn2).
4262+
41964263

41974264
leader_locator_client_local(Config) ->
41984265
[Server1 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -4514,6 +4581,11 @@ declare_passive(Ch, Q, Args) ->
45144581
auto_delete = false,
45154582
passive = true,
45164583
arguments = Args}).
4584+
4585+
set_up_vhost(Config, VHost) ->
4586+
rabbit_ct_broker_helpers:add_vhost(Config, VHost),
4587+
rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost).
4588+
45174589
assert_queue_type(Server, Q, Expected) ->
45184590
assert_queue_type(Server, <<"/">>, Q, Expected).
45194591

deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/leader_health_check_command.ex

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,27 @@ defmodule RabbitMQ.CLI.Queues.Commands.LeaderHealthCheckCommand do
1111

1212
import RabbitMQ.CLI.Core.Platform, only: [line_separator: 0]
1313

14+
def switches(), do: [global: :boolean]
15+
1416
def scopes(), do: [:queues]
1517

18+
def merge_defaults(args, opts) do
19+
{args, Map.merge(%{global: false, vhost: "/"}, opts)}
20+
end
21+
1622
use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout
1723
use RabbitMQ.CLI.Core.AcceptsOnePositionalArgument
18-
use RabbitMQ.CLI.Core.MergesDefaultVirtualHost
1924
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
2025

21-
def run([pattern] = _args, %{node: node_name, vhost: vhost}) do
26+
def run([pattern] = _args, %{node: node_name, vhost: vhost, global: global_opt}) do
27+
vhost = if global_opt, do: :global, else: vhost
28+
2229
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :leader_health_check, [pattern, vhost]) do
2330
[] ->
2431
:ok
2532

26-
unhealthy_queues ->
27-
{:error, unhealthy_queues}
33+
unhealthy_queues_or_error ->
34+
{:error, unhealthy_queues_or_error}
2835
end
2936
end
3037

@@ -67,12 +74,13 @@ defmodule RabbitMQ.CLI.Queues.Commands.LeaderHealthCheckCommand do
6774
def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable
6875

6976
def usage() do
70-
"leader_health_check [--vhost <vhost>] <pattern>"
77+
"leader_health_check [--vhost <vhost>] [--global] <pattern>"
7178
end
7279

7380
def usage_additional do
7481
[
75-
["<pattern>", "regular expression pattern used to match quorum queues"]
82+
["<pattern>", "regular expression pattern used to match quorum queues"],
83+
["--global", "run leader health check for all queues in all virtual hosts on the node"]
7684
]
7785
end
7886

deps/rabbitmq_cli/test/queues/leader_health_check_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.LeaderHealthCheckCommandTest do
4646
{:error, {:badrpc, :nodedown}},
4747
@command.run(
4848
["quorum.queue.*"],
49-
%{node: :jake@thedog, vhost: "/", timeout: 200}
49+
%{node: :jake@thedog, vhost: "/", global: false, timeout: 200}
5050
)
5151
)
5252
end

0 commit comments

Comments
 (0)