Skip to content

Report message bytes in quorum queue stats #1796

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 81 additions & 22 deletions src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
-compile(inline).

-include_lib("ra/include/ra.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").

-export([
init/1,
Expand All @@ -42,6 +43,8 @@
query_consumers/1,
usage/1,

zero/1,

%% misc
dehydrate_state/1,

Expand Down Expand Up @@ -226,7 +229,9 @@
%% This is done so that consumers are still served in a deterministic
%% order on recovery.
prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
PrefixMsgs :: non_neg_integer()}
PrefixMsgs :: non_neg_integer()},
msg_bytes_enqueue = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer()
}).

-opaque state() :: #state{}.
Expand Down Expand Up @@ -266,6 +271,9 @@ update_state(Conf, State) ->
become_leader_handler = BLH,
shadow_copy_interval = SHI}.

%% Returns the constant 0 regardless of the argument supplied.
%% NOTE(review): presumably exported as a placeholder/no-op callback —
%% confirm against the callers, which are not visible here.
zero(_Ignored) -> 0.

% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
-spec apply(ra_machine:command_meta_data(), command(),
Expand All @@ -275,7 +283,8 @@ apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq,
msg = RawMsg}, Effects0, State00) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of
{ok, State0, Effects1} ->
{State, Effects, ok} = checkout(State0, Effects1),
{State, Effects, ok} = checkout(add_bytes_enqueue(RawMsg, State0),
Effects1),
{append_to_master_index(RaftIdx, State), Effects, ok};
{duplicate, State, Effects} ->
{State, Effects, ok}
Expand Down Expand Up @@ -543,26 +552,35 @@ tick(_Ts, #state{name = Name,
queue_resource = QName,
messages = Messages,
ra_indexes = Indexes,
consumers = Cons} = State) ->
consumers = Cons,
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
Metrics = {Name,
maps:size(Messages), % Ready
num_checked_out(State), % checked out
rabbit_fifo_index:size(Indexes), %% Total
maps:size(Cons)}, % Consumers
maps:size(Cons), % Consumers
EnqueueBytes,
CheckoutBytes},
[{mod_call, rabbit_quorum_queue,
update_metrics, [QName, Metrics]}, {aux, emit}].

-spec overview(state()) -> map().
overview(#state{consumers = Cons,
enqueuers = Enqs,
messages = Messages,
ra_indexes = Indexes} = State) ->
ra_indexes = Indexes,
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes
} = State) ->
#{type => ?MODULE,
num_consumers => maps:size(Cons),
num_checked_out => num_checked_out(State),
num_enqueuers => maps:size(Enqs),
num_ready_messages => maps:size(Messages),
num_messages => rabbit_fifo_index:size(Indexes)}.
num_messages => rabbit_fifo_index:size(Indexes),
enqueue_message_bytes => EnqueueBytes,
checkout_message_bytes => CheckoutBytes}.

-spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) ->
[delivery_msg()].
Expand Down Expand Up @@ -806,12 +824,17 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
Checked = maps:without(MsgIds, Checked0),
Discarded = maps:with(MsgIds, Checked0),
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) ->
add_bytes_settle(RawMsg, Acc);
(_, Acc) ->
Acc
end, State0, maps:values(Discarded)),
%% need to pass the length of discarded as $prefix_msgs would be filtered
%% by the above list comprehension
{State1, Effects1, _} = complete(ConsumerId, MsgRaftIdxs,
{State2, Effects1, _} = complete(ConsumerId, MsgRaftIdxs,
maps:size(Discarded),
Con0, Checked, Effects0, State0),
{State, Effects, _} = checkout(State1, Effects1),
Con0, Checked, Effects0, State1),
{State, Effects, _} = checkout(State2, Effects1),
% settle metrics are incremented separately
update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects).

Expand Down Expand Up @@ -873,8 +896,9 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
1, Header0),
Msg = {RaftId, {Header, RawMsg}},
% this should not affect the release cursor in any way
State0#state{messages = maps:put(MsgNum, Msg, Messages),
returns = lqueue:in(MsgNum, Returns)}.
add_bytes_return(RawMsg,
State0#state{messages = maps:put(MsgNum, Msg, Messages),
returns = lqueue:in(MsgNum, Returns)}).

