Skip to content

Commit 9f3ace8

Browse files
committed
Add rabbitmq-sysmon to RabbitMQ
Fixes #952 Update for rabbitmq-sysmon -> sysmon-handler rename
1 parent 67af90f commit 9f3ace8

File tree

5 files changed

+396
-2
lines changed

5 files changed

+396
-2
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ endef
138138

139139
LOCAL_DEPS = sasl mnesia os_mon inets
140140
BUILD_DEPS = rabbitmq_cli syslog
141-
DEPS = ranch lager rabbit_common ra
141+
DEPS = ranch lager rabbit_common ra sysmon_handler
142142
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper
143143

144144
dep_syslog = git https://github.com/schlagert/syslog 3.4.5

rabbitmq-components.mk

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ dep_cowlib = hex 2.7.0
113113
dep_jsx = hex 2.9.0
114114
dep_lager = hex 3.6.5
115115
dep_ra = git https://github.com/rabbitmq/ra.git master
116+
dep_sysmon_handler = git https://github.com/rabbitmq/sysmon-handler.git master
116117
dep_ranch = hex 1.7.1
117118
dep_recon = hex 2.3.6
118119

src/rabbit.erl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,13 @@
156156
{requires, kernel_ready},
157157
{enables, core_initialized}]}).
158158

159+
-rabbit_boot_step({rabbit_sysmon_minder,
160+
[{description, "sysmon_handler supervisor"},
161+
{mfa, {rabbit_sup, start_restartable_child,
162+
[rabbit_sysmon_minder]}},
163+
{requires, kernel_ready},
164+
{enables, core_initialized}]}).
165+
159166
-rabbit_boot_step({core_initialized,
160167
[{description, "core initialized"},
161168
{requires, kernel_ready}]}).
@@ -225,7 +232,7 @@
225232
-include("rabbit_framing.hrl").
226233
-include("rabbit.hrl").
227234

228-
-define(APPS, [os_mon, mnesia, rabbit_common, ra, rabbit]).
235+
-define(APPS, [os_mon, mnesia, rabbit_common, ra, sysmon_handler, rabbit]).
229236

230237
-define(ASYNC_THREADS_WARNING_THRESHOLD, 8).
231238

