Skip to content

Commit f091ba4

Browse files
Merge branch 'binarin-rabbitmq-server-818' into stable
2 parents 3448f97 + 73e2f61 commit f091ba4

File tree

3 files changed

+204
-21
lines changed

3 files changed

+204
-21
lines changed

include/rabbit_cli.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
3535
-define(QUIET_DEF, {?QUIET_OPT, flag}).
3636
-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
37-
-define(TIMEOUT_DEF, {?TIMEOUT_OPT, {option, "infinity"}}).
37+
-define(TIMEOUT_DEF, {?TIMEOUT_OPT, {option, use_default}}).
3838

3939
-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
4040
-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).

src/rabbit_control_main.erl

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -114,13 +114,15 @@
114114
[stop, stop_app, start_app, wait, reset, force_reset, rotate_logs,
115115
join_cluster, change_cluster_node_type, update_cluster_nodes,
116116
forget_cluster_node, rename_cluster_node, cluster_status, status,
117-
environment, eval, force_boot, help, node_health_check, hipe_compile]).
117+
environment, eval, force_boot, help, hipe_compile]).
118118

119+
%% [Command | {Command, DefaultTimeoutInMilliSeconds}]
119120
-define(COMMANDS_WITH_TIMEOUT,
120121
[list_user_permissions, list_policies, list_queues, list_exchanges,
121122
list_bindings, list_connections, list_channels, list_consumers,
122123
list_vhosts, list_parameters,
123-
purge_queue]).
124+
purge_queue,
125+
{node_health_check, 70000}]).
124126

125127
%%----------------------------------------------------------------------------
126128

@@ -152,7 +154,7 @@ start() ->
152154
end
153155
end,
154156
try
155-
T = case get_timeout(Opts) of
157+
T = case get_timeout(Command, Opts) of
156158
{ok, Timeout} ->
157159
Timeout;
158160
{error, _} ->
@@ -187,8 +189,23 @@ print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) ->
187189
end,
188190
io:nl().
189191

190-
get_timeout(Opts) ->
191-
parse_timeout(proplists:get_value(?TIMEOUT_OPT, Opts, ?RPC_TIMEOUT)).
192+
get_timeout(Command, Opts) ->
193+
Default = case proplists:lookup(Command, ?COMMANDS_WITH_TIMEOUT) of
194+
none ->
195+
infinity;
196+
{Command, true} ->
197+
?RPC_TIMEOUT;
198+
{Command, D} ->
199+
D
200+
end,
201+
Result = case proplists:get_value(?TIMEOUT_OPT, Opts, Default) of
202+
use_default ->
203+
parse_timeout(Default);
204+
Value ->
205+
parse_timeout(Value)
206+
end,
207+
Result.
208+
192209

193210
parse_number(N) when is_list(N) ->
194211
try list_to_integer(N) of
@@ -234,11 +251,11 @@ do_action(Command, Node, Args, Opts, Inform, Timeout) ->
234251
false ->
235252
case ensure_app_running(Node) of
236253
ok ->
237-
case lists:member(Command, ?COMMANDS_WITH_TIMEOUT) of
238-
true ->
254+
case proplists:lookup(Command, ?COMMANDS_WITH_TIMEOUT) of
255+
{Command, _} ->
239256
announce_timeout(Timeout, Inform),
240257
action(Command, Node, Args, Opts, Inform, Timeout);
241-
false ->
258+
none ->
242259
action(Command, Node, Args, Opts, Inform)
243260
end;
244261
E -> E
@@ -562,17 +579,6 @@ action(eval, Node, [Expr], _Opts, _Inform) ->
562579
action(help, _Node, _Args, _Opts, _Inform) ->
563580
io:format("~s", [rabbit_ctl_usage:usage()]);
564581

