Skip to content

Commit fa63e01

Browse files
Merge pull request #11755 from rabbitmq/cloudamqp-prometheus-no-channel-label
Prometheus: some per-exchange/per-queue metrics aggregated per-channel (rebased)
2 parents 4090358 + 2cb27b2 commit fa63e01

File tree

5 files changed

+218
-25
lines changed

5 files changed

+218
-25
lines changed

deps/rabbit/src/rabbit_core_metrics_gc.erl

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,17 @@ gc_leader_data(Id, Table, GbSet) ->
9292
gc_global_queues() ->
9393
GbSet = gb_sets:from_list(rabbit_amqqueue:list_names()),
9494
gc_process_and_entity(channel_queue_metrics, GbSet),
95+
gc_entity(queue_delivery_metrics, GbSet),
9596
gc_process_and_entity(consumer_created, GbSet),
9697
ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()),
97-
gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet).
98+
gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet),
99+
gc_entities(queue_exchange_metrics, GbSet, ExchangeGbSet).
98100

99101
gc_exchanges() ->
100102
Exchanges = rabbit_exchange:list_names(),
101103
GbSet = gb_sets:from_list(Exchanges),
102-
gc_process_and_entity(channel_exchange_metrics, GbSet).
104+
gc_process_and_entity(channel_exchange_metrics, GbSet),
105+
gc_entity(exchange_metrics, GbSet).
103106

104107
gc_nodes() ->
105108
Nodes = rabbit_nodes:list_members(),
@@ -153,6 +156,12 @@ gc_entity(Table, GbSet) ->
153156
({Id = Key, _, _}, none) ->
154157
gc_entity(Id, Table, Key, GbSet);
155158
({Id = Key, _, _, _, _}, none) ->
159+
gc_entity(Id, Table, Key, GbSet);
160+
({Id = Key, _, _, _, _, _}, none)
161+
when Table == exchange_metrics ->
162+
gc_entity(Id, Table, Key, GbSet);
163+
({Id = Key, _, _, _, _, _, _, _, _}, none)
164+
when Table == queue_delivery_metrics ->
156165
gc_entity(Id, Table, Key, GbSet)
157166
end, none, Table).
158167

@@ -188,6 +197,13 @@ gc_process_and_entity(Id, Pid, Table, Key, GbSet) ->
188197
none
189198
end.
190199

200+
gc_entities(Table, QueueGbSet, ExchangeGbSet) ->
201+
ets:foldl(fun({{QueueId, ExchangeId} = Key, _, _}, none)
202+
when Table == queue_exchange_metrics ->
203+
gc_entity(QueueId, Table, Key, QueueGbSet),
204+
gc_entity(ExchangeId, Table, Key, ExchangeGbSet)
205+
end, none, Table).
206+
191207
gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) ->
192208
ets:foldl(fun({{Pid, {Q, X}} = Key, _, _}, none) ->
193209
gc_process(Pid, Table, Key),

deps/rabbit_common/include/rabbit_core_metrics.hrl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,14 @@
2828
{auth_attempt_metrics, set},
2929
{auth_attempt_detailed_metrics, set}]).
3030

31+
% `CORE_NON_CHANNEL_TABLES` are tables that store counters representing the
32+
% same info as some of the channel_queue_metrics, channel_exchange_metrics and
33+
% channel_queue_exchange_metrics but without including the channel ID in the
34+
% key.
35+
-define(CORE_NON_CHANNEL_TABLES, [{queue_delivery_metrics, set},
36+
{exchange_metrics, set},
37+
{queue_exchange_metrics, set}]).
38+
3139
-define(CONNECTION_CHURN_METRICS, {node(), 0, 0, 0, 0, 0, 0, 0}).
3240

3341
%% connection_created :: {connection_id, proplist}

deps/rabbit_common/src/rabbit_core_metrics.erl

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,15 @@ create_table({Table, Type}) ->
111111
{read_concurrency, true}]).
112112

