Skip to content

Commit 3676541

Browse files
dcorbachokjnilsson
authored andcommitted
Report message bytes in quorum queue stats
[#161505138]
1 parent 685f8ea commit 3676541

File tree

3 files changed

+165
-30
lines changed

3 files changed

+165
-30
lines changed

src/rabbit_fifo.erl

Lines changed: 81 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
-compile(inline).
2323

2424
-include_lib("ra/include/ra.hrl").
25+
-include_lib("rabbit_common/include/rabbit.hrl").
2526

2627
-export([
2728
init/1,
@@ -42,6 +43,8 @@
4243
query_consumers/1,
4344
usage/1,
4445

46+
zero/1,
47+
4548
%% misc
4649
dehydrate_state/1,
4750

@@ -226,7 +229,9 @@
226229
%% This is done so that consumers are still served in a deterministic
227230
%% order on recovery.
228231
prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
229-
PrefixMsgs :: non_neg_integer()}
232+
PrefixMsgs :: non_neg_integer()},
233+
msg_bytes_enqueue = 0 :: non_neg_integer(),
234+
msg_bytes_checkout = 0 :: non_neg_integer()
230235
}).
231236

232237
-opaque state() :: #state{}.
@@ -266,6 +271,9 @@ update_state(Conf, State) ->
266271
become_leader_handler = BLH,
267272
shadow_copy_interval = SHI}.
268273

274+
zero(_) ->
275+
0.
276+
269277
% msg_ids are scoped per consumer
270278
% ra_indexes holds all raft indexes for enqueues currently on queue
271279
-spec apply(ra_machine:command_meta_data(), command(),
@@ -275,7 +283,8 @@ apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq,
275283
msg = RawMsg}, Effects0, State00) ->
276284
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of
277285
{ok, State0, Effects1} ->
278-
{State, Effects, ok} = checkout(State0, Effects1),
286+
{State, Effects, ok} = checkout(add_bytes_enqueue(RawMsg, State0),
287+
Effects1),
279288
{append_to_master_index(RaftIdx, State), Effects, ok};
280289
{duplicate, State, Effects} ->
281290
{State, Effects, ok}
@@ -543,26 +552,35 @@ tick(_Ts, #state{name = Name,
543552
queue_resource = QName,
544553
messages = Messages,
545554
ra_indexes = Indexes,
546-
consumers = Cons} = State) ->
555+
consumers = Cons,
556+
msg_bytes_enqueue = EnqueueBytes,
557+
msg_bytes_checkout = CheckoutBytes} = State) ->
547558
Metrics = {Name,
548559
maps:size(Messages), % Ready
549560
num_checked_out(State), % checked out
550561
rabbit_fifo_index:size(Indexes), %% Total
551-
maps:size(Cons)}, % Consumers
562+
maps:size(Cons), % Consumers
563+
EnqueueBytes,
564+
CheckoutBytes},
552565
[{mod_call, rabbit_quorum_queue,
553566
update_metrics, [QName, Metrics]}, {aux, emit}].
554567

555568
-spec overview(state()) -> map().
556569
overview(#state{consumers = Cons,
557570
enqueuers = Enqs,
558571
messages = Messages,
559-
ra_indexes = Indexes} = State) ->
572+
ra_indexes = Indexes,
573+
msg_bytes_enqueue = EnqueueBytes,
574+
msg_bytes_checkout = CheckoutBytes
575+
} = State) ->
560576
#{type => ?MODULE,
561577
num_consumers => maps:size(Cons),
562578
num_checked_out => num_checked_out(State),
563579
num_enqueuers => maps:size(Enqs),
564580
num_ready_messages => maps:size(Messages),
565-
num_messages => rabbit_fifo_index:size(Indexes)}.
581+
num_messages => rabbit_fifo_index:size(Indexes),
582+
enqueue_message_bytes => EnqueueBytes,
583+
checkout_message_bytes => CheckoutBytes}.
566584

