Skip to content

Commit c2d2012

Browse files
Merge pull request #11892 from rabbitmq/SimonUnge-shovel_metric
Add dynamic and static promethues metric gauge
2 parents 93d1ac9 + 1f1d422 commit c2d2012

File tree

8 files changed

+326
-3
lines changed

8 files changed

+326
-3
lines changed

deps/rabbitmq_prometheus/BUILD.bazel

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ rabbitmq_app(
5454
"//deps/rabbit:erlang_app",
5555
"//deps/rabbitmq_federation:erlang_app",
5656
"//deps/rabbitmq_management_agent:erlang_app",
57+
"//deps/rabbitmq_shovel:erlang_app",
5758
"//deps/rabbitmq_web_dispatch:erlang_app",
5859
"@accept//:erlang_app",
5960
"@cowboy//:erlang_app",
@@ -108,6 +109,14 @@ rabbitmq_integration_suite(
108109
],
109110
)
110111

112+
rabbitmq_integration_suite(
113+
name = "prometheus_rabbitmq_shovel_collector_SUITE",
114+
size = "small",
115+
additional_beam = [
116+
"test/rabbitmq_prometheus_collector_test_proxy.beam", #keep
117+
],
118+
)
119+
111120
assert_suites()
112121

113122
alias(

deps/rabbitmq_prometheus/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ endef
99
PROJECT := rabbitmq_prometheus
1010
PROJECT_DESCRIPTION = Prometheus metrics for RabbitMQ
1111
PROJECT_MOD := rabbit_prometheus_app
12-
DEPS = accept cowboy rabbit rabbitmq_management_agent prometheus rabbitmq_web_dispatch rabbitmq_federation
12+
DEPS = accept cowboy rabbit rabbitmq_management_agent prometheus rabbitmq_web_dispatch rabbitmq_federation rabbitmq_shovel
1313
BUILD_DEPS = amqp_client rabbit_common rabbitmq_management
1414
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters
1515

deps/rabbitmq_prometheus/app.bzl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ def all_beam_files(name = "all_beam_files"):
1515
"src/collectors/prometheus_rabbitmq_dynamic_collector.erl",
1616
"src/collectors/prometheus_rabbitmq_federation_collector.erl",
1717
"src/collectors/prometheus_rabbitmq_global_metrics_collector.erl",
18+
"src/collectors/prometheus_rabbitmq_shovel_collector.erl",
1819
"src/rabbit_prometheus_app.erl",
1920
"src/rabbit_prometheus_dispatcher.erl",
2021
"src/rabbit_prometheus_handler.erl",
@@ -46,6 +47,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
4647
"src/collectors/prometheus_rabbitmq_dynamic_collector.erl",
4748
"src/collectors/prometheus_rabbitmq_federation_collector.erl",
4849
"src/collectors/prometheus_rabbitmq_global_metrics_collector.erl",
50+
"src/collectors/prometheus_rabbitmq_shovel_collector.erl",
4951
"src/rabbit_prometheus_app.erl",
5052
"src/rabbit_prometheus_dispatcher.erl",
5153
"src/rabbit_prometheus_handler.erl",
@@ -88,6 +90,7 @@ def all_srcs(name = "all_srcs"):
8890
"src/collectors/prometheus_rabbitmq_dynamic_collector.erl",
8991
"src/collectors/prometheus_rabbitmq_federation_collector.erl",
9092
"src/collectors/prometheus_rabbitmq_global_metrics_collector.erl",
93+
"src/collectors/prometheus_rabbitmq_shovel_collector.erl",
9194
"src/rabbit_prometheus_app.erl",
9295
"src/rabbit_prometheus_dispatcher.erl",
9396
"src/rabbit_prometheus_handler.erl",
@@ -142,3 +145,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
142145
app_name = "rabbitmq_prometheus",
143146
erlc_opts = "//:test_erlc_opts",
144147
)
148+
erlang_bytecode(
149+
name = "prometheus_rabbitmq_shovel_collector_SUITE_beam_files",
150+
testonly = True,
151+
srcs = ["test/prometheus_rabbitmq_shovel_collector_SUITE.erl"],
152+
outs = ["test/prometheus_rabbitmq_shovel_collector_SUITE.beam"],
153+
app_name = "rabbitmq_prometheus",
154+
erlc_opts = "//:test_erlc_opts",
155+
deps = ["//deps/amqp_client:erlang_app", "@prometheus//:erlang_app"],
156+
)
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
-module(prometheus_rabbitmq_shovel_collector).
8+
-export([deregister_cleanup/1,
9+
collect_mf/2]).
10+
11+
-import(prometheus_model_helpers, [create_mf/4]).
12+
13+
-behaviour(prometheus_collector).
14+
15+
%% API exports
16+
-export([]).
17+
18+
%%====================================================================
19+
%% Collector API
20+
%%====================================================================
21+
22+
deregister_cleanup(_) -> ok.
23+
24+
collect_mf(_Registry, Callback) ->
25+
Status = rabbit_shovel_status:status(500),
26+
{StaticStatusGroups, DynamicStatusGroups} = lists:foldl(fun({_,static,{S, _}, _}, {SMap, DMap}) ->
27+
{maps:update_with(S, fun(C) -> C + 1 end, 1, SMap), DMap};
28+
({_,dynamic,{S, _}, _}, {SMap, DMap}) ->
29+
{SMap, maps:update_with(S, fun(C) -> C + 1 end, 1, DMap)}
30+
end, {#{}, #{}}, Status),
31+
32+
Metrics = [{rabbitmq_shovel_dynamic, gauge, "Current number of dynamic shovels.",
33+
[{[{status, S}], C} || {S, C} <- maps:to_list(DynamicStatusGroups)]},
34+
{rabbitmq_shovel_static, gauge, "Current number of static shovels.",
35+
[{[{status, S}], C} || {S, C} <- maps:to_list(StaticStatusGroups)]}
36+
],
37+
_ = [add_metric_family(Metric, Callback) || Metric <- Metrics],
38+
ok.
39+
40+
add_metric_family({Name, Type, Help, Metrics}, Callback) ->
41+
Callback(create_mf(Name, Help, Type, Metrics)).
42+
43+
%%====================================================================
44+
%% Private Parts
45+
%%====================================================================

deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ build_dispatcher() ->
1919
prometheus_rabbitmq_alarm_metrics_collector,
2020
prometheus_rabbitmq_dynamic_collector,
2121
prometheus_rabbitmq_federation_collector,
22+
prometheus_rabbitmq_shovel_collector,
2223
prometheus_process_collector]),
2324
prometheus_registry:register_collectors('per-object', [
2425
prometheus_vm_system_info_collector,
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(prometheus_rabbitmq_shovel_collector_SUITE).
9+
10+
-include_lib("eunit/include/eunit.hrl").
11+
-include_lib("common_test/include/ct.hrl").
12+
-include_lib("amqp_client/include/amqp_client.hrl").
13+
-include_lib("prometheus/include/prometheus_model.hrl").
14+
15+
-compile(export_all).
16+
17+
-define(DYN_RUNNING_METRIC(Gauge), #'MetricFamily'{name = <<"rabbitmq_shovel_dynamic">>,
18+
help = "Current number of dynamic shovels.",type = 'GAUGE',
19+
metric = [#'Metric'{label = [#'LabelPair'{name = <<"status">>,
20+
value = <<"running">>}],
21+
gauge = #'Gauge'{value = Gauge},
22+
counter = undefined,summary = undefined,untyped = undefined,
23+
histogram = undefined,timestamp_ms = undefined}]}).
24+
25+
-define(STAT_RUNNING_METRIC(Gauge), #'MetricFamily'{name = <<"rabbitmq_shovel_static">>,
26+
help = "Current number of static shovels.",type = 'GAUGE',
27+
metric = [#'Metric'{label = [#'LabelPair'{name = <<"status">>,
28+
value = <<"running">>}],
29+
gauge = #'Gauge'{value = Gauge},
30+
counter = undefined,summary = undefined,untyped = undefined,
31+
histogram = undefined,timestamp_ms = undefined}]}).
32+
33+
-define(EMPTY_DYN_METRIC, #'MetricFamily'{name = <<"rabbitmq_shovel_dynamic">>,
34+
help = "Current number of dynamic shovels.",type = 'GAUGE',
35+
metric = []}).
36+
37+
-define(EMPTY_STAT_METRIC, #'MetricFamily'{name = <<"rabbitmq_shovel_static">>,
38+
help = "Current number of static shovels.",type = 'GAUGE',
39+
metric = []}).
40+
41+
42+
all() ->
43+
[
44+
{group, non_parallel_tests}
45+
].
46+
47+
groups() ->
48+
[
49+
{non_parallel_tests, [], [
50+
dynamic,
51+
static,
52+
mix
53+
]}
54+
].
55+
56+
suite() ->
57+
[{timetrap, {minutes, 5}}].
58+
59+
%% -------------------------------------------------------------------
60+
%% Testsuite setup/teardown.
61+
%% -------------------------------------------------------------------
62+
init_per_suite(Config) ->
63+
rabbit_ct_helpers:log_environment(),
64+
Config1 = rabbit_ct_helpers:set_config(Config, [
65+
{rmq_nodename_suffix, ?MODULE},
66+
{ignored_crashes, [
67+
"server_initiated_close,404",
68+
"writer,send_failed,closed"
69+
]}
70+
]),
71+
rabbit_ct_helpers:run_setup_steps(Config1,
72+
rabbit_ct_broker_helpers:setup_steps() ++
73+
rabbit_ct_client_helpers:setup_steps()).
74+
75+
end_per_suite(Config) ->
76+
rabbit_ct_helpers:run_teardown_steps(Config,
77+
rabbit_ct_client_helpers:teardown_steps() ++
78+
rabbit_ct_broker_helpers:teardown_steps()).
79+
80+
init_per_group(_, Config) ->
81+
Config.
82+
83+
end_per_group(_, Config) ->
84+
Config.
85+
86+
init_per_testcase(Testcase, Config) ->
87+
rabbit_ct_helpers:testcase_started(Config, Testcase).
88+
89+
end_per_testcase(Testcase, Config) ->
90+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
91+
92+
%% -------------------------------------------------------------------
93+
%% Test cases
94+
%% -------------------------------------------------------------------
95+
96+
dynamic(Config) ->
97+
create_dynamic_shovel(Config, <<"test">>),
98+
running = get_shovel_status(Config, <<"test">>),
99+
[?DYN_RUNNING_METRIC(1), ?EMPTY_STAT_METRIC] = get_metrics(Config),
100+
create_dynamic_shovel(Config, <<"test2">>),
101+
running = get_shovel_status(Config, <<"test2">>),
102+
[?DYN_RUNNING_METRIC(2), ?EMPTY_STAT_METRIC] = get_metrics(Config),
103+
clear_param(Config, <<"test">>),
104+
clear_param(Config, <<"test2">>),
105+
[?EMPTY_DYN_METRIC, ?EMPTY_STAT_METRIC] = get_metrics(Config),
106+
ok.
107+
108+
static(Config) ->
109+
create_static_shovel(Config, static_shovel),
110+
[?EMPTY_DYN_METRIC, ?STAT_RUNNING_METRIC(1)] = get_metrics(Config),
111+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, clear_shovel,
112+
[]),
113+
[?EMPTY_DYN_METRIC, ?EMPTY_STAT_METRIC] = get_metrics(Config),
114+
ok.
115+
116+
117+
mix(Config) ->
118+
create_dynamic_shovel(Config, <<"test">>),
119+
running = get_shovel_status(Config, <<"test">>),
120+
create_static_shovel(Config, static_shovel),
121+
122+
[?DYN_RUNNING_METRIC(1), ?STAT_RUNNING_METRIC(1)] = get_metrics(Config),
123+
124+
clear_param(Config, <<"test">>),
125+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, clear_shovel,
126+
[]),
127+
[?EMPTY_DYN_METRIC, ?EMPTY_STAT_METRIC] = get_metrics(Config),
128+
ok.
129+
130+
%% -------------------------------------------------------------------
131+
%% Internal
132+
%% -------------------------------------------------------------------
133+
134+
get_metrics(Config) ->
135+
rabbit_ct_broker_helpers:rpc(Config, 0,
136+
rabbitmq_prometheus_collector_test_proxy, collect_mf,
137+
[default, prometheus_rabbitmq_shovel_collector]).
138+
139+
create_static_shovel(Config, Name) ->
140+
SourceQueue = <<"source-queue">>,
141+
DestQueue = <<"dest-queue">>,
142+
Hostname = ?config(rmq_hostname, Config),
143+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
144+
Shovel = [{Name,
145+
[{source,
146+
[{protocol, amqp10},
147+
{uris, [rabbit_misc:format("amqp://~ts:~b",
148+
[Hostname, Port])]},
149+
{source_address, SourceQueue}]
150+
},
151+
{destination,
152+
[{uris, [rabbit_misc:format("amqp://~ts:~b/%2f?heartbeat=5",
153+
[Hostname, Port])]},
154+
{declarations,
155+
[{'queue.declare', [{queue, DestQueue}, auto_delete]}]},
156+
{publish_fields, [{exchange, <<>>},
157+
{routing_key, DestQueue}]},
158+
{publish_properties, [{delivery_mode, 2},
159+
{content_type, <<"shovelled">>}]},
160+
{add_forward_headers, true},
161+
{add_timestamp_header, true}]},
162+
{queue, <<>>},
163+
{ack_mode, no_ack}
164+
]}],
165+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, setup_shovel,
166+
[Shovel, Name]).
167+
168+
setup_shovel(ShovelConfig, Name) ->
169+
_ = application:stop(rabbitmq_shovel),
170+
application:set_env(rabbitmq_shovel, shovels, ShovelConfig, infinity),
171+
ok = application:start(rabbitmq_shovel),
172+
await_shovel(Name, static).
173+
174+
clear_shovel() ->
175+
_ = application:stop(rabbitmq_shovel),
176+
application:unset_env(rabbitmq_shovel, shovels, infinity),
177+
ok = application:start(rabbitmq_shovel).
178+
179+
make_uri(Config, Node) ->
180+
Hostname = ?config(rmq_hostname, Config),
181+
Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp),
182+
list_to_binary(lists:flatten(io_lib:format("amqp://~ts:~b",
183+
[Hostname, Port]))).
184+
185+
create_dynamic_shovel(Config, Name) ->
186+
Node = 0,
187+
QueueNode = 0,
188+
Uri = make_uri(Config, QueueNode),
189+
Value = [{<<"src-queue">>, <<"src">>},
190+
{<<"dest-queue">>, <<"dest">>}],
191+
ok = rabbit_ct_broker_helpers:rpc(
192+
Config,
193+
Node,
194+
rabbit_runtime_parameters,
195+
set, [
196+
<<"/">>, <<"shovel">>, Name, [{<<"src-uri">>, Uri},
197+
{<<"dest-uri">>, [Uri]} |
198+
Value], none]),
199+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, await_shovel,
200+
[Name, dynamic]).
201+
202+
await_shovel(Name, Type) ->
203+
Ret = await(fun() ->
204+
Status = shovels_from_status(running, Type),
205+
lists:member(Name, Status)
206+
end, 30_000),
207+
Ret.
208+
209+
shovels_from_status(ExpectedState, dynamic) ->
210+
S = rabbit_shovel_status:status(),
211+
[N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState];
212+
shovels_from_status(ExpectedState, static) ->
213+
S = rabbit_shovel_status:status(),
214+
[N || {N, static, {State, _}, _} <- S, State == ExpectedState].
215+
216+
get_shovel_status(Config, Name) ->
217+
get_shovel_status(Config, 0, Name).
218+
219+
get_shovel_status(Config, Node, Name) ->
220+
S = rabbit_ct_broker_helpers:rpc(
221+
Config, Node, rabbit_shovel_status, lookup, [{<<"/">>, Name}]),
222+
case S of
223+
not_found ->
224+
not_found;
225+
_ ->
226+
{Status, Info} = proplists:get_value(info, S),
227+
proplists:get_value(blocked_status, Info, Status)
228+
end.
229+
230+
await(Pred) ->
231+
case Pred() of
232+
true -> ok;
233+
false -> timer:sleep(100),
234+
await(Pred)
235+
end.
236+
237+
await(_Pred, Timeout) when Timeout =< 0 ->
238+
error(await_timeout);
239+
await(Pred, Timeout) ->
240+
case Pred() of
241+
true -> ok;
242+
Other when Timeout =< 100 ->
243+
error({await_timeout, Other});
244+
_ -> timer:sleep(100),
245+
await(Pred, Timeout - 100)
246+
end.
247+
248+
clear_param(Config, Name) ->
249+
clear_param(Config, 0, Name).
250+
251+
clear_param(Config, Node, Name) ->
252+
rabbit_ct_broker_helpers:rpc(Config, Node,
253+
rabbit_runtime_parameters, clear, [<<"/">>, <<"shovel">>, Name, <<"acting-user">>]).

0 commit comments

Comments
 (0)