Skip to content

Commit 1475256

Browse files
michaelklishin authored and lukebakken committed
Merge pull request #1816 from rabbitmq/rabbitmq-server-952
Add sysmon-handler to RabbitMQ (cherry picked from commit be27684)
1 parent 2be5a23 commit 1475256

File tree

5 files changed

+492
-2
lines changed

5 files changed

+492
-2
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ endef
133133

134134
LOCAL_DEPS = sasl mnesia os_mon inets
135135
BUILD_DEPS = rabbitmq_cli syslog
136-
DEPS = ranch lager rabbit_common
136+
DEPS = ranch lager rabbit_common sysmon_handler
137137
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper
138138

139139
dep_syslog = git https://github.com/schlagert/syslog 3.4.5

priv/schema/rabbit.schema

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1352,6 +1352,103 @@ end}.
13521352
{validators, ["non_zero_positive_integer"]}
13531353
]}.
13541354

1355+
% ==========================
1356+
% sysmon_handler section
1357+
% ==========================
1358+
1359+
%% @doc The threshold at which to warn about the number of processes
1360+
%% that are overly busy. Processes with large heaps or that take a
1361+
%% long time to garbage collect will count toward this threshold.
1362+
{mapping, "sysmon_handler.thresholds.busy_processes", "sysmon_handler.process_limit", [
1363+
{default, 30},
1364+
{datatype, integer},
1365+
hidden
1366+
]}.
1367+
1368+
%% @doc The threshold at which to warn about the number of ports that
1369+
%% are overly busy. Ports with full input buffers count toward this
1370+
%% threshold.
1371+
{mapping, "sysmon_handler.thresholds.busy_ports", "sysmon_handler.port_limit", [
1372+
{default, 2},
1373+
{datatype, integer},
1374+
hidden
1375+
]}.
1376+
1377+
%% @doc A process will become busy when it exceeds this amount of time
1378+
%% doing garbage collection.
1379+
%%
1380+
%% NOTE: Enabling this setting can cause performance problems on
1381+
%% multi-core systems.
1382+
%% @see sysmon_handler.thresholds.busy_processes
1383+
{mapping, "sysmon_handler.triggers.process.garbage_collection", "sysmon_handler.gc_ms_limit", [
1384+
{default, off},
1385+
{datatype, [{atom, off},
1386+
{duration, ms}]},
1387+
hidden
1388+
]}.
1389+
1390+
{translation, "sysmon_handler.gc_ms_limit",
1391+
fun(Conf) ->
1392+
case cuttlefish:conf_get("sysmon_handler.triggers.process.garbage_collection", Conf) of
1393+
off -> 0;
1394+
Int -> Int
1395+
end
1396+
end}.
1397+
1398+
%% @doc A process will become busy when it exceeds this amount of time
1399+
%% during a single process scheduling & execution cycle.
1400+
{mapping, "sysmon_handler.triggers.process.long_scheduled_execution", "sysmon_handler.schedule_ms_limit", [
1401+
{default, off},
1402+
{datatype, [{atom, off},
1403+
{duration, ms}]},
1404+
hidden
1405+
]}.
1406+
1407+
{translation, "sysmon_handler.schedule_ms_limit",
1408+
fun(Conf) ->
1409+
case cuttlefish:conf_get("sysmon_handler.triggers.process.long_scheduled_execution", Conf) of
1410+
off -> 0;
1411+
Int -> Int
1412+
end
1413+
end}.
1414+
1415+
%% @doc A process will become busy when its heap exceeds this size.
1416+
%% @see sysmon_handler.thresholds.busy_processes
1417+
{mapping, "sysmon_handler.triggers.process.heap_size", "sysmon_handler.heap_word_limit", [
1418+
{default, "160444000"},
1419+
{datatype, [bytesize, {atom, off}]},
1420+
hidden
1421+
]}.
1422+
1423+
{translation, "sysmon_handler.heap_word_limit",
1424+
fun(Conf) ->
1425+
case cuttlefish:conf_get("sysmon_handler.triggers.process.heap_size", Conf) of
1426+
off -> 0;
1427+
Bytes ->
1428+
WordSize = erlang:system_info(wordsize),
1429+
Bytes div WordSize
1430+
end
1431+
end}.
1432+
1433+
%% @doc Whether ports with full input buffers will be counted as
1434+
%% busy. Ports can represent open files or network sockets.
1435+
%% @see sysmon_handler.thresholds.busy_ports
1436+
{mapping, "sysmon_handler.triggers.port", "sysmon_handler.busy_port", [
1437+
{default, on},
1438+
{datatype, flag},
1439+
hidden
1440+
]}.
1441+
1442+
%% @doc Whether distribution ports with full input buffers will be
1443+
%% counted as busy. Distribution ports connect Erlang nodes within a
1444+
%% single cluster.
1445+
%% @see sysmon_handler.thresholds.busy_ports
1446+
{mapping, "sysmon_handler.triggers.distribution_port", "sysmon_handler.busy_dist_port", [
1447+
{default, on},
1448+
{datatype, flag},
1449+
hidden
1450+
]}.
1451+
13551452
% ===============================
13561453
% Validators
13571454
% ===============================

