Skip to content

Commit 7b50e1e

Browse files
acogoluegnesmergify[bot]
authored andcommitted
Add consumer count to stream queue metrics
This commit adds the "consumers" metrics to stream queues (consumer count). It is computed by counting the element of the consumer_created ETS table for the given stream queue and for each member of the Osiris cluster. Fixes #4622 (cherry picked from commit f3dfb50)
1 parent 9acff35 commit 7b50e1e

File tree

3 files changed

+37
-3
lines changed

3 files changed

+37
-3
lines changed

deps/rabbit/src/rabbit_osiris_metrics.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
online,
2727
members,
2828
memory,
29-
readers
29+
readers,
30+
consumers
3031
]).
3132

3233
-record(state, {timeout :: non_neg_integer()}).
@@ -79,7 +80,6 @@ handle_info(tick, #state{timeout = Timeout} = State) ->
7980
[]
8081
end,
8182

82-
8383
rabbit_core_metrics:queue_stats(QName, Infos),
8484
rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
8585
{messages, COffs},

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@
4949

5050
-define(INFO_KEYS, [name, durable, auto_delete, arguments, leader, members, online, state,
5151
messages, messages_ready, messages_unacknowledged, committed_offset,
52-
policy, operator_policy, effective_policy_definition, type, memory]).
52+
policy, operator_policy, effective_policy_definition, type, memory,
53+
consumers]).
5354

5455
-type appender_seq() :: non_neg_integer().
5556

@@ -486,6 +487,18 @@ i(leader, Q) when ?is_amqqueue(Q) ->
486487
i(members, Q) when ?is_amqqueue(Q) ->
487488
#{nodes := Nodes} = amqqueue:get_type_state(Q),
488489
Nodes;
490+
i(consumers, Q) when ?is_amqqueue(Q) ->
491+
QName = amqqueue:get_name(Q),
492+
#{nodes := Nodes} = amqqueue:get_type_state(Q),
493+
Spec = [{{{'$1', '_', '_'}, '_', '_', '_', '_', '_', '_'}, [{'==', {QName}, '$1'}], [true]}],
494+
lists:foldl(fun(N, Acc) ->
495+
case rpc:call(N, ets, select_count,[consumer_created, Spec], 10000) of
496+
Count when is_integer(Count) ->
497+
Acc + Count;
498+
_ ->
499+
Acc
500+
end
501+
end, 0, Nodes);
489502
i(memory, Q) when ?is_amqqueue(Q) ->
490503
%% Return writer memory. It's not the full memory usage (we also have replica readers on
491504
%% the writer node), but might be good enough

deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ all_tests() -> [
8787
exchanges_test,
8888
queues_test,
8989
quorum_queues_test,
90+
stream_queues_have_consumers_field,
9091
queues_well_formed_json_test,
9192
bindings_test,
9293
bindings_post_test,
@@ -1099,6 +1100,26 @@ quorum_queues_test_loop(Config, N) ->
10991100
close_connection(Conn),
11001101
quorum_queues_test_loop(Config, N-1).
11011102

1103+
stream_queues_have_consumers_field(Config) ->
1104+
Good = [{durable, true}, {arguments, [{'x-queue-type', 'stream'}]}],
1105+
http_get(Config, "/queues/%2f/sq", ?NOT_FOUND),
1106+
http_put(Config, "/queues/%2f/sq", Good, {group, '2xx'}),
1107+
1108+
wait_until(fun() ->
1109+
Qs = http_get(Config, "/queues/%2F"),
1110+
length(Qs) == 1 andalso maps:is_key(consumers, lists:nth(1, Qs))
1111+
end, 50),
1112+
1113+
Queues = http_get(Config, "/queues/%2F"),
1114+
assert_list([#{name => <<"sq">>,
1115+
arguments => #{'x-queue-type' => <<"stream">>},
1116+
consumers => 0}],
1117+
Queues),
1118+
1119+
1120+
http_delete(Config, "/queues/%2f/sq", {group, '2xx'}),
1121+
ok.
1122+
11021123
queues_well_formed_json_test(Config) ->
11031124
%% TODO This test should be extended to the whole API
11041125
Good = [{durable, true}],

0 commit comments

Comments
 (0)