113113
init() ->
114-
_ = [create_table({Table, Type})
115-
|| {Table, Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES],
114+
Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES,
115+
_ = [create_table({Table, Type})
116+
|| {Table, Type} <- Tables],
116117
ok.
117118

118119
terminate() ->
120+
Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES,
119121
[ets:delete(Table)
120-
|| {Table, _Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES],
122+
|| {Table, _Type} <- Tables],
121123
ok.
122124

123125
connection_created(Pid, Infos) ->
@@ -166,53 +168,65 @@ channel_stats(reductions, Id, Value) ->
166168
ets:insert(channel_process_metrics, {Id, Value}),
167169
ok.
168170

169-
channel_stats(exchange_stats, publish, Id, Value) ->
171+
channel_stats(exchange_stats, publish, {_ChannelPid, XName} = Id, Value) ->
170172
%% Includes delete marker
171173
_ = ets:update_counter(channel_exchange_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0}),
174+
_ = ets:update_counter(exchange_metrics, XName, {2, Value}, {XName, 0, 0, 0, 0, 0}),
172175
ok;
173-
channel_stats(exchange_stats, confirm, Id, Value) ->
176+
channel_stats(exchange_stats, confirm, {_ChannelPid, XName} = Id, Value) ->
174177
%% Includes delete marker
175178
_ = ets:update_counter(channel_exchange_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0}),
179+
_ = ets:update_counter(exchange_metrics, XName, {3, Value}, {XName, 0, 0, 0, 0, 0}),
176180
ok;
177-
channel_stats(exchange_stats, return_unroutable, Id, Value) ->
181+
channel_stats(exchange_stats, return_unroutable, {_ChannelPid, XName} = Id, Value) ->
178182
%% Includes delete marker
179183
_ = ets:update_counter(channel_exchange_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0}),
184+
_ = ets:update_counter(exchange_metrics, XName, {4, Value}, {XName, 0, 0, 0, 0, 0}),
180185
ok;
181-
channel_stats(exchange_stats, drop_unroutable, Id, Value) ->
186+
channel_stats(exchange_stats, drop_unroutable, {_ChannelPid, XName} = Id, Value) ->
182187
%% Includes delete marker
183188
_ = ets:update_counter(channel_exchange_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0}),
189+
_ = ets:update_counter(exchange_metrics, XName, {5, Value}, {XName, 0, 0, 0, 0, 0}),
184190
ok;
185-
channel_stats(queue_exchange_stats, publish, Id, Value) ->
191+
channel_stats(queue_exchange_stats, publish, {_ChannelPid, QueueExchange} = Id, Value) ->
186192
%% Includes delete marker
187193
_ = ets:update_counter(channel_queue_exchange_metrics, Id, Value, {Id, 0, 0}),
194+
_ = ets:update_counter(queue_exchange_metrics, QueueExchange, Value, {QueueExchange, 0, 0}),
188195
ok;
189-
channel_stats(queue_stats, get, Id, Value) ->
196+
channel_stats(queue_stats, get, {_ChannelPid, QName} = Id, Value) ->
190197
%% Includes delete marker
191198
_ = ets:update_counter(channel_queue_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
199+
_ = ets:update_counter(queue_delivery_metrics, QName, {2, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
192200
ok;
193-
channel_stats(queue_stats, get_no_ack, Id, Value) ->
201+
channel_stats(queue_stats, get_no_ack, {_ChannelPid, QName} = Id, Value) ->
194202
%% Includes delete marker
195203
_ = ets:update_counter(channel_queue_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
204+
_ = ets:update_counter(queue_delivery_metrics, QName, {3, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
196205
ok;
197-
channel_stats(queue_stats, deliver, Id, Value) ->
206+
channel_stats(queue_stats, deliver, {_ChannelPid, QName} = Id, Value) ->
198207
%% Includes delete marker
199208
_ = ets:update_counter(channel_queue_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
209+
_ = ets:update_counter(queue_delivery_metrics, QName, {4, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
200210
ok;
201-
channel_stats(queue_stats, deliver_no_ack, Id, Value) ->
211+
channel_stats(queue_stats, deliver_no_ack, {_ChannelPid, QName} = Id, Value) ->
202212
%% Includes delete marker
203213
_ = ets:update_counter(channel_queue_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
214+
_ = ets:update_counter(queue_delivery_metrics, QName, {5, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
204215
ok;
205-
channel_stats(queue_stats, redeliver, Id, Value) ->
216+
channel_stats(queue_stats, redeliver, {_ChannelPid, QName} = Id, Value) ->
206217
%% Includes delete marker
207218
_ = ets:update_counter(channel_queue_metrics, Id, {6, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
219+
_ = ets:update_counter(queue_delivery_metrics, QName, {6, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
208220
ok;
209-
channel_stats(queue_stats, ack, Id, Value) ->
221+
channel_stats(queue_stats, ack, {_ChannelPid, QName} = Id, Value) ->
210222
%% Includes delete marker
211223
_ = ets:update_counter(channel_queue_metrics, Id, {7, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
224+
_ = ets:update_counter(queue_delivery_metrics, QName, {7, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
212225
ok;
213-
channel_stats(queue_stats, get_empty, Id, Value) ->
226+
channel_stats(queue_stats, get_empty, {_ChannelPid, QName} = Id, Value) ->
214227
%% Includes delete marker
215228
_ = ets:update_counter(channel_queue_metrics, Id, {8, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
229+
_ = ets:update_counter(queue_delivery_metrics, QName, {8, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
216230
ok.
217231

218232
delete(Table, Key) ->

deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl

Lines changed: 66 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,6 @@
189189
{2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes},
190190
{2, undefined, stream_segments, counter, "Total number of stream segment files", segments}
191191
]},
192-
193192
%%% Metrics that contain reference to a channel. Some of them also have
194193
%%% a queue name, but in this case filtering on it doesn't make any
195194
%%% sense, as the queue is not an object of interest here.
@@ -238,9 +237,32 @@
238237
]},
239238

240239
{channel_queue_exchange_metrics, [
241-
{2, undefined, queue_messages_published_total, counter, "Total number of messages published to queues"}
242-
]}
243-
]).
240+
{2, undefined, queue_messages_published_total, counter, "Total number of messages published into a queue through an exchange on a channel"}
241+
]},
242+
243+
%%% Metrics in the following 3 groups reference a queue and/or exchange.
244+
%%% They each have a corresponding group in the above per-channel
245+
%%% section but here the channel is not an object of interest.
246+
{exchange_metrics, [
247+
{2, undefined, exchange_messages_published_total, counter, "Total number of messages published into an exchange"},
248+
{3, undefined, exchange_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed"},
249+
{4, undefined, exchange_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"},
250+
{5, undefined, exchange_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"}
251+
]},
252+
253+
{queue_delivery_metrics, [
254+
{2, undefined, queue_get_ack_total, counter, "Total number of messages fetched from a queue with basic.get in manual acknowledgement mode"},
255+
{3, undefined, queue_get_total, counter, "Total number of messages fetched from a queue with basic.get in automatic acknowledgement mode"},
256+
{4, undefined, queue_messages_delivered_ack_total, counter, "Total number of messages delivered from a queue to consumers in manual acknowledgement mode"},
257+
{5, undefined, queue_messages_delivered_total, counter, "Total number of messages delivered from a queue to consumers in automatic acknowledgement mode"},
258+
{6, undefined, queue_messages_redelivered_total, counter, "Total number of messages redelivered from a queue to consumers"},
259+
{7, undefined, queue_messages_acked_total, counter, "Total number of messages acknowledged by consumers on a queue"},
260+
{8, undefined, queue_get_empty_total, counter, "Total number of times basic.get operations fetched no message on a queue"}
261+
]},
262+
263+
{queue_exchange_metrics, [
264+
{2, undefined, queue_exchange_messages_published_total, counter, "Total number of messages published into a queue through an exchange"}
265+
]}]).
244266

245267
%% Metrics that can be only requested through `/metrics/detailed`
246268
-define(METRICS_CLUSTER,[
@@ -571,15 +593,22 @@ get_data(queue_metrics = Table, false, VHostsFilter) ->
571593
{disk_reads, A15}, {disk_writes, A16}, {segments, A17}]}];
572594
get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
573595
Table == queue_coarse_metrics;
596+
Table == queue_delivery_metrics;
574597
Table == channel_queue_metrics;
575598
Table == connection_coarse_metrics;
599+
Table == exchange_metrics;
600+
Table == queue_exchange_metrics;
576601
Table == channel_queue_exchange_metrics;
577602
Table == ra_metrics;
578603
Table == channel_process_metrics ->
579604
Result = ets:foldl(fun
580605
%% For queue_coarse_metrics
581606
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
582607
Acc;
608+
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
609+
Acc;
610+
({{#resource{kind = queue, virtual_host = VHost}, #resource{kind = exchange}}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
611+
Acc;
583612
({_, V1}, {T, A1}) ->
584613
{T, V1 + A1};
585614
({_, V1, _}, {T, A1}) ->
@@ -606,6 +635,36 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
606635
_ ->
607636
[Result]
608637
end;
638+
get_data(exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter)->
639+
ets:foldl(fun
640+
({#resource{kind = exchange, virtual_host = VHost}, _, _, _, _, _} = Row, Acc) when
641+
map_get(VHost, VHostsFilter)
642+
->
643+
[Row | Acc];
644+
(_Row, Acc) ->
645+
Acc
646+
end, [], Table);
647+
get_data(queue_delivery_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
648+
ets:foldl(fun
649+
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _} = Row, Acc) when
650+
map_get(VHost, VHostsFilter)
651+
->
652+
[Row | Acc];
653+
(_Row, Acc) ->
654+
Acc
655+
end, [], Table);
656+
get_data(queue_exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
657+
ets:foldl(fun
658+
({{
659+
#resource{kind = queue, virtual_host = VHost},
660+
#resource{kind = exchange, virtual_host = VHost}
661+
}, _, _} = Row, Acc) when
662+
map_get(VHost, VHostsFilter)
663+
->
664+
[Row | Acc];
665+
(_Row, Acc) ->
666+
Acc
667+
end, [], Table);
609668
get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
610669
ets:foldl(fun
611670
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
@@ -730,15 +789,15 @@ division(A, B) ->
730789
accumulate_count_and_sum(Value, {Count, Sum}) ->
731790
{Count + 1, Sum + Value}.
732791

733-
empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
792+
empty(T) when T == channel_queue_exchange_metrics; T == queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
734793
{T, 0};
735794
empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
736795
{T, 0, 0, 0};
737-
empty(T) when T == channel_exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
796+
empty(T) when T == channel_exchange_metrics; T == exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
738797
{T, 0, 0, 0, 0};
739798
empty(T) when T == ra_metrics ->
740799
{T, 0, 0, 0, 0, 0, {0, 0}};
741-
empty(T) when T == channel_queue_metrics; T == channel_metrics ->
800+
empty(T) when T == channel_queue_metrics; T == queue_delivery_metrics; T == channel_metrics ->
742801
{T, 0, 0, 0, 0, 0, 0, 0};
743802
empty(queue_metrics = T) ->
744803
{T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}.

0 commit comments

Comments
 (0)