src/rabbit.erl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,13 @@
156156
{requires, kernel_ready},
157157
{enables, core_initialized}]}).
158158

159+
-rabbit_boot_step({rabbit_sysmon_minder,
160+
[{description, "sysmon_handler supervisor"},
161+
{mfa, {rabbit_sup, start_restartable_child,
162+
[rabbit_sysmon_minder]}},
163+
{requires, kernel_ready},
164+
{enables, core_initialized}]}).
165+
159166
-rabbit_boot_step({core_initialized,
160167
[{description, "core initialized"},
161168
{requires, kernel_ready}]}).
@@ -225,7 +232,7 @@
225232
-include("rabbit_framing.hrl").
226233
-include("rabbit.hrl").
227234

228-
-define(APPS, [os_mon, mnesia, rabbit_common, rabbit]).
235+
-define(APPS, [os_mon, mnesia, rabbit_common, sysmon_handler, rabbit]).
229236

230237
-define(ASYNC_THREADS_WARNING_THRESHOLD, 8).
231238

src/rabbit_sysmon_handler.erl

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
%% Copyright (c) 2011 Basho Technologies, Inc. All Rights Reserved.
2+
%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
3+
%%
4+
%% This file is provided to you under the Apache License,
5+
%% Version 2.0 (the "License"); you may not use this file
6+
%% except in compliance with the License. You may obtain
7+
%% a copy of the License at
8+
%%
9+
%% http://www.apache.org/licenses/LICENSE-2.0
10+
%%
11+
%% Unless required by applicable law or agreed to in writing,
12+
%% software distributed under the License is distributed on an
13+
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
%% KIND, either express or implied. See the License for the
15+
%% specific language governing permissions and limitations
16+
%% under the License.
17+
18+
%% @doc A custom event handler to the `sysmon_handler' application's
19+
%% `system_monitor' event manager.
20+
%%
21+
%% This module attempts to discover more information about a process
22+
%% that generates a system_monitor event.
23+
24+
-module(rabbit_sysmon_handler).
25+
26+
-behaviour(gen_event).
27+
28+
%% API
29+
-export([add_handler/0]).
30+
31+
%% gen_event callbacks
32+
-export([init/1, handle_event/2, handle_call/2,
33+
handle_info/2, terminate/2, code_change/3]).
34+
35+
%% Handler state. `timer_ref' holds the reference of the pending
%% inactivity timer. init/1 builds the record without setting the field,
%% and reset_timer/1 has a dedicated clause for `undefined', so the
%% field type has to admit `undefined' as well as a timer reference.
-record(state, {timer_ref :: undefined | reference()}).
36+
37+
-define(INACTIVITY_TIMEOUT, 5000).
38+
39+
%%%===================================================================
40+
%%% API
41+
%%%===================================================================
42+
43+
add_handler() ->
    %% The membership check and the installation are two separate calls,
    %% so concurrent callers could each install the handler. This is
    %% considered a non-issue during ordinary OTP application start-up.
    Installed = gen_event:which_handlers(sysmon_handler),
    case lists:member(?MODULE, Installed) of
        false ->
            sysmon_handler_filter:add_custom_handler(?MODULE, []);
        true ->
            ok
    end.
52+
53+
%%%===================================================================
54+
%%% gen_event callbacks
55+
%%%===================================================================
56+
57+
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Called by the event manager when this handler is installed. The
%% handler starts with no inactivity timer armed and asks the event
%% manager to hibernate right away.
%%
%% @spec init(Args) -> {ok, State, hibernate}
%% @end
%%--------------------------------------------------------------------
init([]) ->
    InitialState = #state{},
    {ok, InitialState, hibernate}.
68+
69+
%%--------------------------------------------------------------------
70+
%% @private
71+
%% @doc
72+
%% Whenever an event manager receives an event sent using
73+
%% gen_event:notify/2 or gen_event:sync_notify/2, this function is
74+
%% called for each installed event handler to handle the event.
75+
%%
76+
%% @spec handle_event(Event, State) ->
77+
%% {ok, State} |
78+
%% {swap_handler, Args1, State1, Mod2, Args2} |
79+
%% remove_handler
80+
%% @end
81+
%%--------------------------------------------------------------------
82+
handle_event(Event, State = #state{timer_ref = OldRef}) ->
    %% Any event, whatever its shape, counts as activity: re-arm the
    %% inactivity timer before reacting to the event itself.
    FreshRef = reset_timer(OldRef),
    react_to_event(Event),
    {ok, State#state{timer_ref = FreshRef}}.

%% Reacts to a single event. The return value is ignored by the caller.
react_to_event({monitor, Pid, Type, _Info}) when Pid == self() ->
    %% The event is about the event manager process itself: do not log
    %% it, only collect garbage when the trigger was a large heap.
    maybe_collect_garbage(Type);
react_to_event({monitor, PidOrPort, Type, Info}) ->
    %% Enrich the warning with whatever can be found out about the
    %% offending process or port.
    {Fmt, Args} = format_pretty_proc_or_port_info(PidOrPort),
    rabbit_log:warning("~p ~w ~w " ++ Fmt ++ " ~w", [?MODULE, Type, PidOrPort] ++ Args ++ [Info]);
react_to_event(Event) ->
    rabbit_log:warning("~p unhandled event: ~p", [?MODULE, Event]).
98+
99+
%%--------------------------------------------------------------------
100+
%% @private
101+
%% @doc
102+
%% Whenever an event manager receives a request sent using
103+
%% gen_event:call/3,4, this function is called for the specified
104+
%% event handler to handle the request.
105+
%%
106+
%% @spec handle_call(Request, State) ->
107+
%% {ok, Reply, State} |
108+
%% {swap_handler, Reply, Args1, State1, Mod2, Args2} |
109+
%% {remove_handler, Reply}
110+
%% @end
111+
%%--------------------------------------------------------------------
112+
handle_call(_Call, State) ->
    %% No synchronous requests are implemented; every call receives the
    %% same fixed reply and the state is left untouched.
    {ok, not_supported, State}.
115+
116+
%%--------------------------------------------------------------------
117+
%% @private
118+
%% @doc
119+
%% This function is called for each installed event handler when
120+
%% an event manager receives any other message than an event or a
121+
%% synchronous request (or a system message).
122+
%%
123+
%% @spec handle_info(Info, State) ->
124+
%% {ok, State} |
125+
%% {swap_handler, Args1, State1, Mod2, Args2} |
126+
%% remove_handler
127+
%% @end
128+
%%--------------------------------------------------------------------
129+
handle_info(Msg, State) ->
    case Msg of
        inactivity_timeout ->
            %% The inactivity timer fired without being reset by a new
            %% event, so hibernate to free up resources.
            {ok, State, hibernate};
        _ ->
            rabbit_log:info("handle_info got ~p", [Msg]),
            {ok, State}
    end.
136+
137+
%%--------------------------------------------------------------------
138+
%% @private
139+
%% @doc
140+
%% Whenever an event handler is deleted from an event manager, this
141+
%% function is called. It should be the opposite of Module:init/1 and
142+
%% do any necessary cleaning up.
143+
%%
144+
%% @spec terminate(Reason, State) -> void()
145+
%% @end
146+
%%--------------------------------------------------------------------
147+
terminate(_Why, _FinalState) ->
    %% Nothing is cleaned up here.
    ok.
149+
150+
%%--------------------------------------------------------------------
151+
%% @private
152+
%% @doc
153+
%% Convert process state when code is changed
154+
%%
155+
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
156+
%% @end
157+
%%--------------------------------------------------------------------
158+
code_change(_FromVsn, CurrentState, _Extra) ->
    %% The state is handed back unchanged; no conversion is performed.
    {ok, CurrentState}.
160+
161+
%%%===================================================================
162+
%%% Internal functions
163+
%%%===================================================================
164+
165+
%% Returns a `{Format, Args}' pair describing the given pid or port.
%% An `undefined' lookup result yields an empty description; an
%% exception raised by the lookup is rendered into the description
%% itself (class and reason printed with depth 20, plus the stack).
format_pretty_proc_or_port_info(PidOrPort) ->
    try get_pretty_proc_or_port_info(PidOrPort) of
        undefined ->
            {"", []};
        FmtAndArgs ->
            FmtAndArgs
    catch
        Class:Reason:Stack ->
            {"Pid ~w, ~W ~W at ~w\n",
             [PidOrPort, Class, 20, Reason, 20, Stack]}
    end.
177+
178+
%% Builds a `{Format, Args}' pair describing a process or a port, or
%% returns `undefined' when no information can be obtained because the
%% process or port no longer exists.
%%
%% For a process the description lists its registered name (if any),
%% initial call, current function and message queue length. For a port
%% it gives the port name followed by the remaining port info items,
%% the queue size and, when it can be determined, the initial call of
%% the connected process.
get_pretty_proc_or_port_info(Pid) when is_pid(Pid) ->
    Infos = [registered_name, initial_call, current_function, message_queue_len],
    case process_info(Pid, Infos) of
        undefined ->
            undefined;
        [] ->
            undefined;
        [{registered_name, RN0}, ICT1, {_, CF}, {_, MQL}] ->
            ICT = case proc_lib:translate_initial_call(Pid) of
                      {proc_lib, init_p, 5} -> % not by proc_lib, see docs
                          ICT1;
                      ICT2 ->
                          {initial_call, ICT2}
                  end,
            %% An unregistered process reports `[]' as its name.
            RNL = if RN0 == [] -> [];
                     true -> [{name, RN0}]
                  end,
            {"~w", [RNL ++ [ICT, CF, {message_queue_len, MQL}]]}
    end;
