Skip to content

Commit f3dfb50

Browse files
committed
Add consumer count to stream queue metrics
This commit adds the "consumers" metrics to stream queues (consumer count). It is computed by counting the element of the consumer_created ETS table for the given stream queue and for each member of the Osiris cluster. Fixes #4622
1 parent c920908 commit f3dfb50

File tree

3 files changed

+37
-3
lines changed

3 files changed

+37
-3
lines changed

deps/rabbit/src/rabbit_osiris_metrics.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
online,
2727
members,
2828
memory,
29-
readers
29+
readers,
30+
consumers
3031
]).
3132

3233
-record(state, {timeout :: non_neg_integer()}).
@@ -79,7 +80,6 @@ handle_info(tick, #state{timeout = Timeout} = State) ->
7980
[]
8081
end,
8182

82-
8383
rabbit_core_metrics:queue_stats(QName, Infos),
8484
rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
8585
{messages, COffs},

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@
4949

5050
-define(INFO_KEYS, [name, durable, auto_delete, arguments, leader, members, online, state,
5151
messages, messages_ready, messages_unacknowledged, committed_offset,
52-
policy, operator_policy, effective_policy_definition, type, memory]).
52+
policy, operator_policy, effective_policy_definition, type, memory,
53+
consumers]).
5354

5455
-type appender_seq() :: non_neg_integer().
5556

@@ -486,6 +487,18 @@ i(leader, Q) when ?is_amqqueue(Q) ->
486487
i(members, Q) when ?is_amqqueue(Q) ->
487488
#{nodes := Nodes} = amqqueue:get_type_state(Q),
488489
Nodes;
490+
i(consumers, Q) when ?is_amqqueue(Q) ->
491+
QName = amqqueue:get_name(Q),
492+
#{nodes := Nodes} = amqqueue:get_type_state(Q),
493+
Spec = [{{{'$1', '_', '_'}, '_', '_', '_', '_', '_', '_'}, [{'==', {QName}, '$1'}], [true]}],
494+
lists:foldl(fun(N, Acc) ->
495+
case rpc:call(N, ets, select_count,[consumer_created, Spec], 10000) of
496+
Count when is_integer(Count) ->
497+
Acc + Count;
498+
_ ->
499+
Acc
500+
end
501+
end, 0, Nodes);
489502
i(memory, Q) when ?is_amqqueue(Q) ->
490503
%% Return writer memory. It's not the full memory usage (we also have replica readers on
491504
%% the writer node), but might be good enough

deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ all_tests() -> [
8787
exchanges_test,
8888
queues_test,
8989
quorum_queues_test,
90+
stream_queues_have_consumers_field,
9091
queues_well_formed_json_test,
9192
bindings_test,
9293
bindings_post_test,
@@ -1099,6 +1100,26 @@ quorum_queues_test_loop(Config, N) ->
10991100
close_connection(Conn),
11001101
quorum_queues_test_loop(Config, N-1).
11011102

1103+
stream_queues_have_consumers_field(Config) ->
1104+
Good = [{durable, true}, {arguments, [{'x-queue-type', 'stream'}]}],
1105+
http_get(Config, "/queues/%2f/sq", ?NOT_FOUND),
1106+
http_put(Config, "/queues/%2f/sq", Good, {group, '2xx'}),
1107+
1108+
wait_until(fun() ->
1109+
Qs = http_get(Config, "/queues/%2F"),
1110+
length(Qs) == 1 andalso maps:is_key(consumers, lists:nth(1, Qs))
1111+
end, 50),
1112+
1113+
Queues = http_get(Config, "/queues/%2F"),
1114+
assert_list([#{name => <<"sq">>,
1115+
arguments => #{'x-queue-type' => <<"stream">>},
1116+
consumers => 0}],
1117+
Queues),
1118+
1119+
1120+
http_delete(Config, "/queues/%2f/sq", {group, '2xx'}),
1121+
ok.
1122+
11021123
queues_well_formed_json_test(Config) ->
11031124
%% TODO This test should be extended to the whole API
11041125
Good = [{durable, true}],

0 commit comments

Comments
 (0)