565-
action(node_health_check, Node, _Args, _Opts, Inform) ->
566-
Inform("Checking health of node ~p", [Node]),
567-
try
568-
rabbit_health_check:node(Node),
569-
io:format("Health check passed~n")
570-
catch
571-
{node_is_ko, ErrorMsg, ErrorCode} ->
572-
io:format("Heath check failed:~n~s~n", [ErrorMsg]),
573-
halt(ErrorCode)
574-
end;
575-
576582
action(Command, Node, Args, Opts, Inform) ->
577583
%% For backward compatibility, run commands accepting a timeout with
578584
%% the default timeout.
@@ -666,7 +672,17 @@ action(list_consumers, Node, _Args, Opts, Inform, Timeout) ->
666672
Inform("Listing consumers", []),
667673
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
668674
call(Node, {rabbit_amqqueue, consumers_all, [VHostArg]},
669-
rabbit_amqqueue:consumer_info_keys(), Timeout).
675+
rabbit_amqqueue:consumer_info_keys(), Timeout);
676+
677+
action(node_health_check, Node, _Args, _Opts, Inform, Timeout) ->
678+
Inform("Checking health of node ~p", [Node]),
679+
case rabbit_health_check:node(Node, Timeout) of
680+
ok ->
681+
io:format("Health check passed~n"),
682+
ok;
683+
Other ->
684+
Other
685+
end.
670686

671687
format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
672688