get_pretty_proc_or_port_info(Port) when is_port(Port) ->
    %% A port can be closed by the time its event is handled, in which
    %% case port_info returns `undefined'. Report that the same way the
    %% pid clause reports a dead process instead of crashing.
    case erlang:port_info(Port) of
        undefined ->
            undefined;
        PortInfo ->
            {value, {name, Name}, PortInfo2} = lists:keytake(name, 1, PortInfo),
            %% The port may also close between the two port_info calls;
            %% leave the queue size out rather than listing `undefined'.
            QueueSize = case erlang:port_info(Port, queue_size) of
                            undefined ->
                                [];
                            QueueSizeInfo ->
                                [QueueSizeInfo]
                        end,
            Connected = case proplists:get_value(connected, PortInfo2) of
                            undefined ->
                                [];
                            ConnectedPid ->
                                case proc_lib:translate_initial_call(ConnectedPid) of
                                    {proc_lib, init_p, 5} -> % not by proc_lib, see docs
                                        [];
                                    ICT ->
                                        [{initial_call, ICT}]
                                end
                        end,
            {"name ~s ~w", [Name, lists:append([PortInfo2, QueueSize, Connected])]}
    end.
213+
214+
215+
%% @doc If the message type is due to a large heap warning
216+
%% and the source is ourself, go ahead and collect garbage
217+
%% to avoid the death spiral.
218+
-spec maybe_collect_garbage(atom()) -> ok.
maybe_collect_garbage(Type) when Type =:= large_heap ->
    %% Only a large-heap event triggers a collection of the calling
    %% process.
    erlang:garbage_collect(),
    ok;
maybe_collect_garbage(_Other) ->
    ok.
224+
225+
-spec reset_timer(undefined | reference()) -> reference().
reset_timer(PreviousRef) ->
    %% Cancel the previous timer, if there is one, then arm a new timer
    %% that sends `inactivity_timeout' to this process.
    case PreviousRef of
        undefined ->
            ok;
        _ ->
            _ = erlang:cancel_timer(PreviousRef),
            ok
    end,
    erlang:send_after(?INACTIVITY_TIMEOUT, self(), inactivity_timeout).

0 commit comments

Comments
 (0)