return_all(State, Checked) ->
maps:fold(fun (_, '$prefix_msg', S) ->
Expand Down Expand Up @@ -993,13 +1017,16 @@ checkout_one(#state{service_queue = SQ0,
{Cons, SQ, []} = % we expect no effects
update_or_remove_sub(ConsumerId, Con,
Cons0, SQ1, []),
State = State0#state{service_queue = SQ,
messages = Messages,
consumers = Cons},
Msg = case ConsumerMsg of
'$prefix_msg' -> '$prefix_msg';
{_, {_, M}} -> M
end,
State1 = State0#state{service_queue = SQ,
messages = Messages,
consumers = Cons},
{State, Msg} =
case ConsumerMsg of
'$prefix_msg' ->
{State1, '$prefix_msg'};
{_, {_, {_, RawMsg} = M}} ->
{add_bytes_checkout(RawMsg, State1), M}
end,
{success, ConsumerId, Next, Msg, State};
error ->
%% consumer did not exist but was queued, recurse
Expand Down Expand Up @@ -1147,6 +1174,35 @@ make_purge() -> #purge{}.
%% Wraps the given configuration term in an #update_state{} record.
make_update_state(Conf) -> #update_state{config = Conf}.

%% Accounts for a newly enqueued message: its size (as computed by
%% message_size/1) is added to the msg_bytes_enqueue counter.
add_bytes_enqueue(Msg, #state{} = State) ->
    Size = message_size(Msg),
    Ready = State#state.msg_bytes_enqueue + Size,
    State#state{msg_bytes_enqueue = Ready}.

%% Accounts for a message being checked out: its size is moved from the
%% msg_bytes_enqueue counter to the msg_bytes_checkout counter.
add_bytes_checkout(Msg, #state{} = State) ->
    Size = message_size(Msg),
    CheckedOut = State#state.msg_bytes_checkout + Size,
    Ready = State#state.msg_bytes_enqueue - Size,
    State#state{msg_bytes_checkout = CheckedOut,
                msg_bytes_enqueue = Ready}.

%% Accounts for a checked-out message being settled: its size is
%% subtracted from the msg_bytes_checkout counter.
add_bytes_settle(Msg, #state{} = State) ->
    Size = message_size(Msg),
    CheckedOut = State#state.msg_bytes_checkout - Size,
    State#state{msg_bytes_checkout = CheckedOut}.

%% Accounts for a checked-out message being returned to the queue: its
%% size is moved from the msg_bytes_checkout counter back to the
%% msg_bytes_enqueue counter.
add_bytes_return(Msg, #state{} = State) ->
    Size = message_size(Msg),
    CheckedOut = State#state.msg_bytes_checkout - Size,
    Ready = State#state.msg_bytes_enqueue + Size,
    State#state{msg_bytes_checkout = CheckedOut,
                msg_bytes_enqueue = Ready}.

%% Computes the size used for the message byte counters.
%%  * a plain binary counts as its byte size;
%%  * a #basic_message{} counts as the iolist size of its content's
%%    payload_fragments_rev field;
%%  * any other term falls back to erts_debug:size/1.
message_size(Bin) when is_binary(Bin) ->
    byte_size(Bin);
message_size(#basic_message{content = Content}) ->
    %% the content must be a #content{} record; anything else is a badmatch
    #content{payload_fragments_rev = Fragments} = Content,
    iolist_size(Fragments);
message_size(Other) ->
    %% probably only hit this for testing so ok to use erts_debug
    %% NOTE(review): erts_debug:size/1 is believed to report words rather
    %% than bytes — confirm if this path ever matters outside tests.
    erts_debug:size(Other).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

Expand All @@ -1170,7 +1226,8 @@ make_update_state(Config) ->

test_init(Name) ->
init(#{name => Name,
queue_resource => queue_resource,
queue_resource => rabbit_misc:r("/", queue,
atom_to_binary(Name, utf8)),
shadow_copy_interval => 0}).

enq_enq_checkout_test() ->
Expand Down Expand Up @@ -1529,13 +1586,15 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
tick_test() ->
Cid = {<<"c">>, self()},
Cid2 = {<<"c2">>, self()},
{S0, _} = enq(1, 1, fst, test_init(test)),
{S1, _} = enq(2, 2, snd, S0),
{S0, _} = enq(1, 1, <<"fst">>, test_init(?FUNCTION_NAME)),
{S1, _} = enq(2, 2, <<"snd">>, S0),
{S2, {MsgId, _}} = deq(3, Cid, unsettled, S1),
{S3, {_, _}} = deq(4, Cid2, unsettled, S2),
{S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), [], S3),

[{mod_call, _, _, [_, {test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4),
[{mod_call, _, _,
[#resource{},
{?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = tick(1, S4),
ok.

enq_deq_snapshot_recover_test() ->
Expand Down
10 changes: 6 additions & 4 deletions src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ ra_machine(Q) ->
ra_machine_config(Q = #amqqueue{name = QName}) ->
#{dead_letter_handler => dlx_mfa(Q),
queue_resource => QName,
become_leader_handler => {?MODULE, become_leader, [QName]},
metrics_handler => {?MODULE, update_metrics, [QName]}}.
become_leader_handler => {?MODULE, become_leader, [QName]}}.

cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
Node = node(ChPid),
Expand Down Expand Up @@ -198,14 +197,17 @@ rpc_delete_metrics(QName) ->
ets:delete(queue_metrics, QName),
ok.

update_metrics(QName, {Name, MR, MU, M, C}) ->
update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
R = reductions(Name),
rabbit_core_metrics:queue_stats(QName, MR, MU, M, R),
Util = case C of
0 -> 0;
_ -> rabbit_fifo:usage(Name)
end,
Infos = [{consumers, C}, {consumer_utilisation, Util} | infos(QName)],
Infos = [{consumers, C}, {consumer_utilisation, Util},
{message_bytes_ready, MsgBytesReady},
{message_bytes_unacknowledged, MsgBytesUnack},
{message_bytes, MsgBytesReady + MsgBytesUnack} | infos(QName)],
rabbit_core_metrics:queue_stats(QName, Infos),
rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
{messages, M},
Expand Down
82 changes: 78 additions & 4 deletions test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ all_tests() ->
delete_immediately_by_resource,
consume_redelivery_count,
subscribe_redelivery_count,
message_bytes_metrics,
memory_alarm_rolls_wal
].

Expand Down Expand Up @@ -2039,7 +2040,7 @@ consume_redelivery_count(Config) ->
requeue = true}),
%% wait for requeueing
timer:sleep(500),

{#'basic.get_ok'{delivery_tag = DeliveryTag1,
redelivered = true},
#amqp_msg{props = #'P_basic'{headers = H1}}} =
Expand All @@ -2058,14 +2059,71 @@ consume_redelivery_count(Config) ->
?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2,
multiple = false,
requeue = true}).
requeue = true}),
ok.