567585
-spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) ->
568586
[delivery_msg()].
@@ -806,12 +824,17 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
806824
Checked = maps:without(MsgIds, Checked0),
807825
Discarded = maps:with(MsgIds, Checked0),
808826
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
827+
State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) ->
828+
add_bytes_settle(RawMsg, Acc);
829+
(_, Acc) ->
830+
Acc
831+
end, State0, maps:values(Discarded)),
809832
%% need to pass the length of discarded as $prefix_msgs would be filtered
810833
%% by the above list comprehension
811-
{State1, Effects1, _} = complete(ConsumerId, MsgRaftIdxs,
834+
{State2, Effects1, _} = complete(ConsumerId, MsgRaftIdxs,
812835
maps:size(Discarded),
813-
Con0, Checked, Effects0, State0),
814-
{State, Effects, _} = checkout(State1, Effects1),
836+
Con0, Checked, Effects0, State1),
837+
{State, Effects, _} = checkout(State2, Effects1),
815838
% settle metrics are incremented separately
816839
update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects).
817840

@@ -873,8 +896,9 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
873896
1, Header0),
874897
Msg = {RaftId, {Header, RawMsg}},
875898
% this should not affect the release cursor in any way
876-
State0#state{messages = maps:put(MsgNum, Msg, Messages),
877-
returns = lqueue:in(MsgNum, Returns)}.
899+
add_bytes_return(RawMsg,
900+
State0#state{messages = maps:put(MsgNum, Msg, Messages),
901+
returns = lqueue:in(MsgNum, Returns)}).
878902

879903
return_all(State, Checked) ->
880904
maps:fold(fun (_, '$prefix_msg', S) ->
@@ -993,13 +1017,16 @@ checkout_one(#state{service_queue = SQ0,
9931017
{Cons, SQ, []} = % we expect no effects
9941018
update_or_remove_sub(ConsumerId, Con,
9951019
Cons0, SQ1, []),
996-
State = State0#state{service_queue = SQ,
997-
messages = Messages,
998-
consumers = Cons},
999-
Msg = case ConsumerMsg of
1000-
'$prefix_msg' -> '$prefix_msg';
1001-
{_, {_, M}} -> M
1002-
end,
1020+
State1 = State0#state{service_queue = SQ,
1021+
messages = Messages,
1022+
consumers = Cons},
1023+
{State, Msg} =
1024+
case ConsumerMsg of
1025+
'$prefix_msg' ->
1026+
{State1, '$prefix_msg'};
1027+
{_, {_, {_, RawMsg} = M}} ->
1028+
{add_bytes_checkout(RawMsg, State1), M}
1029+
end,
10031030
{success, ConsumerId, Next, Msg, State};
10041031
error ->
10051032
%% consumer did not exist but was queued, recurse
@@ -1147,6 +1174,35 @@ make_purge() -> #purge{}.
11471174
make_update_state(Config) ->
11481175
#update_state{config = Config}.
11491176

1177+
add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
1178+
Bytes = message_size(Msg),
1179+
State#state{msg_bytes_enqueue = Enqueue + Bytes}.
1180+
1181+
add_bytes_checkout(Msg, #state{msg_bytes_checkout = Checkout,
1182+
msg_bytes_enqueue = Enqueue } = State) ->
1183+
Bytes = message_size(Msg),
1184+
State#state{msg_bytes_checkout = Checkout + Bytes,
1185+
msg_bytes_enqueue = Enqueue - Bytes}.
1186+
1187+
add_bytes_settle(Msg, #state{msg_bytes_checkout = Checkout} = State) ->
1188+
Bytes = message_size(Msg),
1189+
State#state{msg_bytes_checkout = Checkout - Bytes}.
1190+
1191+
add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout,
1192+
msg_bytes_enqueue = Enqueue} = State) ->
1193+
Bytes = message_size(Msg),
1194+
State#state{msg_bytes_checkout = Checkout - Bytes,
1195+
msg_bytes_enqueue = Enqueue + Bytes}.
1196+
1197+
message_size(#basic_message{content = Content}) ->
1198+
#content{payload_fragments_rev = PFR} = Content,
1199+
iolist_size(PFR);
1200+
message_size(B) when is_binary(B) ->
1201+
byte_size(B);
1202+
message_size(Msg) ->
1203+
%% probably only hit this for testing so ok to use erts_debug
1204+
erts_debug:size(Msg).
1205+
11501206
-ifdef(TEST).
11511207
-include_lib("eunit/include/eunit.hrl").
11521208

@@ -1170,7 +1226,8 @@ make_update_state(Config) ->
11701226

11711227
test_init(Name) ->
11721228
init(#{name => Name,
1173-
queue_resource => queue_resource,
1229+
queue_resource => rabbit_misc:r("/", queue,
1230+
atom_to_binary(Name, utf8)),
11741231
shadow_copy_interval => 0}).
11751232

