Skip to content

Commit 1128b6b

Browse files
committed
Add rabbitmq-sysmon to RabbitMQ
Fixes #952
1 parent be0d287 commit 1128b6b

File tree

5 files changed

+407
-2
lines changed

5 files changed

+407
-2
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ endef
138138

139139
LOCAL_DEPS = sasl mnesia os_mon inets
140140
BUILD_DEPS = rabbitmq_cli syslog
141-
DEPS = ranch lager rabbit_common ra
141+
DEPS = ranch lager rabbit_common ra rabbit_sysmon
142142
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper
143143

144144
dep_syslog = git https://github.com/schlagert/syslog 3.4.5

rabbitmq-components.mk

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ dep_cowlib = hex 2.7.0
113113
dep_jsx = hex 2.9.0
114114
dep_lager = hex 3.6.5
115115
dep_ra = git https://github.com/rabbitmq/ra.git master
116+
dep_rabbit_sysmon = git https://github.com/rabbitmq/rabbitmq-sysmon.git master
116117
dep_ranch = hex 1.7.1
117118
dep_recon = hex 2.3.6
118119

src/rabbit.erl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,13 @@
156156
{requires, kernel_ready},
157157
{enables, core_initialized}]}).
158158

159+
-rabbit_boot_step({sysmon_minder,
160+
[{description, "sysmon_handler supervisor"},
161+
{mfa, {rabbit_sup, start_restartable_child,
162+
[sysmon_minder]}},
163+
{requires, kernel_ready},
164+
{enables, core_initialized}]}).
165+
159166
-rabbit_boot_step({core_initialized,
160167
[{description, "core initialized"},
161168
{requires, kernel_ready}]}).
@@ -225,7 +232,7 @@
225232
-include("rabbit_framing.hrl").
226233
-include("rabbit.hrl").
227234

228-
-define(APPS, [os_mon, mnesia, rabbit_common, ra, rabbit]).
235+
-define(APPS, [os_mon, mnesia, rabbit_common, ra, rabbit_sysmon, rabbit]).
229236

230237
-define(ASYNC_THREADS_WARNING_THRESHOLD, 8).
231238