test/health_check_SUITE.erl

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 1.1 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at http://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2016 Pivotal Software, Inc. All rights reserved.
15+
%%
16+
-module(health_check_SUITE).
17+
18+
-include_lib("common_test/include/ct.hrl").
19+
-include_lib("amqp_client/include/amqp_client.hrl").
20+
21+
-export([all/0
22+
,groups/0
23+
,init_per_suite/1
24+
,end_per_suite/1
25+
,init_per_testcase/2
26+
,end_per_testcase/2
27+
]).
28+
29+
-export([ignores_remote_dead_channel/1
30+
,detects_local_dead_channel/1
31+
,ignores_remote_dead_queue/1
32+
,detects_local_dead_queue/1
33+
,ignores_remote_alarms/1
34+
,detects_local_alarm/1
35+
,honors_timeout_argument/1
36+
]).
37+
38+
all() ->
39+
[{group, all_cases}].
40+
41+
groups() ->
42+
[{all_cases, [],
43+
[ignores_remote_dead_queue
44+
,detects_local_dead_queue
45+
,ignores_remote_dead_channel
46+
,detects_local_dead_channel
47+
,ignores_remote_alarms
48+
,detects_local_alarm
49+
,honors_timeout_argument
50+
]}].
51+
52+
init_per_suite(Config) ->
53+
rabbit_ct_helpers:log_environment(),
54+
rabbit_ct_helpers:run_setup_steps(Config).
55+
56+
end_per_suite(Config) ->
57+
rabbit_ct_helpers:run_teardown_steps(Config).
58+
59+
init_per_testcase(Testcase, Config0) ->
60+
rabbit_ct_helpers:testcase_started(Config0, Testcase),
61+
Config1 = rabbit_ct_helpers:set_config(
62+
Config0, [{rmq_nodes_count, 2},
63+
{rmq_nodes_clustered, true}]),
64+
rabbit_ct_helpers:run_steps(Config1,
65+
rabbit_ct_broker_helpers:setup_steps() ++
66+
rabbit_ct_client_helpers:setup_steps()).
67+
68+
end_per_testcase(Testcase, Config0) ->
69+
Config1 = case rabbit_ct_helpers:get_config(Config0, save_config) of
70+
undefined -> Config0;
71+
C -> C
72+
end,
73+
Config2 = rabbit_ct_helpers:run_steps(Config1,
74+
rabbit_ct_client_helpers:teardown_steps() ++
75+
rabbit_ct_broker_helpers:teardown_steps()),
76+
rabbit_ct_helpers:testcase_finished(Config2, Testcase).
77+
78+
%%----------------------------------------------------------------------------
79+
%% Test cases
80+
%%----------------------------------------------------------------------------
81+
ignores_remote_dead_channel(Config) ->
82+
[A, B] = open_channel_and_declare_queue_everywhere(Config),
83+
CPid = suspend_single_channel(Config, B),
84+
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, A, ["-t", "5", "node_health_check"]),
85+
resume_sys_process(Config, B, CPid),
86+
ok.
87+
88+
detects_local_dead_channel(Config) ->
89+
[A|_] = open_channel_and_declare_queue_everywhere(Config),
90+
CPid = suspend_single_channel(Config, A),
91+
{error, 75, Str} = rabbit_ct_broker_helpers:rabbitmqctl(Config, A, ["-t", "5", "node_health_check"]),
92+
{match, _} = re:run(Str, "operation node_health_check.*timed out"),
93+
resume_sys_process(Config, A, CPid),
94+
ok.
95+
96+
ignores_remote_dead_queue(Config) ->
97+
[A, B] = open_channel_and_declare_queue_everywhere(Config),
98+
QPid = suspend_single_queue(Config, B),
99+
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, A, ["-t", "5", "node_health_check"]),
100+
resume_sys_process(Config, B, QPid),
101+
ok.
102+
103+
detects_local_dead_queue(Config) ->
104+
[A|_] = open_channel_and_declare_queue_everywhere(Config),
105+
QPid = suspend_single_queue(Config, A),
106+
{error, 75, Str} = rabbit_ct_broker_helpers:rabbitmqctl(Config, A, ["-t", "5", "node_health_check"]),
107+
{match, _} = re:run(Str, "operation node_health_check.*timed out"),
108+
resume_sys_process(Config, A, QPid),
109+
ok.
110+
111+
ignores_remote_alarms(Config) ->
112+
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
113+
rabbit_ct_broker_helpers:rabbitmqctl(Config, B,
114+
["set_vm_memory_high_watermark", "0.000000001"]),
115+
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, A, ["-t", "5", "node_health_check"]),
116+
ok.
117+
118+
detects_local_alarm(Config) ->
119+
[A|_] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
120+
rabbit_ct_broker_helpers:rabbitmqctl(Config, A,
121+
["set_vm_memory_high_watermark", "0.000000001"]),
122+
{error, 70, Str} = rabbit_ct_broker_helpers:rabbitmqctl(Config, A, ["-t", "5", "node_health_check"]),
123+
{match, _} = re:run(Str, "resource alarm.*in effect"),
124+
ok.
125+
126+
honors_timeout_argument(Config) ->
127+
[A|_] = open_channel_and_declare_queue_everywhere(Config),
128+
QPid = suspend_single_queue(Config, A),
129+
130+
case timer:tc(rabbit_ct_broker_helpers, rabbitmqctl, [Config, A, ["-t", "5", "node_health_check"]]) of
131+
{TimeSpent, {error, 75, _}} ->
132+
if TimeSpent < 5000000 -> exit({too_fast, TimeSpent});
133+
TimeSpent > 7000000 -> exit({too_slow, TimeSpent}); %% +2 seconds for rabbitmqctl overhead
134+
true -> ok
135+
end;
136+
{_, Unexpected} ->
137+
exit({unexpected, Unexpected})
138+
end,
139+
resume_sys_process(Config, A, QPid),
140+
ok.
141+
142+
%%----------------------------------------------------------------------------
143+
%% Helpers
144+
%%----------------------------------------------------------------------------
145+
open_channel_and_declare_queue_everywhere(Config) ->
146+
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
147+
lists:foreach(fun(Node) ->
148+
Ch = rabbit_ct_client_helpers:open_channel(Config, Node),
149+
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{})
150+
end,
151+
Nodes),
152+
Nodes.
153+
154+
suspend_single_queue(Config, Node) ->
155+
[QPid|_] = [rabbit_amqqueue:pid_of(Q) || Q <- rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue, list, []),
156+
Node == node(rabbit_amqqueue:pid_of(Q))],
157+
rabbit_ct_broker_helpers:rpc(Config, Node, sys, suspend, [QPid]),
158+
QPid.
159+
160+
suspend_single_channel(Config, Node) ->
161+
[CPid|_] = [Pid || Pid <- rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_channel, list_local, []),
162+
Node == node(Pid)],
163+
rabbit_ct_broker_helpers:rpc(Config, Node, sys, suspend, [CPid]),
164+
CPid.
165+
166+
resume_sys_process(Config, Node, Pid) ->
167+
rabbit_ct_broker_helpers:rpc(Config, Node, sys, resume, [Pid]).

0 commit comments

Comments
 (0)