11761233
enq_enq_checkout_test() ->
@@ -1529,13 +1586,15 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
15291586
tick_test() ->
15301587
Cid = {<<"c">>, self()},
15311588
Cid2 = {<<"c2">>, self()},
1532-
{S0, _} = enq(1, 1, fst, test_init(test)),
1533-
{S1, _} = enq(2, 2, snd, S0),
1589+
{S0, _} = enq(1, 1, <<"fst">>, test_init(?FUNCTION_NAME)),
1590+
{S1, _} = enq(2, 2, <<"snd">>, S0),
15341591
{S2, {MsgId, _}} = deq(3, Cid, unsettled, S1),
15351592
{S3, {_, _}} = deq(4, Cid2, unsettled, S2),
15361593
{S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), [], S3),
15371594

1538-
[{mod_call, _, _, [_, {test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4),
1595+
[{mod_call, _, _,
1596+
[#resource{},
1597+
{?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = tick(1, S4),
15391598
ok.
15401599

15411600
enq_deq_snapshot_recover_test() ->

src/rabbit_quorum_queue.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,7 @@ ra_machine(Q) ->
150150
ra_machine_config(Q = #amqqueue{name = QName}) ->
151151
#{dead_letter_handler => dlx_mfa(Q),
152152
queue_resource => QName,
153-
become_leader_handler => {?MODULE, become_leader, [QName]},
154-
metrics_handler => {?MODULE, update_metrics, [QName]}}.
153+
become_leader_handler => {?MODULE, become_leader, [QName]}}.
155154

156155
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
157156
Node = node(ChPid),
@@ -198,14 +197,17 @@ rpc_delete_metrics(QName) ->
198197
ets:delete(queue_metrics, QName),
199198
ok.
200199

201-
update_metrics(QName, {Name, MR, MU, M, C}) ->
200+
update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
202201
R = reductions(Name),
203202
rabbit_core_metrics:queue_stats(QName, MR, MU, M, R),
204203
Util = case C of
205204
0 -> 0;
206205
_ -> rabbit_fifo:usage(Name)
207206
end,
208-
Infos = [{consumers, C}, {consumer_utilisation, Util} | infos(QName)],
207+
Infos = [{consumers, C}, {consumer_utilisation, Util},
208+
{message_bytes_ready, MsgBytesReady},
209+
{message_bytes_unacknowledged, MsgBytesUnack},
210+
{message_bytes, MsgBytesReady + MsgBytesUnack} | infos(QName)],
209211
rabbit_core_metrics:queue_stats(QName, Infos),
210212
rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
211213
{messages, M},

test/quorum_queue_SUITE.erl

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ all_tests() ->
120120
delete_immediately_by_resource,
121121
consume_redelivery_count,
122122
subscribe_redelivery_count,
123+
message_bytes_metrics,
123124
memory_alarm_rolls_wal
124125
].
125126

@@ -2039,7 +2040,7 @@ consume_redelivery_count(Config) ->
20392040
requeue = true}),
20402041
%% wait for requeueing
20412042
timer:sleep(500),
2042-
2043+
20432044
{#'basic.get_ok'{delivery_tag = DeliveryTag1,
20442045
redelivered = true},
20452046
#amqp_msg{props = #'P_basic'{headers = H1}}} =
@@ -2058,14 +2059,71 @@ consume_redelivery_count(Config) ->
20582059
?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
20592060
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2,
20602061
multiple = false,
2061-
requeue = true}).
2062+
requeue = true}),
2063+
ok.
20622064