memory_alarm_rolls_wal(Config) ->
%% Verifies that the message byte counters stored in the `queue_metrics'
%% table follow a message through its life cycle on a quorum queue:
%% ready -> unacknowledged -> discarded, and unacknowledged -> ready again
%% once the consuming channel is closed.
%%
%% Each expected tuple is {message_bytes, message_bytes_ready,
%% message_bytes_unacknowledged}, as returned by get_message_bytes/2.
%% NOTE(review): the expected value 3 assumes publish/2 sends a 3-byte
%% payload — confirm against the helper.
message_bytes_metrics(Config) ->
    [Server | _] = Servers =
        rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
    QQ = ?config(queue_name, Config),
    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

    RaName = ra_name(QQ),
    {ok, _, {_, Leader}} = ra:members({RaName, Server}),
    QRes = rabbit_misc:r(<<"/">>, queue, QQ),

    %% One message published, no consumer yet: all bytes are "ready".
    publish(Ch, QQ),

    wait_for_messages_ready(Servers, RaName, 1),
    wait_for_messages_pending_ack(Servers, RaName, 0),
    wait_until(fun() ->
                       {3, 3, 0} == get_message_bytes(Leader, QRes)
               end),

    %% After subscribing, the bytes move to "unacknowledged".
    subscribe(Ch, QQ, false),

    wait_for_messages_ready(Servers, RaName, 0),
    wait_for_messages_pending_ack(Servers, RaName, 1),
    wait_until(fun() ->
                       {3, 0, 3} == get_message_bytes(Leader, QRes)
               end),

    %% Reject the delivery without requeueing: every counter drops to zero.
    receive
        {#'basic.deliver'{delivery_tag = DeliveryTag,
                          redelivered  = false}, _} ->
            amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
                                                multiple     = false,
                                                requeue      = false}),
            wait_for_messages_ready(Servers, RaName, 0),
            wait_for_messages_pending_ack(Servers, RaName, 0),
            wait_until(fun() ->
                               {0, 0, 0} == get_message_bytes(Leader, QRes)
                       end)
    after 5000 ->
            %% Fail fast with an explicit reason instead of blocking the
            %% test case if the delivery never shows up.
            exit(basic_deliver_timeout)
    end,

    %% Let's publish and then close the consumer channel. Messages must be
    %% returned to the queue. The subscription on Ch is still active, so the
    %% new message is expected to be unacknowledged rather than ready.
    publish(Ch, QQ),

    wait_for_messages_ready(Servers, RaName, 0),
    wait_for_messages_pending_ack(Servers, RaName, 1),
    wait_until(fun() ->
                       {3, 0, 3} == get_message_bytes(Leader, QRes)
               end),

    rabbit_ct_client_helpers:close_channel(Ch),

    wait_for_messages_ready(Servers, RaName, 1),
    wait_for_messages_pending_ack(Servers, RaName, 0),
    wait_until(fun() ->
                       {3, 3, 0} == get_message_bytes(Leader, QRes)
               end),
    ok.

memory_alarm_rolls_wal(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
WalDataDir = rpc:call(Server, ra_env, wal_data_dir, []),
[Wal0] = filelib:wildcard(WalDataDir ++ "/*.wal"),
ok = rpc:call(Server, rabbit_alarm, set_alarm,
Expand All @@ -2079,7 +2137,11 @@ memory_alarm_rolls_wal(Config) ->
[{{resource_limit, memory, Server}, []}]),
timer:sleep(1000),
[Wal2] = filelib:wildcard(WalDataDir ++ "/*.wal"),
?assert(Wal1 == Wal2).
?assert(Wal1 == Wal2),
ok = rpc:call(Server, rabbit_alarm, clear_alarm,
[{{resource_limit, memory, Server}, []}]),
timer:sleep(1000),
ok.

%%----------------------------------------------------------------------------

Expand Down Expand Up @@ -2219,6 +2281,7 @@ dirty_query(Servers, QName, Fun) ->
fun(N) ->
case rpc:call(N, ra, local_query, [{QName, N}, Fun]) of
{ok, {_, Msgs}, _} ->
ct:pal("Msgs ~w", [Msgs]),
Msgs;
_ ->
undefined
Expand Down Expand Up @@ -2258,3 +2321,14 @@ delete_queues() ->

%% Issues `rabbitmqctl stop' against Server through the CT broker helpers
%% and returns whatever the helper returns.
stop_node(Config, Server) ->
    CtlArgs = ["stop"],
    rabbit_ct_broker_helpers:rabbitmqctl(Config, Server, CtlArgs).

%% Looks up the `queue_metrics' ETS row for QRes on the Leader node and
%% returns {message_bytes, message_bytes_ready, message_bytes_unacknowledged}
%% taken from its property list (a missing property yields `undefined').
%% Returns [] when the lookup yields anything other than exactly one
%% matching three-element row (no row, or a failed rpc call).
get_message_bytes(Leader, QRes) ->
    Keys = [message_bytes,
            message_bytes_ready,
            message_bytes_unacknowledged],
    case rpc:call(Leader, ets, lookup, [queue_metrics, QRes]) of
        [{QRes, Stats, _}] ->
            list_to_tuple([proplists:get_value(Key, Stats) || Key <- Keys]);
        _ ->
            []
    end.