Skip to content

Commit 59b5ccc

Browse files
authored
Merge pull request #4732 from rabbitmq/mergify/bp/v3.9.x/pr-4731
Add consumer count to stream queue metrics (backport #4727) (backport #4731)
2 parents e567fba + 49457ff commit 59b5ccc

File tree

3 files changed

+48
-4
lines changed

3 files changed

+48
-4
lines changed

deps/rabbit/src/rabbit_osiris_metrics.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
online,
2727
members,
2828
memory,
29-
readers
29+
readers,
30+
consumers
3031
]).
3132

3233
-record(state, {timeout :: non_neg_integer()}).
@@ -79,7 +80,6 @@ handle_info(tick, #state{timeout = Timeout} = State) ->
7980
[]
8081
end,
8182

82-
8383
rabbit_core_metrics:queue_stats(QName, Infos),
8484
rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
8585
{messages, COffs},

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@
4949

5050
-define(INFO_KEYS, [name, durable, auto_delete, arguments, leader, members, online, state,
5151
messages, messages_ready, messages_unacknowledged, committed_offset,
52-
policy, operator_policy, effective_policy_definition, type, memory]).
52+
policy, operator_policy, effective_policy_definition, type, memory,
53+
consumers]).
5354

5455
-type appender_seq() :: non_neg_integer().
5556

@@ -481,6 +482,22 @@ i(leader, Q) when ?is_amqqueue(Q) ->
481482
i(members, Q) when ?is_amqqueue(Q) ->
482483
#{nodes := Nodes} = amqqueue:get_type_state(Q),
483484
Nodes;
485+
i(consumers, Q) when ?is_amqqueue(Q) ->
486+
QName = amqqueue:get_name(Q),
487+
#{nodes := Nodes} = amqqueue:get_type_state(Q),
488+
Spec = [{{{'$1', '_', '_'}, '_', '_', '_', '_', '_', '_'}, [{'==', {QName}, '$1'}], [true]}],
489+
lists:foldl(fun(N, Acc) ->
490+
case rabbit_misc:rpc_call(N,
491+
ets,
492+
select_count,
493+
[consumer_created, Spec],
494+
10000) of
495+
Count when is_integer(Count) ->
496+
Acc + Count;
497+
_ ->
498+
Acc
499+
end
500+
end, 0, Nodes);
484501
i(memory, Q) when ?is_amqqueue(Q) ->
485502
%% Return writer memory. It's not the full memory usage (we also have replica readers on
486503
%% the writer node), but might be good enough

deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ all_tests() -> [
8787
exchanges_test,
8888
queues_test,
8989
quorum_queues_test,
90+
stream_queues_have_consumers_field,
9091
queues_well_formed_json_test,
9192
bindings_test,
9293
bindings_post_test,
@@ -239,7 +240,13 @@ init_per_testcase(Testcase = permissions_vhost_test, Config) ->
239240
rabbit_ct_broker_helpers:delete_vhost(Config, <<"myvhost1">>),
240241
rabbit_ct_broker_helpers:delete_vhost(Config, <<"myvhost2">>),
241242
rabbit_ct_helpers:testcase_started(Config, Testcase);
242-
243+
init_per_testcase(Testcase = stream_queues_have_consumers_field, Config) ->
244+
case rabbit_ct_helpers:is_mixed_versions() of
245+
true ->
246+
{skip, "mixed version clusters are not supported"};
247+
_ ->
248+
rabbit_ct_helpers:testcase_started(Config, Testcase)
249+
end;
243250
init_per_testcase(Testcase, Config) ->
244251
rabbit_ct_broker_helpers:close_all_connections(Config, 0, <<"rabbit_mgmt_SUITE:init_per_testcase">>),
245252
rabbit_ct_helpers:testcase_started(Config, Testcase).
@@ -1094,6 +1101,26 @@ quorum_queues_test_loop(Config, N) ->
10941101
close_connection(Conn),
10951102
quorum_queues_test_loop(Config, N-1).
10961103

1104+
stream_queues_have_consumers_field(Config) ->
1105+
Good = [{durable, true}, {arguments, [{'x-queue-type', 'stream'}]}],
1106+
http_get(Config, "/queues/%2f/sq", ?NOT_FOUND),
1107+
http_put(Config, "/queues/%2f/sq", Good, {group, '2xx'}),
1108+
1109+
wait_until(fun() ->
1110+
Qs = http_get(Config, "/queues/%2F"),
1111+
length(Qs) == 1 andalso maps:is_key(consumers, lists:nth(1, Qs))
1112+
end, 50),
1113+
1114+
Queues = http_get(Config, "/queues/%2F"),
1115+
assert_list([#{name => <<"sq">>,
1116+
arguments => #{'x-queue-type' => <<"stream">>},
1117+
consumers => 0}],
1118+
Queues),
1119+
1120+
1121+
http_delete(Config, "/queues/%2f/sq", {group, '2xx'}),
1122+
ok.
1123+
10971124
queues_well_formed_json_test(Config) ->
10981125
%% TODO This test should be extended to the whole API
10991126
Good = [{durable, true}],

0 commit comments

Comments
 (0)