2063-
memory_alarm_rolls_wal(Config) ->
2065+
message_bytes_metrics(Config) ->
20642066
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
2067+
20652068
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
20662069
QQ = ?config(queue_name, Config),
20672070
?assertEqual({'queue.declare_ok', QQ, 0, 0},
20682071
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
2072+
2073+
RaName = ra_name(QQ),
2074+
{ok, _, {_, Leader}} = ra:members({RaName, Server}),
2075+
QRes = rabbit_misc:r(<<"/">>, queue, QQ),
2076+
2077+
publish(Ch, QQ),
2078+
2079+
wait_for_messages_ready(Servers, RaName, 1),
2080+
wait_for_messages_pending_ack(Servers, RaName, 0),
2081+
wait_until(fun() ->
2082+
{3, 3, 0} == get_message_bytes(Leader, QRes)
2083+
end),
2084+
2085+
subscribe(Ch, QQ, false),
2086+
2087+
wait_for_messages_ready(Servers, RaName, 0),
2088+
wait_for_messages_pending_ack(Servers, RaName, 1),
2089+
wait_until(fun() ->
2090+
{3, 0, 3} == get_message_bytes(Leader, QRes)
2091+
end),
2092+
2093+
receive
2094+
{#'basic.deliver'{delivery_tag = DeliveryTag,
2095+
redelivered = false}, _} ->
2096+
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
2097+
multiple = false,
2098+
requeue = false}),
2099+
wait_for_messages_ready(Servers, RaName, 0),
2100+
wait_for_messages_pending_ack(Servers, RaName, 0),
2101+
wait_until(fun() ->
2102+
{0, 0, 0} == get_message_bytes(Leader, QRes)
2103+
end)
2104+
end,
2105+
2106+
%% Let's publish and then close the consumer channel. Messages must be
2107+
%% returned to the queue
2108+
publish(Ch, QQ),
2109+
2110+
wait_for_messages_ready(Servers, RaName, 0),
2111+
wait_for_messages_pending_ack(Servers, RaName, 1),
2112+
wait_until(fun() ->
2113+
{3, 0, 3} == get_message_bytes(Leader, QRes)
2114+
end),
2115+
2116+
rabbit_ct_client_helpers:close_channel(Ch),
2117+
2118+
wait_for_messages_ready(Servers, RaName, 1),
2119+
wait_for_messages_pending_ack(Servers, RaName, 0),
2120+
wait_until(fun() ->
2121+
{3, 3, 0} == get_message_bytes(Leader, QRes)
2122+
end),
2123+
ok.
2124+
2125+
memory_alarm_rolls_wal(Config) ->
2126+
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
20692127
WalDataDir = rpc:call(Server, ra_env, wal_data_dir, []),
20702128
[Wal0] = filelib:wildcard(WalDataDir ++ "/*.wal"),
20712129
ok = rpc:call(Server, rabbit_alarm, set_alarm,
@@ -2079,7 +2137,11 @@ memory_alarm_rolls_wal(Config) ->
20792137
[{{resource_limit, memory, Server}, []}]),
20802138
timer:sleep(1000),
20812139
[Wal2] = filelib:wildcard(WalDataDir ++ "/*.wal"),
2082-
?assert(Wal1 == Wal2).
2140+
?assert(Wal1 == Wal2),
2141+
ok = rpc:call(Server, rabbit_alarm, clear_alarm,
2142+
[{{resource_limit, memory, Server}, []}]),
2143+
timer:sleep(1000),
2144+
ok.
20832145

20842146
%%----------------------------------------------------------------------------
20852147

@@ -2219,6 +2281,7 @@ dirty_query(Servers, QName, Fun) ->
22192281
fun(N) ->
22202282
case rpc:call(N, ra, local_query, [{QName, N}, Fun]) of
22212283
{ok, {_, Msgs}, _} ->
2284+
ct:pal("Msgs ~w", [Msgs]),
22222285
Msgs;
22232286
_ ->
22242287
undefined
@@ -2258,3 +2321,14 @@ delete_queues() ->
22582321

22592322
stop_node(Config, Server) ->
22602323
rabbit_ct_broker_helpers:rabbitmqctl(Config, Server, ["stop"]).
2324+
2325+
get_message_bytes(Leader, QRes) ->
2326+
case rpc:call(Leader, ets, lookup, [queue_metrics, QRes]) of
2327+
[{QRes, Props, _}] ->
2328+
{proplists:get_value(message_bytes, Props),
2329+
proplists:get_value(message_bytes_ready, Props),
2330+
proplists:get_value(message_bytes_unacknowledged, Props)};
2331+
_ ->
2332+
[]
2333+
end.
2334+

0 commit comments

Comments
 (0)