Skip to content

Add dynamic and static promethues metric gauge #11892

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions deps/rabbitmq_prometheus/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ rabbitmq_app(
"//deps/rabbit:erlang_app",
"//deps/rabbitmq_federation:erlang_app",
"//deps/rabbitmq_management_agent:erlang_app",
"//deps/rabbitmq_shovel:erlang_app",
"//deps/rabbitmq_web_dispatch:erlang_app",
"@accept//:erlang_app",
"@cowboy//:erlang_app",
Expand Down Expand Up @@ -108,6 +109,14 @@ rabbitmq_integration_suite(
],
)

rabbitmq_integration_suite(
name = "prometheus_rabbitmq_shovel_collector_SUITE",
size = "small",
additional_beam = [
"test/rabbitmq_prometheus_collector_test_proxy.beam", #keep
],
)

assert_suites()

alias(
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_prometheus/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ endef
PROJECT := rabbitmq_prometheus
PROJECT_DESCRIPTION = Prometheus metrics for RabbitMQ
PROJECT_MOD := rabbit_prometheus_app
DEPS = accept cowboy rabbit rabbitmq_management_agent prometheus rabbitmq_web_dispatch rabbitmq_federation
DEPS = accept cowboy rabbit rabbitmq_management_agent prometheus rabbitmq_web_dispatch rabbitmq_federation rabbitmq_shovel
BUILD_DEPS = amqp_client rabbit_common rabbitmq_management
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters

Expand Down
12 changes: 12 additions & 0 deletions deps/rabbitmq_prometheus/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def all_beam_files(name = "all_beam_files"):
"src/collectors/prometheus_rabbitmq_dynamic_collector.erl",
"src/collectors/prometheus_rabbitmq_federation_collector.erl",
"src/collectors/prometheus_rabbitmq_global_metrics_collector.erl",
"src/collectors/prometheus_rabbitmq_shovel_collector.erl",
"src/rabbit_prometheus_app.erl",
"src/rabbit_prometheus_dispatcher.erl",
"src/rabbit_prometheus_handler.erl",
Expand Down Expand Up @@ -46,6 +47,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/collectors/prometheus_rabbitmq_dynamic_collector.erl",
"src/collectors/prometheus_rabbitmq_federation_collector.erl",
"src/collectors/prometheus_rabbitmq_global_metrics_collector.erl",
"src/collectors/prometheus_rabbitmq_shovel_collector.erl",
"src/rabbit_prometheus_app.erl",
"src/rabbit_prometheus_dispatcher.erl",
"src/rabbit_prometheus_handler.erl",
Expand Down Expand Up @@ -88,6 +90,7 @@ def all_srcs(name = "all_srcs"):
"src/collectors/prometheus_rabbitmq_dynamic_collector.erl",
"src/collectors/prometheus_rabbitmq_federation_collector.erl",
"src/collectors/prometheus_rabbitmq_global_metrics_collector.erl",
"src/collectors/prometheus_rabbitmq_shovel_collector.erl",
"src/rabbit_prometheus_app.erl",
"src/rabbit_prometheus_dispatcher.erl",
"src/rabbit_prometheus_handler.erl",
Expand Down Expand Up @@ -142,3 +145,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
app_name = "rabbitmq_prometheus",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "prometheus_rabbitmq_shovel_collector_SUITE_beam_files",
testonly = True,
srcs = ["test/prometheus_rabbitmq_shovel_collector_SUITE.erl"],
outs = ["test/prometheus_rabbitmq_shovel_collector_SUITE.beam"],
app_name = "rabbitmq_prometheus",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "@prometheus//:erlang_app"],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(prometheus_rabbitmq_shovel_collector).
-export([deregister_cleanup/1,
collect_mf/2]).

-import(prometheus_model_helpers, [create_mf/4]).

-behaviour(prometheus_collector).

%% API exports
-export([]).

%%====================================================================
%% Collector API
%%====================================================================

deregister_cleanup(_) -> ok.

