Skip to content

Commit c954d94

Browse files
Merge pull request #8374 from rabbitmq/mergify/bp/v3.12.x/pr-8358
Observer CLI plugins for QQs (backport #8358)
2 parents 7527e82 + 6749f0c commit c954d94

File tree

5 files changed

+179
-2
lines changed

5 files changed

+179
-2
lines changed

deps/rabbit/app.bzl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ def all_beam_files(name = "all_beam_files"):
162162
"src/rabbit_nodes.erl",
163163
"src/rabbit_observer_cli.erl",
164164
"src/rabbit_observer_cli_classic_queues.erl",
165+
"src/rabbit_observer_cli_quorum_queues.erl",
165166
"src/rabbit_osiris_metrics.erl",
166167
"src/rabbit_parameter_validation.erl",
167168
"src/rabbit_peer_discovery.erl",
@@ -403,6 +404,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
403404
"src/rabbit_nodes.erl",
404405
"src/rabbit_observer_cli.erl",
405406
"src/rabbit_observer_cli_classic_queues.erl",
407+
"src/rabbit_observer_cli_quorum_queues.erl",
406408
"src/rabbit_osiris_metrics.erl",
407409
"src/rabbit_parameter_validation.erl",
408410
"src/rabbit_peer_discovery.erl",
@@ -658,6 +660,7 @@ def all_srcs(name = "all_srcs"):
658660
"src/rabbit_nodes.erl",
659661
"src/rabbit_observer_cli.erl",
660662
"src/rabbit_observer_cli_classic_queues.erl",
663+
"src/rabbit_observer_cli_quorum_queues.erl",
661664
"src/rabbit_osiris_metrics.erl",
662665
"src/rabbit_parameter_validation.erl",
663666
"src/rabbit_peer_discovery.erl",

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1516,7 +1516,7 @@ stat(Q) ->
15161516
rabbit_queue_type:stat(Q).
15171517

15181518
-spec pid_of(amqqueue:amqqueue()) ->
1519-
pid() | 'none'.
1519+
pid() | amqqueue:ra_server_id() | 'none'.
15201520

15211521
pid_of(Q) -> amqqueue:get_pid(Q).
15221522

deps/rabbit/src/rabbit_observer_cli.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@
1111

1212
init() ->
1313
application:set_env(observer_cli, plugins, [
14-
rabbit_observer_cli_classic_queues:plugin_info()
14+
rabbit_observer_cli_classic_queues:plugin_info(),
15+
rabbit_observer_cli_quorum_queues:plugin_info()
1516
]).
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(rabbit_observer_cli_quorum_queues).
9+
10+
-export([plugin_info/0]).
11+
-export([attributes/1, sheet_header/0, sheet_body/1]).
12+
13+
-include_lib("rabbit_common/include/rabbit.hrl").
14+
15+
plugin_info() ->
16+
#{
17+
module => rabbit_observer_cli_quorum_queues,
18+
title => "Quorum",
19+
shortcut => "QQ",
20+
sort_column => 4
21+
}.
22+
23+
attributes(State) ->
24+
Content1 = "S - leader/follower, MsgQ - Erlang mailbox queue, CMD - commands, SIW - snapshots installed/written",
25+
Content2 = "SS - snapshots sent, SW - snapshots written, MS - messages sent",
26+
Content3 = "E - elections, WOp - write operations, WRe - write resends, GC - Ra forced GC",
27+
Content4 = "CT - current term, SnapIdx - snapshot index, LA - last applied, CI - current index, LW - last written log entry index, CL - commit latency",
28+
RaCounters = ra_counters:overview(),
29+
{
30+
[ra_log_wal_header()] ++ [ra_log_wal_sheet(RaCounters)] ++
31+
[ra_log_segment_writer_header()] ++ [ra_log_segment_writer_sheet(RaCounters)] ++
32+
[
33+
[#{content => "", width => 136}],
34+
[#{content => Content1, width => 136}],
35+
[#{content => Content2, width => 136}],
36+
[#{content => Content3, width => 136}],
37+
[#{content => Content4, width => 136}],
38+
[#{content => "", width => 136}]
39+
]
40+
, State}.
41+
42+
ra_log_wal_header() ->
43+
[
44+
#{content => " ra_log_wal", width => 25, color => <<"\e[7m">>},
45+
#{content => " MsgQ ", width => 8, color => <<"\e[7m">>},
46+
#{content => " WAL files", width => 11, color => <<"\e[7m">>},
47+
#{content => " Bytes written", width => 17, color => <<"\e[7m">>},
48+
#{content => " Writes", width => 16, color => <<"\e[7m">>},
49+
#{content => " Batches", width => 15, color => <<"\e[7m">>},
50+
#{content => " Bytes/Batch", width => 15, color => <<"\e[7m">>},
51+
#{content => " Writes/Batch", width => 22, color => <<"\e[7m">>}
52+
].
53+
54+
ra_log_wal_sheet(RaCounters) ->
55+
RaLogWalCounters = maps:get(ra_log_wal, RaCounters),
56+
RaLogWalInfo = erlang:process_info(whereis(ra_log_wal), [message_queue_len]),
57+
[
58+
#{content => "", width => 24},
59+
#{content => proplists:get_value(message_queue_len, RaLogWalInfo), width => 6},
60+
#{content => maps:get(wal_files, RaLogWalCounters), width => 9},
61+
#{content => {byte, maps:get(bytes_written, RaLogWalCounters)}, width => 15},
62+
#{content => maps:get(writes, RaLogWalCounters), width => 14},
63+
#{content => maps:get(batches, RaLogWalCounters), width => 13},
64+
#{content => {byte, maps:get(bytes_written, RaLogWalCounters) / maps:get(batches, RaLogWalCounters)}, width => 13},
65+
#{content => maps:get(writes, RaLogWalCounters) / maps:get(batches, RaLogWalCounters), width => 21}
66+
].
67+
68+
ra_log_segment_writer_header() ->
69+
[
70+
#{content => " ra_log_segment_writer", width => 25, color => <<"\e[7m">>},
71+
#{content => " Bytes written", width => 17, color => <<"\e[7m">>},
72+
#{content => " Mem Tables", width => 16, color => <<"\e[7m">>},
73+
#{content => " Entries", width => 15, color => <<"\e[7m">>},
74+
#{content => " Segments", width => 59, color => <<"\e[7m">>}
75+
].
76+
77+
ra_log_segment_writer_sheet(RaCounters) ->
78+
RaLogWalInfo = maps:get(ra_log_segment_writer, RaCounters),
79+
[
80+
#{content => "", width => 24},
81+
#{content => {byte, maps:get(bytes_written, RaLogWalInfo)}, width => 15},
82+
#{content => maps:get(mem_tables, RaLogWalInfo), width => 14},
83+
#{content => maps:get(entries, RaLogWalInfo), width => 13},
84+
#{content => maps:get(segments, RaLogWalInfo), width => 58}
85+
].
86+
sheet_header() ->
87+
[
88+
#{title => "Pid", width => 10, shortcut => ""},
89+
#{title => "Name", width => 8, shortcut => ""},
90+
#{title => "S", width => 3, shortcut => ""},
91+
#{title => "Memory", width => 10, shortcut => "M"},
92+
#{title => "", width => 6, shortcut => "MsgQ"},
93+
#{title => "", width => 7, shortcut => "CMD"},
94+
#{title => "", width => 6, shortcut => "SI/W"},
95+
#{title => "", width => 5, shortcut => "SS"},
96+
#{title => "", width => 7, shortcut => "MS"},
97+
#{title => "", width => 5, shortcut => "E"},
98+
#{title => "", width => 7, shortcut => "WOps"},
99+
#{title => "", width => 5, shortcut => "WRe"},
100+
#{title => "", width => 4, shortcut => "CT"},
101+
#{title => "SnapIdx", width => 8, shortcut => ""},
102+
#{title => "", width => 7, shortcut => "LA"},
103+
#{title => "", width => 6, shortcut => "CI"},
104+
#{title => "", width => 6, shortcut => "LW"},
105+
#{title => "", width => 5, shortcut => "CL"}
106+
].
107+
108+
sheet_body(PrevState) ->
109+
RaStates = ets:tab2list(ra_state),
110+
Body = [begin
111+
#resource{name = Name, virtual_host = Vhost} = R = amqqueue:get_name(Q),
112+
case rabbit_amqqueue:pid_of(Q) of
113+
none ->
114+
empty_row(Name);
115+
{QName, _QNode} = _QQ ->
116+
case whereis(QName) of
117+
undefined ->
118+
empty_row(Name);
119+
Pid ->
120+
ProcInfo = erlang:process_info(Pid, [message_queue_len, memory]),
121+
case ProcInfo of
122+
undefined ->
123+
empty_row(Name);
124+
_ ->
125+
QQCounters = maps:get({QName, node()}, ra_counters:overview()),
126+
{ok, InternalName} = rabbit_queue_type_util:qname_to_internal_name(#resource{virtual_host = Vhost, name= Name}),
127+
[{_, CT, SnapIdx, LA, CI, LW, CL}] = ets:lookup(ra_metrics, R),
128+
[
129+
Pid,
130+
QName,
131+
case proplists:get_value(InternalName, RaStates) of
132+
leader -> "L";
133+
follower -> "F";
134+
_ -> "?"
135+
end,
136+
format_int(proplists:get_value(memory, ProcInfo)),
137+
format_int(proplists:get_value(message_queue_len, ProcInfo)),
138+
format_int(maps:get(commands, QQCounters)),
139+
case proplists:get_value(InternalName, RaStates) of
140+
leader -> format_int(maps:get(snapshots_written, QQCounters));
141+
follower -> format_int(maps:get(snapshot_installed, QQCounters));
142+
_ -> "?"
143+
end,
144+
format_int(maps:get(snapshots_sent, QQCounters)),
145+
format_int(maps:get(msgs_sent, QQCounters)),
146+
format_int(maps:get(elections, QQCounters)),
147+
format_int(maps:get(write_ops, QQCounters)),
148+
format_int(maps:get(write_resends, QQCounters)),
149+
CT, SnapIdx, LA, CI, LW, CL
150+
]
151+
end
152+
end
153+
end
154+
end || Q <- list_quorum_queues()],
155+
{Body, PrevState}.
156+
157+
list_quorum_queues() ->
158+
rabbit_db_queue:get_all_by_type(rabbit_quorum_queue).
159+
160+
format_int(N) when N >= 1_000_000_000 ->
161+
integer_to_list(N div 1_000_000_000) ++ "B";
162+
format_int(N) when N >= 1_000_000 ->
163+
integer_to_list(N div 1_000_000) ++ "M";
164+
%% We print up to 9999 messages and shorten 10K+.
165+
format_int(N) when N >= 10_000 ->
166+
integer_to_list(N div 1_000) ++ "K";
167+
format_int(N) ->
168+
N.
169+
170+
empty_row(Name) ->
171+
["-", unicode:characters_to_binary([Name, " (dead)"])
172+
| [0 || _ <- lists:seq(1, length(sheet_header()) - 2)] ].

moduleindex.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,7 @@ rabbit:
599599
- rabbit_nodes
600600
- rabbit_observer_cli
601601
- rabbit_observer_cli_classic_queues
602+
- rabbit_observer_cli_quorum_queues
602603
- rabbit_osiris_metrics
603604
- rabbit_parameter_validation
604605
- rabbit_peer_discovery

0 commit comments

Comments
 (0)