Skip to content

Add sysmon-handler to RabbitMQ #1816

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ endef

LOCAL_DEPS = sasl mnesia os_mon inets
BUILD_DEPS = rabbitmq_cli syslog
DEPS = ranch lager rabbit_common ra
DEPS = ranch lager rabbit_common ra sysmon_handler
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper

dep_syslog = git https://github.com/schlagert/syslog 3.4.5
Expand Down
97 changes: 97 additions & 0 deletions priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1355,6 +1355,103 @@ end}.
{validators, ["non_zero_positive_integer"]}
]}.

% ==========================
% sysmon_handler section
% ==========================

%% @doc The threshold at which to warn about the number of processes
%% that are overly busy. Processes with large heaps or that take a
%% long time to garbage collect will count toward this threshold.
{mapping, "sysmon_handler.thresholds.busy_processes", "sysmon_handler.process_limit", [
{default, 30},
{datatype, integer},
hidden
]}.

%% @doc The threshold at which to warn about the number of ports that
%% are overly busy. Ports with full input buffers count toward this
%% threshold.
{mapping, "sysmon_handler.thresholds.busy_ports", "sysmon_handler.port_limit", [
{default, 2},
{datatype, integer},
hidden
]}.

%% @doc A process will become busy when it exceeds this amount of time
%% doing garbage collection.
%%
%% NOTE: Enabling this setting can cause performance problems on
%% multi-core systems.
%% @see sysmon_handler.thresholds.busy_processes
{mapping, "sysmon_handler.triggers.process.garbage_collection", "sysmon_handler.gc_ms_limit", [
{default, off},
{datatype, [{atom, off},
{duration, ms}]},
hidden
]}.

{translation, "sysmon_handler.gc_ms_limit",
fun(Conf) ->
case cuttlefish:conf_get("sysmon_handler.triggers.process.garbage_collection", Conf) of
off -> 0;
Int -> Int
end
end}.

%% @doc A process will become busy when it exceeds this amount of time
%% during a single process scheduling & execution cycle.
{mapping, "sysmon_handler.triggers.process.long_scheduled_execution", "sysmon_handler.schedule_ms_limit", [
{default, off},
{datatype, [{atom, off},
{duration, ms}]},
hidden
]}.

{translation, "sysmon_handler.schedule_ms_limit",
fun(Conf) ->
case cuttlefish:conf_get("sysmon_handler.triggers.process.long_scheduled_execution", Conf) of
off -> 0;
Int -> Int
end
end}.

%% @doc A process will become busy when its heap exceeds this size.
%% @see sysmon_handler.thresholds.busy_processes
{mapping, "sysmon_handler.triggers.process.heap_size", "sysmon_handler.heap_word_limit", [
{default, "160444000"},
{datatype, [bytesize, {atom, off}]},
hidden
]}.

{translation, "sysmon_handler.heap_word_limit",
fun(Conf) ->
case cuttlefish:conf_get("sysmon_handler.triggers.process.heap_size", Conf) of
off -> 0;
Bytes ->
WordSize = erlang:system_info(wordsize),
Bytes div WordSize
end
end}.

%% @doc Whether ports with full input buffers will be counted as
%% busy. Ports can represent open files or network sockets.
%% @see sysmon_handler.thresholds.busy_ports
{mapping, "sysmon_handler.triggers.port", "sysmon_handler.busy_port", [
{default, on},
{datatype, flag},
hidden
]}.

%% @doc Whether distribution ports with full input buffers will be
%% counted as busy. Distribution ports connect Erlang nodes within a
%% single cluster.
%% @see sysmon_handler.thresholds.busy_ports
{mapping, "sysmon_handler.triggers.distribution_port", "sysmon_handler.busy_dist_port", [
{default, on},
{datatype, flag},
hidden
]}.

% ===============================
% Validators
% ===============================
Expand Down
1 change: 1 addition & 0 deletions rabbitmq-components.mk
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ dep_lager = hex 3.6.5
dep_ra = git https://github.com/rabbitmq/ra.git master
dep_ranch = hex 1.7.1
dep_recon = hex 2.3.6
dep_sysmon_handler = hex 1.0.0

RABBITMQ_COMPONENTS = amqp_client \
amqp10_common \
Expand Down
9 changes: 8 additions & 1 deletion src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,13 @@
{requires, kernel_ready},
{enables, core_initialized}]}).

-rabbit_boot_step({rabbit_sysmon_minder,
[{description, "sysmon_handler supervisor"},
{mfa, {rabbit_sup, start_restartable_child,
[rabbit_sysmon_minder]}},
{requires, kernel_ready},
{enables, core_initialized}]}).

-rabbit_boot_step({core_initialized,
[{description, "core initialized"},
{requires, kernel_ready}]}).
Expand Down Expand Up @@ -225,7 +232,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").

-define(APPS, [os_mon, mnesia, rabbit_common, ra, rabbit]).
-define(APPS, [os_mon, mnesia, rabbit_common, ra, sysmon_handler, rabbit]).

-define(ASYNC_THREADS_WARNING_THRESHOLD, 8).

Expand Down
230 changes: 230 additions & 0 deletions src/rabbit_sysmon_handler.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
%% Copyright (c) 2011 Basho Technologies, Inc. All Rights Reserved.
%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.

%% @doc A custom event handler to the `sysmon_handler' application's
%% `system_monitor' event manager.
%%
%% This module attempts to discover more information about a process
%% that generates a system_monitor event.

-module(rabbit_sysmon_handler).

-behaviour(gen_event).

%% API
-export([add_handler/0]).

%% gen_event callbacks
-export([init/1, handle_event/2, handle_call/2,
handle_info/2, terminate/2, code_change/3]).

