Skip to content

Revert #11559 #11689

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 2 additions & 18 deletions deps/rabbit/src/rabbit_core_metrics_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,14 @@ gc_leader_data(Id, Table, GbSet) ->
gc_global_queues() ->
GbSet = gb_sets:from_list(rabbit_amqqueue:list_names()),
gc_process_and_entity(channel_queue_metrics, GbSet),
gc_entity(queue_counter_metrics, GbSet),
gc_process_and_entity(consumer_created, GbSet),
ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()),
gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet),
gc_entities(queue_exchange_metrics, GbSet, ExchangeGbSet).
gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet).

gc_exchanges() ->
Exchanges = rabbit_exchange:list_names(),
GbSet = gb_sets:from_list(Exchanges),
gc_process_and_entity(channel_exchange_metrics, GbSet),
gc_entity(exchange_metrics, GbSet).
gc_process_and_entity(channel_exchange_metrics, GbSet).

gc_nodes() ->
Nodes = rabbit_nodes:list_members(),
Expand Down Expand Up @@ -156,12 +153,6 @@ gc_entity(Table, GbSet) ->
({Id = Key, _, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
({Id = Key, _, _, _, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
({Id = Key, _, _, _, _, _}, none)
when Table == exchange_metrics ->
gc_entity(Id, Table, Key, GbSet);
({Id = Key, _, _, _, _, _, _, _, _}, none)
when Table == queue_counter_metrics ->
gc_entity(Id, Table, Key, GbSet)
end, none, Table).

Expand Down Expand Up @@ -197,13 +188,6 @@ gc_process_and_entity(Id, Pid, Table, Key, GbSet) ->
none
end.

gc_entities(Table, QueueGbSet, ExchangeGbSet) ->
ets:foldl(fun({{QueueId, ExchangeId} = Key, _, _}, none)
when Table == queue_exchange_metrics ->
gc_entity(QueueId, Table, Key, QueueGbSet),
gc_entity(ExchangeId, Table, Key, ExchangeGbSet)
end, none, Table).

gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) ->
ets:foldl(fun({{Pid, {Q, X}} = Key, _, _}, none) ->
gc_process(Pid, Table, Key),
Expand Down
8 changes: 0 additions & 8 deletions deps/rabbit_common/include/rabbit_core_metrics.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@
{auth_attempt_metrics, set},
{auth_attempt_detailed_metrics, set}]).

% `CORE_NON_CHANNEL_TABLES` are tables that store counters representing the
% same info as some of the channel_queue_metrics, channel_exchange_metrics and
% channel_queue_exchange_metrics but without including the channel ID in the
% key.
-define(CORE_NON_CHANNEL_TABLES, [{queue_counter_metrics, set},
{exchange_metrics, set},
{queue_exchange_metrics, set}]).

-define(CONNECTION_CHURN_METRICS, {node(), 0, 0, 0, 0, 0, 0, 0}).

%% connection_created :: {connection_id, proplist}
Expand Down
44 changes: 15 additions & 29 deletions deps/rabbit_common/src/rabbit_core_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,13 @@ create_table({Table, Type}) ->
{read_concurrency, true}]).