collect_mf(_Registry, Callback) ->
Status = rabbit_shovel_status:status(500),
{StaticStatusGroups, DynamicStatusGroups} = lists:foldl(fun({_,static,{S, _}, _}, {SMap, DMap}) ->
{maps:update_with(S, fun(C) -> C + 1 end, 1, SMap), DMap};
({_,dynamic,{S, _}, _}, {SMap, DMap}) ->
{SMap, maps:update_with(S, fun(C) -> C + 1 end, 1, DMap)}
end, {#{}, #{}}, Status),

Metrics = [{rabbitmq_shovel_dynamic, gauge, "Current number of dynamic shovels.",
[{[{status, S}], C} || {S, C} <- maps:to_list(DynamicStatusGroups)]},
{rabbitmq_shovel_static, gauge, "Current number of static shovels.",
[{[{status, S}], C} || {S, C} <- maps:to_list(StaticStatusGroups)]}
],
_ = [add_metric_family(Metric, Callback) || Metric <- Metrics],
ok.

add_metric_family({Name, Type, Help, Metrics}, Callback) ->
Callback(create_mf(Name, Help, Type, Metrics)).

%%====================================================================
%% Private Parts
%%====================================================================
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ build_dispatcher() ->
prometheus_rabbitmq_alarm_metrics_collector,
prometheus_rabbitmq_dynamic_collector,
prometheus_rabbitmq_federation_collector,
prometheus_rabbitmq_shovel_collector,
prometheus_process_collector]),
prometheus_registry:register_collectors('per-object', [
prometheus_vm_system_info_collector,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(prometheus_rabbitmq_shovel_collector_SUITE).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("prometheus/include/prometheus_model.hrl").

-compile(export_all).

-define(DYN_RUNNING_METRIC(Gauge), #'MetricFamily'{name = <<"rabbitmq_shovel_dynamic">>,
help = "Current number of dynamic shovels.",type = 'GAUGE',
metric = [#'Metric'{label = [#'LabelPair'{name = <<"status">>,
value = <<"running">>}],
gauge = #'Gauge'{value = Gauge},
counter = undefined,summary = undefined,untyped = undefined,
histogram = undefined,timestamp_ms = undefined}]}).

-define(STAT_RUNNING_METRIC(Gauge), #'MetricFamily'{name = <<"rabbitmq_shovel_static">>,
help = "Current number of static shovels.",type = 'GAUGE',
metric = [#'Metric'{label = [#'LabelPair'{name = <<"status">>,
value = <<"running">>}],
gauge = #'Gauge'{value = Gauge},
counter = undefined,summary = undefined,untyped = undefined,
histogram = undefined,timestamp_ms = undefined}]}).

-define(EMPTY_DYN_METRIC, #'MetricFamily'{name = <<"rabbitmq_shovel_dynamic">>,
help = "Current number of dynamic shovels.",type = 'GAUGE',
metric = []}).

-define(EMPTY_STAT_METRIC, #'MetricFamily'{name = <<"rabbitmq_shovel_static">>,
help = "Current number of static shovels.",type = 'GAUGE',
metric = []}).


all() ->
[
{group, non_parallel_tests}
].

groups() ->
[
{non_parallel_tests, [], [
dynamic,
static,
mix
]}
].

suite() ->
[{timetrap, {minutes, 5}}].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE},
{ignored_crashes, [
"server_initiated_close,404",
"writer,send_failed,closed"
]}
]),
rabbit_ct_helpers:run_setup_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

init_per_group(_, Config) ->
Config.

end_per_group(_, Config) ->
Config.

init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).

end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).

%% -------------------------------------------------------------------
%% Test cases
%% -------------------------------------------------------------------

dynamic(Config) ->
create_dynamic_shovel(Config, <<"test">>),
running = get_shovel_status(Config, <<"test">>),
[?DYN_RUNNING_METRIC(1), ?EMPTY_STAT_METRIC] = get_metrics(Config),
create_dynamic_shovel(Config, <<"test2">>),
running = get_shovel_status(Config, <<"test2">>),
[?DYN_RUNNING_METRIC(2), ?EMPTY_STAT_METRIC] = get_metrics(Config),
clear_param(Config, <<"test">>),
clear_param(Config, <<"test2">>),
[?EMPTY_DYN_METRIC, ?EMPTY_STAT_METRIC] = get_metrics(Config),
ok.

static(Config) ->
create_static_shovel(Config, static_shovel),
[?EMPTY_DYN_METRIC, ?STAT_RUNNING_METRIC(1)] = get_metrics(Config),
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, clear_shovel,
[]),
[?EMPTY_DYN_METRIC, ?EMPTY_STAT_METRIC] = get_metrics(Config),
ok.


