Skip to content

Commit e0e2397

Browse files
Merge pull request #4727 from rabbitmq/rabbitmq-server-4622-stream-queue-consumer-count
Add consumer count to stream queue metrics
2 parents f6531ff + febefbd commit e0e2397

File tree

3 files changed

+48
-4
lines changed

3 files changed

+48
-4
lines changed

deps/rabbit/src/rabbit_osiris_metrics.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
online,
2727
members,
2828
memory,
29-
readers
29+
readers,
30+
consumers
3031
]).
3132

3233
-record(state, {timeout :: non_neg_integer()}).
@@ -79,7 +80,6 @@ handle_info(tick, #state{timeout = Timeout} = State) ->
7980
[]
8081
end,
8182

82-
8383
rabbit_core_metrics:queue_stats(QName, Infos),
8484
rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
8585
{messages, COffs},

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@
4949

5050
-define(INFO_KEYS, [name, durable, auto_delete, arguments, leader, members, online, state,
5151
messages, messages_ready, messages_unacknowledged, committed_offset,
52-
policy, operator_policy, effective_policy_definition, type, memory]).
52+
policy, operator_policy, effective_policy_definition, type, memory,
53+
consumers]).
5354

5455
-type appender_seq() :: non_neg_integer().
5556

@@ -486,6 +487,22 @@ i(leader, Q) when ?is_amqqueue(Q) ->
486487
i(members, Q) when ?is_amqqueue(Q) ->
487488
#{nodes := Nodes} = amqqueue:get_type_state(Q),
488489
Nodes;
490+
i(consumers, Q) when ?is_amqqueue(Q) ->
491+
QName = amqqueue:get_name(Q),
492+
#{nodes := Nodes} = amqqueue:get_type_state(Q),
493+
Spec = [{{{'$1', '_', '_'}, '_', '_', '_', '_', '_', '_'}, [{'==', {QName}, '$1'}], [true]}],
494+
lists:foldl(fun(N, Acc) ->
495+
case rabbit_misc:rpc_call(N,
496+
ets,
497+
select_count,
498+
[consumer_created, Spec],
499+
10000) of
500+
Count when is_integer(Count) ->
501+
Acc + Count;
502+
_ ->
503+
Acc
504+
end
505+
end, 0, Nodes);
489506
i(memory, Q) when ?is_amqqueue(Q) ->
490507
%% Return writer memory. It's not the full memory usage (we also have replica readers on
491508
%% the writer node), but might be good enough

deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ all_tests() -> [
8787
exchanges_test,
8888
queues_test,
8989
quorum_queues_test,
90+
stream_queues_have_consumers_field,
9091
queues_well_formed_json_test,
9192
bindings_test,
9293
bindings_post_test,
@@ -239,7 +240,13 @@ init_per_testcase(Testcase = permissions_vhost_test, Config) ->
239240
rabbit_ct_broker_helpers:delete_vhost(Config, <<"myvhost1">>),
240241
rabbit_ct_broker_helpers:delete_vhost(Config, <<"myvhost2">>),
241242
rabbit_ct_helpers:testcase_started(Config, Testcase);
242-
243+
init_per_testcase(Testcase = stream_queues_have_consumers_field, Config) ->
244+
case rabbit_ct_helpers:is_mixed_versions() of
245+
true ->
246+
{skip, "mixed version clusters are not supported"};
247+
_ ->
248+
rabbit_ct_helpers:testcase_started(Config, Testcase)
249+
end;
243250
init_per_testcase(Testcase, Config) ->
244251
rabbit_ct_broker_helpers:close_all_connections(Config, 0, <<"rabbit_mgmt_SUITE:init_per_testcase">>),
245252
rabbit_ct_helpers:testcase_started(Config, Testcase).
@@ -1099,6 +1106,26 @@ quorum_queues_test_loop(Config, N) ->
10991106
close_connection(Conn),
11001107
quorum_queues_test_loop(Config, N-1).
11011108

1109+
stream_queues_have_consumers_field(Config) ->
1110+
Good = [{durable, true}, {arguments, [{'x-queue-type', 'stream'}]}],
1111+
http_get(Config, "/queues/%2f/sq", ?NOT_FOUND),
1112+
http_put(Config, "/queues/%2f/sq", Good, {group, '2xx'}),
1113+
1114+
wait_until(fun() ->
1115+
Qs = http_get(Config, "/queues/%2F"),
1116+
length(Qs) == 1 andalso maps:is_key(consumers, lists:nth(1, Qs))
1117+
end, 50),
1118+
1119+
Queues = http_get(Config, "/queues/%2F"),
1120+
assert_list([#{name => <<"sq">>,
1121+
arguments => #{'x-queue-type' => <<"stream">>},
1122+
consumers => 0}],
1123+
Queues),
1124+
1125+
1126+
http_delete(Config, "/queues/%2f/sq", {group, '2xx'}),
1127+
ok.
1128+
11021129
queues_well_formed_json_test(Config) ->
11031130
%% TODO This test should be extended to the whole API
11041131
Good = [{durable, true}],

0 commit comments

Comments
 (0)