-record(state, {timer_ref :: reference()}).

-define(INACTIVITY_TIMEOUT, 5000).

%%%===================================================================
%%% gen_event callbacks
%%%===================================================================

add_handler() ->
%% Vulnerable to race conditions (installing handler multiple
%% times), but risk is zero in the common OTP app startup case.
case lists:member(?MODULE, gen_event:which_handlers(sysmon_handler)) of
true ->
ok;
false ->
sysmon_handler_filter:add_custom_handler(?MODULE, [])
end.

%%%===================================================================
%%% gen_event callbacks
%%%===================================================================

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever a new event handler is added to an event manager,
%% this function is called to initialize the event handler.
%%
%% @spec init(Args) -> {ok, State}
%% @end
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}, hibernate}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives an event sent using
%% gen_event:notify/2 or gen_event:sync_notify/2, this function is
%% called for each installed event handler to handle the event.
%%
%% @spec handle_event(Event, State) ->
%% {ok, State} |
%% {swap_handler, Args1, State1, Mod2, Args2} |
%% remove_handler
%% @end
%%--------------------------------------------------------------------
handle_event({monitor, Pid, Type, _Info},
State=#state{timer_ref=TimerRef}) when Pid == self() ->
%% Reset the inactivity timeout
NewTimerRef = reset_timer(TimerRef),
maybe_collect_garbage(Type),
{ok, State#state{timer_ref=NewTimerRef}};
handle_event({monitor, PidOrPort, Type, Info}, State=#state{timer_ref=TimerRef}) ->
%% Reset the inactivity timeout
NewTimerRef = reset_timer(TimerRef),
{Fmt, Args} = format_pretty_proc_or_port_info(PidOrPort),
rabbit_log:warning("~p ~w ~w " ++ Fmt ++ " ~w", [?MODULE, Type, PidOrPort] ++ Args ++ [Info]),
{ok, State#state{timer_ref=NewTimerRef}};
handle_event(Event, State=#state{timer_ref=TimerRef}) ->
NewTimerRef = reset_timer(TimerRef),
rabbit_log:warning("~p unhandled event: ~p", [?MODULE, Event]),
{ok, State#state{timer_ref=NewTimerRef}}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives a request sent using
%% gen_event:call/3,4, this function is called for the specified
%% event handler to handle the request.
%%
%% @spec handle_call(Request, State) ->
%% {ok, Reply, State} |
%% {swap_handler, Reply, Args1, State1, Mod2, Args2} |
%% {remove_handler, Reply}
%% @end
%%--------------------------------------------------------------------
handle_call(_Call, State) ->
Reply = not_supported,
{ok, Reply, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called for each installed event handler when
%% an event manager receives any other message than an event or a
%% synchronous request (or a system message).
%%
%% @spec handle_info(Info, State) ->
%% {ok, State} |
%% {swap_handler, Args1, State1, Mod2, Args2} |
%% remove_handler
%% @end
%%--------------------------------------------------------------------
handle_info(inactivity_timeout, State) ->
%% No events have arrived for the timeout period
%% so hibernate to free up resources.
{ok, State, hibernate};
handle_info(Info, State) ->
rabbit_log:info("handle_info got ~p", [Info]),
{ok, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event handler is deleted from an event manager, this
%% function is called. It should be the opposite of Module:init/1 and
%% do any necessary cleaning up.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

format_pretty_proc_or_port_info(PidOrPort) ->
try
case get_pretty_proc_or_port_info(PidOrPort) of
undefined ->
{"", []};
Res ->
Res
end
catch C:E:S ->
{"Pid ~w, ~W ~W at ~w\n",
[PidOrPort, C, 20, E, 20, S]}
end.

get_pretty_proc_or_port_info(Pid) when is_pid(Pid) ->
Infos = [registered_name, initial_call, current_function, message_queue_len],
case process_info(Pid, Infos) of
undefined ->
undefined;
[] ->
undefined;
[{registered_name, RN0}, ICT1, {_, CF}, {_, MQL}] ->
ICT = case proc_lib:translate_initial_call(Pid) of
{proc_lib, init_p, 5} -> % not by proc_lib, see docs
ICT1;
ICT2 ->
{initial_call, ICT2}
end,
RNL = if RN0 == [] -> [];
true -> [{name, RN0}]
end,
{"~w", [RNL ++ [ICT, CF, {message_queue_len, MQL}]]}
end;
get_pretty_proc_or_port_info(Port) when is_port(Port) ->
PortInfo = erlang:port_info(Port),
{value, {name, Name}, PortInfo2} = lists:keytake(name, 1, PortInfo),
QueueSize = [erlang:port_info(Port, queue_size)],
Connected = case proplists:get_value(connected, PortInfo2) of
undefined ->
[];
ConnectedPid ->
case proc_lib:translate_initial_call(ConnectedPid) of
{proc_lib, init_p, 5} -> % not by proc_lib, see docs
[];
ICT ->
[{initial_call, ICT}]
end
end,
{"name ~s ~w", [Name, lists:append([PortInfo2, QueueSize, Connected])]}.


%% @doc If the message type is due to a large heap warning
%% and the source is ourself, go ahead and collect garbage
%% to avoid the death spiral.
-spec maybe_collect_garbage(atom()) -> ok.
maybe_collect_garbage(large_heap) ->
erlang:garbage_collect(),
ok;
maybe_collect_garbage(_) ->
ok.

-spec reset_timer(undefined | reference()) -> reference().
reset_timer(undefined) ->
erlang:send_after(?INACTIVITY_TIMEOUT, self(), inactivity_timeout);
reset_timer(TimerRef) ->
_ = erlang:cancel_timer(TimerRef),
reset_timer(undefined).
Loading