mix(Config) ->
create_dynamic_shovel(Config, <<"test">>),
running = get_shovel_status(Config, <<"test">>),
create_static_shovel(Config, static_shovel),

[?DYN_RUNNING_METRIC(1), ?STAT_RUNNING_METRIC(1)] = get_metrics(Config),

clear_param(Config, <<"test">>),
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, clear_shovel,
[]),
[?EMPTY_DYN_METRIC, ?EMPTY_STAT_METRIC] = get_metrics(Config),
ok.

%% -------------------------------------------------------------------
%% Internal
%% -------------------------------------------------------------------

get_metrics(Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0,
rabbitmq_prometheus_collector_test_proxy, collect_mf,
[default, prometheus_rabbitmq_shovel_collector]).

create_static_shovel(Config, Name) ->
SourceQueue = <<"source-queue">>,
DestQueue = <<"dest-queue">>,
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
Shovel = [{Name,
[{source,
[{protocol, amqp10},
{uris, [rabbit_misc:format("amqp://~ts:~b",
[Hostname, Port])]},
{source_address, SourceQueue}]
},
{destination,
[{uris, [rabbit_misc:format("amqp://~ts:~b/%2f?heartbeat=5",
[Hostname, Port])]},
{declarations,
[{'queue.declare', [{queue, DestQueue}, auto_delete]}]},
{publish_fields, [{exchange, <<>>},
{routing_key, DestQueue}]},
{publish_properties, [{delivery_mode, 2},
{content_type, <<"shovelled">>}]},
{add_forward_headers, true},
{add_timestamp_header, true}]},
{queue, <<>>},
{ack_mode, no_ack}
]}],
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, setup_shovel,
[Shovel, Name]).

setup_shovel(ShovelConfig, Name) ->
_ = application:stop(rabbitmq_shovel),
application:set_env(rabbitmq_shovel, shovels, ShovelConfig, infinity),
ok = application:start(rabbitmq_shovel),
await_shovel(Name, static).

clear_shovel() ->
_ = application:stop(rabbitmq_shovel),
application:unset_env(rabbitmq_shovel, shovels, infinity),
ok = application:start(rabbitmq_shovel).

make_uri(Config, Node) ->
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp),
list_to_binary(lists:flatten(io_lib:format("amqp://~ts:~b",
[Hostname, Port]))).

create_dynamic_shovel(Config, Name) ->
Node = 0,
QueueNode = 0,
Uri = make_uri(Config, QueueNode),
Value = [{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>}],
ok = rabbit_ct_broker_helpers:rpc(
Config,
Node,
rabbit_runtime_parameters,
set, [
<<"/">>, <<"shovel">>, Name, [{<<"src-uri">>, Uri},
{<<"dest-uri">>, [Uri]} |
Value], none]),
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, await_shovel,
[Name, dynamic]).

await_shovel(Name, Type) ->
Ret = await(fun() ->
Status = shovels_from_status(running, Type),
lists:member(Name, Status)
end, 30_000),
Ret.

shovels_from_status(ExpectedState, dynamic) ->
S = rabbit_shovel_status:status(),
[N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState];
shovels_from_status(ExpectedState, static) ->
S = rabbit_shovel_status:status(),
[N || {N, static, {State, _}, _} <- S, State == ExpectedState].

get_shovel_status(Config, Name) ->
get_shovel_status(Config, 0, Name).

get_shovel_status(Config, Node, Name) ->
S = rabbit_ct_broker_helpers:rpc(
Config, Node, rabbit_shovel_status, lookup, [{<<"/">>, Name}]),
case S of
not_found ->
not_found;
_ ->
{Status, Info} = proplists:get_value(info, S),
proplists:get_value(blocked_status, Info, Status)
end.

await(Pred) ->
case Pred() of
true -> ok;
false -> timer:sleep(100),
await(Pred)
end.

await(_Pred, Timeout) when Timeout =< 0 ->
error(await_timeout);
await(Pred, Timeout) ->
case Pred() of
true -> ok;
Other when Timeout =< 100 ->
error({await_timeout, Other});
_ -> timer:sleep(100),
await(Pred, Timeout - 100)
end.

clear_param(Config, Name) ->
clear_param(Config, 0, Name).

clear_param(Config, Node, Name) ->
rabbit_ct_broker_helpers:rpc(Config, Node,
rabbit_runtime_parameters, clear, [<<"/">>, <<"shovel">>, Name, <<"acting-user">>]).
Loading
Loading