init() ->
Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES,
_ = [create_table({Table, Type})
|| {Table, Type} <- Tables],
_ = [create_table({Table, Type})
|| {Table, Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES],
ok.

terminate() ->
Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES,
[ets:delete(Table)
|| {Table, _Type} <- Tables],
|| {Table, _Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES],
ok.

connection_created(Pid, Infos) ->
Expand Down Expand Up @@ -168,65 +166,53 @@ channel_stats(reductions, Id, Value) ->
ets:insert(channel_process_metrics, {Id, Value}),
ok.

channel_stats(exchange_stats, publish, {_ChannelPid, XName} = Id, Value) ->
channel_stats(exchange_stats, publish, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {2, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, confirm, {_ChannelPid, XName} = Id, Value) ->
channel_stats(exchange_stats, confirm, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {3, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, return_unroutable, {_ChannelPid, XName} = Id, Value) ->
channel_stats(exchange_stats, return_unroutable, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {4, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, drop_unroutable, {_ChannelPid, XName} = Id, Value) ->
channel_stats(exchange_stats, drop_unroutable, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {5, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_exchange_stats, publish, {_ChannelPid, QueueExchange} = Id, Value) ->
channel_stats(queue_exchange_stats, publish, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_exchange_metrics, Id, Value, {Id, 0, 0}),
_ = ets:update_counter(queue_exchange_metrics, QueueExchange, Value, {QueueExchange, 0, 0}),
ok;
channel_stats(queue_stats, get, {_ChannelPid, QName} = Id, Value) ->
channel_stats(queue_stats, get, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {2, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, get_no_ack, {_ChannelPid, QName} = Id, Value) ->
channel_stats(queue_stats, get_no_ack, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {3, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, deliver, {_ChannelPid, QName} = Id, Value) ->
channel_stats(queue_stats, deliver, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {4, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, deliver_no_ack, {_ChannelPid, QName} = Id, Value) ->
channel_stats(queue_stats, deliver_no_ack, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {5, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, redeliver, {_ChannelPid, QName} = Id, Value) ->
channel_stats(queue_stats, redeliver, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {6, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {6, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, ack, {_ChannelPid, QName} = Id, Value) ->
channel_stats(queue_stats, ack, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {7, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {7, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, get_empty, {_ChannelPid, QName} = Id, Value) ->
channel_stats(queue_stats, get_empty, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {8, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {8, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok.

delete(Table, Key) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
{2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes},
{2, undefined, stream_segments, counter, "Total number of stream segment files", segments}
]},

%%% Metrics that contain reference to a channel. Some of them also have
%%% a queue name, but in this case filtering on it doesn't make any
%%% sense, as the queue is not an object of interest here.
Expand Down Expand Up @@ -208,7 +209,7 @@
]},

{channel_queue_exchange_metrics, [
{2, undefined, queue_messages_published_total, counter, "Total number of messages published into a queue through a exchange on a channel"}
{2, undefined, queue_messages_published_total, counter, "Total number of messages published to queues"}
]}
]).

Expand All @@ -222,25 +223,8 @@
]},
{exchange_names, [
{2, undefined, exchange_name, gauge, "Enumerates exchanges without any additional info. This value is cluster-wide. A cheaper alternative to `exchange_bindings`"}
]},
{queue_exchange_metrics, [
{2, undefined, queue_exchange_messages_published_total, counter, "Total number of messages published into a queue through an exchange"}
]},
{exchange_metrics, [
{2, undefined, exchange_messages_published_total, counter, "Total number of messages published into an exchange"},
{3, undefined, exchange_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed"},
{4, undefined, exchange_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"},
{5, undefined, exchange_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"}
]},
{queue_counter_metrics, [
{2, undefined, queue_get_ack_total, counter, "Total number of messages fetched from a queue with basic.get in manual acknowledgement mode"},
{3, undefined, queue_get_total, counter, "Total number of messages fetched from a queue with basic.get in automatic acknowledgement mode"},
{4, undefined, queue_messages_delivered_ack_total, counter, "Total number of messages delivered from a queue to consumers in manual acknowledgement mode"},
{5, undefined, queue_messages_delivered_total, counter, "Total number of messages delivered from a queue to consumers in automatic acknowledgement mode"},
{6, undefined, queue_messages_redelivered_total, counter, "Total number of messages redelivered from a queue to consumers"},
{7, undefined, queue_messages_acked_total, counter, "Total number of messages acknowledged by consumers on a queue"},
{8, undefined, queue_get_empty_total, counter, "Total number of times basic.get operations fetched no message on a queue"}
]}]).
]}
]).

-define(TOTALS, [
%% ordering differs from metrics above, refer to list comprehension
Expand Down Expand Up @@ -558,22 +542,15 @@ get_data(queue_metrics = Table, false, VHostsFilter) ->
{disk_reads, A15}, {disk_writes, A16}, {segments, A17}]}];
get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
Table == queue_coarse_metrics;
Table == queue_counter_metrics;
Table == channel_queue_metrics;
Table == connection_coarse_metrics;
Table == exchange_metrics;
Table == queue_exchange_metrics;
Table == channel_queue_exchange_metrics;
Table == ra_metrics;
Table == channel_process_metrics ->
Result = ets:foldl(fun
%% For queue_coarse_metrics
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({{#resource{kind = queue, virtual_host = VHost}, #resource{kind = exchange}}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({_, V1}, {T, A1}) ->
{T, V1 + A1};
({_, V1, _}, {T, A1}) ->
Expand All @@ -600,36 +577,6 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
_ ->
[Result]
end;
get_data(exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter)->
ets:foldl(fun
({#resource{kind = exchange, virtual_host = VHost}, _, _, _, _, _} = Row, Acc) when
map_get(VHost, VHostsFilter)
->
[Row | Acc];
(_Row, Acc) ->
Acc
end, [], Table);
get_data(queue_counter_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _} = Row, Acc) when
map_get(VHost, VHostsFilter)
->
[Row | Acc];
(_Row, Acc) ->
Acc
end, [], Table);
get_data(queue_exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
ets:foldl(fun
({{
#resource{kind = queue, virtual_host = VHost},
#resource{kind = exchange, virtual_host = VHost}
}, _, _} = Row, Acc) when
map_get(VHost, VHostsFilter)
->
[Row | Acc];
(_Row, Acc) ->
Acc
end, [], Table);
get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
Expand Down Expand Up @@ -722,15 +669,15 @@ division(A, B) ->
accumulate_count_and_sum(Value, {Count, Sum}) ->
{Count + 1, Sum + Value}.

empty(T) when T == channel_queue_exchange_metrics; T == queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
{T, 0};
empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
{T, 0, 0, 0};
empty(T) when T == channel_exchange_metrics; T == exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
empty(T) when T == channel_exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
{T, 0, 0, 0, 0};
empty(T) when T == ra_metrics ->
{T, 0, 0, 0, 0, 0, {0, 0}};
empty(T) when T == channel_queue_metrics; T == queue_counter_metrics; T == channel_metrics ->
empty(T) when T == channel_queue_metrics; T == channel_metrics ->
{T, 0, 0, 0, 0, 0, 0, 0};
empty(queue_metrics = T) ->
{T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}.
Expand Down
94 changes: 1 addition & 93 deletions deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ groups() ->
{config_path, [], generic_tests()},
{global_labels, [], generic_tests()},
{aggregated_metrics, [], [
aggregated_metrics_test,
aggregated_metrics_test,
specific_erlang_metrics_present_test,
global_metrics_present_test,
global_metrics_single_metric_family_test
Expand All @@ -57,8 +57,6 @@ groups() ->
queue_consumer_count_single_vhost_per_object_test,
queue_consumer_count_all_vhosts_per_object_test,
queue_coarse_metrics_per_object_test,
queue_counter_metrics_per_object_test,
queue_exchange_metrics_per_object_test,
queue_metrics_per_object_test,
queue_consumer_count_and_queue_metrics_mutually_exclusive_test,
vhost_status_metric,
Expand Down Expand Up @@ -525,96 +523,6 @@ queue_coarse_metrics_per_object_test(Config) ->
map_get(rabbitmq_detailed_queue_messages, parse_response(Body3))),
ok.

queue_counter_metrics_per_object_test(Config) ->
Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7]},

{_, Body1} = http_get_with_pal(Config,
"/metrics/detailed?vhost=vhost-1&family=queue_counter_metrics",
[], 200),
?assertEqual(
Expected1,
map_get(
rabbitmq_detailed_queue_messages_delivered_ack_total,
parse_response(Body1))),

{_, Body2} = http_get_with_pal(Config,
"/metrics/detailed?vhost=vhost-2&family=queue_counter_metrics",
[], 200),
Expected2 = #{#{queue => "vhost-2-queue-with-consumer", vhost => "vhost-2"} => [11]},

?assertEqual(
Expected2,
map_get(
rabbitmq_detailed_queue_messages_delivered_ack_total,
parse_response(Body2))),

%% Maybe missing, tests for the queue_exchange_metrics
ok.


queue_exchange_metrics_per_object_test(Config) ->
Expected1 = #{
#{
queue => "vhost-1-queue-with-messages",
vhost => "vhost-1",
exchange => ""
} => [7],
#{
exchange => "",
queue => "vhost-1-queue-with-consumer",
vhost => "vhost-1"
} => [7]
},

{_, Body1} = http_get_with_pal(Config,
"/metrics/detailed?vhost=vhost-1&family=queue_exchange_metrics",
[], 200),
?assertEqual(
Expected1,
map_get(
rabbitmq_detailed_queue_exchange_messages_published_total,
parse_response(Body1))),


{_, Body2} = http_get_with_pal(Config,
"/metrics/detailed?vhost=vhost-2&family=queue_exchange_metrics",
[], 200),


Expected2 = #{
#{
queue => "vhost-2-queue-with-messages",
vhost => "vhost-2",
exchange => ""
} => [11],
#{
exchange => "",
queue => "vhost-2-queue-with-consumer",
vhost => "vhost-2"
} => [11]
},

?assertEqual(
Expected2,
map_get(
rabbitmq_detailed_queue_exchange_messages_published_total,
parse_response(Body2))),

ok.

exchange_metrics_per_object_test(Config) ->
Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7]},

{_, Body} = http_get_with_pal(Config,
"/metrics/detailed?vhost=vhost-1&family=exchange_metrics",
[], 200),
?assertEqual(
Expected1,
map_get(
rabbitmq_detailed_queue_messages_delivered_ack_total,
parse_response(Body))),
ok.

queue_metrics_per_object_test(Config) ->
Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7],
#{queue => "vhost-1-queue-with-messages", vhost => "vhost-1"} => [1]},
Expand Down
Loading