src/rabbit_sysmon_handler.erl

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
%% Copyright (c) 2011 Basho Technologies, Inc. All Rights Reserved.
2+
%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
3+
%%
4+
%% This file is provided to you under the Apache License,
5+
%% Version 2.0 (the "License"); you may not use this file
6+
%% except in compliance with the License. You may obtain
7+
%% a copy of the License at
8+
%%
9+
%% http://www.apache.org/licenses/LICENSE-2.0
10+
%%
11+
%% Unless required by applicable law or agreed to in writing,
12+
%% software distributed under the License is distributed on an
13+
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
%% KIND, either express or implied. See the License for the
15+
%% specific language governing permissions and limitations
16+
%% under the License.
17+
18+
%% @doc A custom event handler to the `sysmon_handler' application's
19+
%% `system_monitor' event manager.
20+
%%
21+
%% This module attempts to discover more information about a process
22+
%% that generates a system_monitor event.
23+
24+
-module(rabbit_sysmon_handler).
25+
26+
-behaviour(gen_event).
27+
28+
%% API
29+
-export([add_handler/0]).
30+
31+
%% gen_event callbacks
32+
-export([init/1, handle_event/2, handle_call/2,
33+
handle_info/2, terminate/2, code_change/3]).
34+
35+
-record(state, {timer_ref :: reference()}).
36+
37+
-define(INACTIVITY_TIMEOUT, 5000).
38+
39+
%%%===================================================================
40+
%%% gen_event callbacks
41+
%%%===================================================================
42+
43+
add_handler() ->
44+
%% Vulnerable to race conditions (installing handler multiple
45+
%% times), but risk is zero in the common OTP app startup case.
46+
case lists:member(?MODULE, gen_event:which_handlers(sysmon_handler)) of
47+
true ->
48+
ok;
49+
false ->
50+
sysmon_handler_filter:add_custom_handler(?MODULE, [])
51+
end.
52+
53+
%%%===================================================================
54+
%%% gen_event callbacks
55+
%%%===================================================================
56+
57+
%%--------------------------------------------------------------------
58+
%% @private
59+
%% @doc
60+
%% Whenever a new event handler is added to an event manager,
61+
%% this function is called to initialize the event handler.
62+
%%
63+
%% @spec init(Args) -> {ok, State}
64+
%% @end
65+
%%--------------------------------------------------------------------
66+
init([]) ->
67+
{ok, #state{}, hibernate}.
68+
69+
%%--------------------------------------------------------------------
70+
%% @private
71+
%% @doc
72+
%% Whenever an event manager receives an event sent using
73+
%% gen_event:notify/2 or gen_event:sync_notify/2, this function is
74+
%% called for each installed event handler to handle the event.
75+
%%
76+
%% @spec handle_event(Event, State) ->
77+
%% {ok, State} |
78+
%% {swap_handler, Args1, State1, Mod2, Args2} |
79+
%% remove_handler
80+
%% @end
81+
%%--------------------------------------------------------------------
82+
handle_event({monitor, Pid, Type, _Info},
83+
State=#state{timer_ref=TimerRef}) when Pid == self() ->
84+
%% Reset the inactivity timeout
85+
NewTimerRef = reset_timer(TimerRef),
86+
maybe_collect_garbage(Type),
87+
{ok, State#state{timer_ref=NewTimerRef}};
88+
handle_event({monitor, PidOrPort, Type, Info}, State=#state{timer_ref=TimerRef}) ->
89+
%% Reset the inactivity timeout
90+
NewTimerRef = reset_timer(TimerRef),
91+
{Fmt, Args} = format_pretty_proc_or_port_info(PidOrPort),
92+
rabbit_log:warning("~p ~w ~w " ++ Fmt ++ " ~w", [?MODULE, Type, PidOrPort] ++ Args ++ [Info]),
93+
{ok, State#state{timer_ref=NewTimerRef}};
94+
handle_event(Event, State=#state{timer_ref=TimerRef}) ->
95+
NewTimerRef = reset_timer(TimerRef),
96+
rabbit_log:warning("~p unhandled event: ~p", [?MODULE, Event]),
97+
{ok, State#state{timer_ref=NewTimerRef}}.
98+
99+
%%--------------------------------------------------------------------
100+
%% @private
101+
%% @doc
102+
%% Whenever an event manager receives a request sent using
103+
%% gen_event:call/3,4, this function is called for the specified
104+
%% event handler to handle the request.
105+
%%
106+
%% @spec handle_call(Request, State) ->
107+
%% {ok, Reply, State} |
108+
%% {swap_handler, Reply, Args1, State1, Mod2, Args2} |
109+
%% {remove_handler, Reply}
110+
%% @end
111+
%%--------------------------------------------------------------------
112+
handle_call(_Call, State) ->
113+
Reply = not_supported,
114+
{ok, Reply, State}.
115+
116+
%%--------------------------------------------------------------------
117+
%% @private
118+
%% @doc
119+
%% This function is called for each installed event handler when
120+
%% an event manager receives any other message than an event or a
121+
%% synchronous request (or a system message).
122+
%%
123+
%% @spec handle_info(Info, State) ->
124+
%% {ok, State} |
125+
%% {swap_handler, Args1, State1, Mod2, Args2} |
126+
%% remove_handler
127+
%% @end
128+
%%--------------------------------------------------------------------
129+
handle_info(inactivity_timeout, State) ->
130+
%% No events have arrived for the timeout period
131+
%% so hibernate to free up resources.
132+
{ok, State, hibernate};
133+
handle_info(Info, State) ->
134+
rabbit_log:info("handle_info got ~p", [Info]),
135+
{ok, State}.
136+
137+
%%--------------------------------------------------------------------
138+
%% @private
139+
%% @doc
140+
%% Whenever an event handler is deleted from an event manager, this
141+
%% function is called. It should be the opposite of Module:init/1 and
142+
%% do any necessary cleaning up.
143+
%%
144+
%% @spec terminate(Reason, State) -> void()
145+
%% @end
146+
%%--------------------------------------------------------------------
147+
terminate(_Reason, _State) ->
148+
ok.
149+
150+
%%--------------------------------------------------------------------
151+
%% @private
152+
%% @doc
153+
%% Convert process state when code is changed
154+
%%
155+
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
156+
%% @end
157+
%%--------------------------------------------------------------------
158+
code_change(_OldVsn, State, _Extra) ->
159+
{ok, State}.
160+
161+
%%%===================================================================
162+
%%% Internal functions
163+
%%%===================================================================
164+
165+
format_pretty_proc_or_port_info(PidOrPort) ->
166+
try
167+
case get_pretty_proc_or_port_info(PidOrPort) of
168+
undefined ->
169+
{"", []};
170+
Res ->
171+
Res
172+
end
173+
catch C:E:S ->
174+
{"Pid ~w, ~W ~W at ~w\n",
175+
[PidOrPort, C, 20, E, 20, S]}
176+
end.
177+
178+
get_pretty_proc_or_port_info(Pid) when is_pid(Pid) ->
179+
Infos = [registered_name, initial_call, current_function, message_queue_len],
180+
case process_info(Pid, Infos) of
181+
undefined ->
182+
undefined;
183+
[] ->
184+
undefined;
185+
[{registered_name, RN0}, ICT1, {_, CF}, {_, MQL}] ->
186+
ICT = case proc_lib:translate_initial_call(Pid) of
187+
{proc_lib, init_p, 5} -> % not by proc_lib, see docs
188+
ICT1;
189+
ICT2 ->
190+
{initial_call, ICT2}
191+
end,
192+
RNL = if RN0 == [] -> [];
193+
true -> [{name, RN0}]
194+
end,
195+
{"~w", [RNL ++ [ICT, CF, {message_queue_len, MQL}]]}
196+
end;
197+
get_pretty_proc_or_port_info(Port) when is_port(Port) ->
198+
PortInfo = erlang:port_info(Port),
199+
{value, {name, Name}, PortInfo2} = lists:keytake(name, 1, PortInfo),
200+
QueueSize = [erlang:port_info(Port, queue_size)],
201+
Connected = case proplists:get_value(connected, PortInfo2) of
202+
undefined ->
203+
[];
204+
ConnectedPid ->
205+
case proc_lib:translate_initial_call(ConnectedPid) of
206+
{proc_lib, init_p, 5} -> % not by proc_lib, see docs
207+
[];
208+
ICT ->
209+
[{initial_call, ICT}]
210+
end
211+
end,
212+
{"name ~s ~w", [Name, lists:append([PortInfo2, QueueSize, Connected])]}.
213+
214+
215+
%% @doc If the message type is due to a large heap warning
216+
%% and the source is ourself, go ahead and collect garbage
217+
%% to avoid the death spiral.
218+
-spec maybe_collect_garbage(atom()) -> ok.
219+
maybe_collect_garbage(large_heap) ->
220+
erlang:garbage_collect(),
221+
ok;
222+
maybe_collect_garbage(_) ->
223+
ok.
224+
225+
-spec reset_timer(undefined | reference()) -> reference().
226+
reset_timer(undefined) ->
227+
erlang:send_after(?INACTIVITY_TIMEOUT, self(), inactivity_timeout);
228+
reset_timer(TimerRef) ->
229+
_ = erlang:cancel_timer(TimerRef),
230+
reset_timer(undefined).

0 commit comments

Comments
 (0)