src/sysmon_handler.erl

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
%% Copyright (c) 2011 Basho Technologies, Inc. All Rights Reserved.
2+
%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
3+
%%
4+
%% This file is provided to you under the Apache License,
5+
%% Version 2.0 (the "License"); you may not use this file
6+
%% except in compliance with the License. You may obtain
7+
%% a copy of the License at
8+
%%
9+
%% http://www.apache.org/licenses/LICENSE-2.0
10+
%%
11+
%% Unless required by applicable law or agreed to in writing,
12+
%% software distributed under the License is distributed on an
13+
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
%% KIND, either express or implied. See the License for the
15+
%% specific language governing permissions and limitations
16+
%% under the License.
17+
18+
%% @doc A custom event handler to the `rabbit_sysmon' application's
19+
%% `system_monitor' event manager.
20+
%%
21+
%% This module attempts to discover more information about a process
22+
%% that generates a system_monitor event.
23+
24+
-module(sysmon_handler).
25+
26+
-behaviour(gen_event).
27+
28+
%% API
29+
-export([add_handler/0]).
30+
31+
%% gen_event callbacks
32+
-export([init/1, handle_event/2, handle_call/2,
33+
handle_info/2, terminate/2, code_change/3]).
34+
35+
-record(state, {timer_ref :: reference()}).
36+
37+
-define(INACTIVITY_TIMEOUT, 5000).
38+
39+
%%%===================================================================
40+
%%% gen_event callbacks
41+
%%%===================================================================
42+
43+
add_handler() ->
44+
%% Vulnerable to race conditions (installing handler multiple
45+
%% times), but risk is zero in the common OTP app startup case.
46+
case lists:member(?MODULE,
47+
gen_event:which_handlers(rabbit_sysmon_handler)) of
48+
true ->
49+
ok;
50+
false ->
51+
rabbit_sysmon_filter:add_custom_handler(?MODULE, [])
52+
end.
53+
54+
%%%===================================================================
55+
%%% gen_event callbacks
56+
%%%===================================================================
57+
58+
%%--------------------------------------------------------------------
59+
%% @private
60+
%% @doc
61+
%% Whenever a new event handler is added to an event manager,
62+
%% this function is called to initialize the event handler.
63+
%%
64+
%% @spec init(Args) -> {ok, State}
65+
%% @end
66+
%%--------------------------------------------------------------------
67+
init([]) ->
68+
{ok, #state{}, hibernate}.
69+
70+
%%--------------------------------------------------------------------
71+
%% @private
72+
%% @doc
73+
%% Whenever an event manager receives an event sent using
74+
%% gen_event:notify/2 or gen_event:sync_notify/2, this function is
75+
%% called for each installed event handler to handle the event.
76+
%%
77+
%% @spec handle_event(Event, State) ->
78+
%% {ok, State} |
79+
%% {swap_handler, Args1, State1, Mod2, Args2} |
80+
%% remove_handler
81+
%% @end
82+
%%--------------------------------------------------------------------
83+
handle_event({monitor, Pid, Type, _Info},
84+
State=#state{timer_ref=TimerRef}) when Pid == self() ->
85+
%% Reset the inactivity timeout
86+
NewTimerRef = reset_timer(TimerRef),
87+
maybe_collect_garbage(Type),
88+
{ok, State#state{timer_ref=NewTimerRef}};
89+
handle_event({monitor, PidOrPort, Type, Info}, State=#state{timer_ref=TimerRef}) ->
90+
%% Reset the inactivity timeout
91+
NewTimerRef = reset_timer(TimerRef),
92+
{Fmt, Args} = format_pretty_proc_or_port_info(PidOrPort, almost_current_function),
93+
rabbit_log:info("monitor ~w ~w " ++ Fmt ++ " ~w", [Type, PidOrPort] ++ Args ++ [Info]),
94+
{ok, State#state{timer_ref=NewTimerRef}};
95+
handle_event(Event, State=#state{timer_ref=TimerRef}) ->
96+
NewTimerRef = reset_timer(TimerRef),
97+
rabbit_log:info("Monitor got ~p", [Event]),
98+
{ok, State#state{timer_ref=NewTimerRef}}.
99+
100+
%%--------------------------------------------------------------------
101+
%% @private
102+
%% @doc
103+
%% Whenever an event manager receives a request sent using
104+
%% gen_event:call/3,4, this function is called for the specified
105+
%% event handler to handle the request.
106+
%%
107+
%% @spec handle_call(Request, State) ->
108+
%% {ok, Reply, State} |
109+
%% {swap_handler, Reply, Args1, State1, Mod2, Args2} |
110+
%% {remove_handler, Reply}
111+
%% @end
112+
%%--------------------------------------------------------------------
113+
handle_call(_Call, State) ->
114+
Reply = not_supported,
115+
{ok, Reply, State}.
116+
117+
%%--------------------------------------------------------------------
118+
%% @private
119+
%% @doc
120+
%% This function is called for each installed event handler when
121+
%% an event manager receives any other message than an event or a
122+
%% synchronous request (or a system message).
123+
%%
124+
%% @spec handle_info(Info, State) ->
125+
%% {ok, State} |
126+
%% {swap_handler, Args1, State1, Mod2, Args2} |
127+
%% remove_handler
128+
%% @end
129+
%%--------------------------------------------------------------------
130+
handle_info(inactivity_timeout, State) ->
131+
%% No events have arrived for the timeout period
132+
%% so hibernate to free up resources.
133+
{ok, State, hibernate};
134+
handle_info(Info, State) ->
135+
rabbit_log:info("handle_info got ~p", [Info]),
136+
{ok, State}.
137+
138+
%%--------------------------------------------------------------------
139+
%% @private
140+
%% @doc
141+
%% Whenever an event handler is deleted from an event manager, this
142+
%% function is called. It should be the opposite of Module:init/1 and
143+
%% do any necessary cleaning up.
144+
%%
145+
%% @spec terminate(Reason, State) -> void()
146+
%% @end
147+
%%--------------------------------------------------------------------
148+
terminate(_Reason, _State) ->
149+
ok.
150+
151+
%%--------------------------------------------------------------------
152+
%% @private
153+
%% @doc
154+
%% Convert process state when code is changed
155+
%%
156+
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
157+
%% @end
158+
%%--------------------------------------------------------------------
159+
code_change(_OldVsn, State, _Extra) ->
160+
{ok, State}.
161+
162+
%%%===================================================================
163+
%%% Internal functions
164+
%%%===================================================================
165+
166+
%% Enabling warnings_as_errors prevents a build since this function is
167+
%% dead code. To be safe, commenting out rather than deleting.
168+
%% format_pretty_proc_info(Pid) ->
169+
%% format_pretty_proc_info(Pid, current_function).
170+
171+
format_pretty_proc_or_port_info(PidOrPort, Acf) ->
172+
try
173+
case get_pretty_proc_or_port_info(PidOrPort, Acf) of
174+
undefined ->
175+
{"", []};
176+
Res ->
177+
Res
178+
end
179+
catch C:E:S ->
180+
{"Pid ~w, ~W ~W at ~w\n",
181+
[PidOrPort, C, 20, E, 20, S]}
182+
end.
183+
184+
%% Enabling warnings_as_errors prevents a build since this function is
185+
%% dead code. To be safe, commenting out rather than deleting.
186+
%% get_pretty_proc_info(Pid) ->
187+
%% get_pretty_proc_info(Pid, current_function).
188+
189+
get_pretty_proc_or_port_info(Pid, Acf) when is_pid(Pid) ->
190+
case process_info(Pid, [registered_name, initial_call, current_function,
191+
message_queue_len]) of
192+
undefined ->
193+
undefined;
194+
[] ->
195+
undefined;
196+
[{registered_name, RN0}, ICT1, {_, CF}, {_, MQL}] ->
197+
ICT = case proc_lib:translate_initial_call(Pid) of
198+
{proc_lib, init_p, 5} -> % not by proc_lib, see docs
199+
ICT1;
200+
ICT2 ->
201+
{initial_call, ICT2}
202+
end,
203+
RNL = if RN0 == [] -> [];
204+
true -> [{name, RN0}]
205+
end,
206+
{"~w", [RNL ++ [ICT, {Acf, CF}, {message_queue_len, MQL}]]}
207+
end;
208+
get_pretty_proc_or_port_info(Port, _Acf) when is_port(Port) ->
209+
PortInfo = erlang:port_info(Port),
210+
{value, {name, Name}, PortInfo2} = lists:keytake(name, 1, PortInfo),
211+
QueueSize = [erlang:port_info(Port, queue_size)],
212+
Connected = case proplists:get_value(connected, PortInfo2) of
213+
undefined ->
214+
[];
215+
ConnectedPid ->
216+
case proc_lib:translate_initial_call(ConnectedPid) of
217+
{proc_lib, init_p, 5} -> % not by proc_lib, see docs
218+
[];
219+
ICT ->
220+
[{initial_call, ICT}]
221+
end
222+
end,
223+
{"name ~s ~w", [Name, lists:append([PortInfo2, QueueSize, Connected])]}.
224+
225+
226+
%% @doc If the message type is due to a large heap warning
227+
%% and the source is ourself, go ahead and collect garbage
228+
%% to avoid the death spiral.
229+
-spec maybe_collect_garbage(atom()) -> ok.
230+
maybe_collect_garbage(large_heap) ->
231+
erlang:garbage_collect(),
232+
ok;
233+
maybe_collect_garbage(_) ->
234+
ok.
235+
236+
-spec reset_timer(undefined | reference()) -> reference().
237+
reset_timer(undefined) ->
238+
erlang:send_after(?INACTIVITY_TIMEOUT, self(), inactivity_timeout);
239+
reset_timer(TimerRef) ->
240+
_ = erlang:cancel_timer(TimerRef),
241+
reset_timer(undefined).

0 commit comments

Comments
 (0)