Skip to content

Commit 3f53846

Browse files
committed
Refactor quorum queue periodic metric emission
Primarily to avoid the transient process that is spawned every Ra "tick" to query the quorum queue process for information that could be passed in when the process is spawned. Refactored to make use of the overview map instead of a custom tuple. Also use ets:lookup_element where applicable
1 parent 14c1278 commit 3f53846

File tree

3 files changed

+99
-91
lines changed

3 files changed

+99
-91
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -888,26 +888,14 @@ state_enter0(_, _, Effects) ->
888888
Effects.
889889

890890
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
891-
tick(Ts, #?MODULE{cfg = #cfg{name = Name,
892-
resource = QName},
893-
msg_bytes_enqueue = EnqueueBytes,
894-
msg_bytes_checkout = CheckoutBytes,
895-
dlx = DlxState} = State) ->
891+
tick(Ts, #?MODULE{cfg = #cfg{name = _Name,
892+
resource = QName}} = State) ->
896893
case is_expired(Ts, State) of
897894
true ->
898895
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}];
899896
false ->
900-
{_, MsgBytesDiscard} = rabbit_fifo_dlx:stat(DlxState),
901-
Metrics = {Name,
902-
messages_ready(State),
903-
num_checked_out(State), % checked out
904-
messages_total(State),
905-
query_consumer_count(State), % Consumers
906-
EnqueueBytes,
907-
CheckoutBytes,
908-
MsgBytesDiscard},
909897
[{mod_call, rabbit_quorum_queue,
910-
handle_tick, [QName, Metrics, all_nodes(State)]}]
898+
handle_tick, [QName, overview(State), all_nodes(State)]}]
911899
end.
912900

913901
-spec overview(state()) -> map().
@@ -918,7 +906,8 @@ overview(#?MODULE{consumers = Cons,
918906
msg_bytes_enqueue = EnqueueBytes,
919907
msg_bytes_checkout = CheckoutBytes,
920908
cfg = Cfg,
921-
dlx = DlxState} = State) ->
909+
dlx = DlxState,
910+
waiting_consumers = WaitingConsumers} = State) ->
922911
Conf = #{name => Cfg#cfg.name,
923912
resource => Cfg#cfg.resource,
924913
release_cursor_interval => Cfg#cfg.release_cursor_interval,
@@ -930,9 +919,18 @@ overview(#?MODULE{consumers = Cons,
930919
msg_ttl => Cfg#cfg.msg_ttl,
931920
delivery_limit => Cfg#cfg.delivery_limit
932921
},
922+
SacOverview = case active_consumer(Cons) of
923+
{SacConsumerId, _} ->
924+
NumWaiting = length(WaitingConsumers),
925+
#{single_active_consumer_id => SacConsumerId,
926+
single_active_num_waiting_consumers => NumWaiting};
927+
_ ->
928+
#{}
929+
end,
933930
Overview = #{type => ?MODULE,
934931
config => Conf,
935-
num_consumers => maps:size(Cons),
932+
num_consumers => map_size(Cons),
933+
num_active_consumers => query_consumer_count(State),
936934
num_checked_out => num_checked_out(State),
937935
num_enqueuers => maps:size(Enqs),
938936
num_ready_messages => messages_ready(State),
@@ -944,9 +942,10 @@ overview(#?MODULE{consumers = Cons,
944942
enqueue_message_bytes => EnqueueBytes,
945943
checkout_message_bytes => CheckoutBytes,
946944
in_memory_message_bytes => 0, %% backwards compat
947-
smallest_raft_index => smallest_raft_index(State)},
945+
smallest_raft_index => smallest_raft_index(State)
946+
},
948947
DlxOverview = rabbit_fifo_dlx:overview(DlxState),
949-
maps:merge(Overview, DlxOverview).
948+
maps:merge(maps:merge(Overview, DlxOverview), SacOverview).
950949

951950
-spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) ->
952951
[delivery_msg()].

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 65 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -435,30 +435,56 @@ spawn_notify_decorators(QName, Fun, Args) ->
435435
catch notify_decorators(QName, Fun, Args).
436436

437437
handle_tick(QName,
438-
{Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack, MsgBytesDiscard},
438+
#{config := #{name := Name},
439+
num_active_consumers := NumConsumers,
440+
num_checked_out := NumCheckedOut,
441+
num_ready_messages := NumReadyMsgs,
442+
num_messages := NumMessages,
443+
enqueue_message_bytes := EnqueueBytes,
444+
checkout_message_bytes := CheckoutBytes,
445+
num_discarded := NumDiscarded,
446+
num_discard_checked_out := NumDiscardedCheckedOut,
447+
discard_message_bytes := DiscardBytes,
448+
discard_checkout_message_bytes := DiscardCheckoutBytes,
449+
smallest_raft_index := _} = Overview,
439450
Nodes) ->
440451
%% this makes calls to remote processes so cannot be run inside the
441452
%% ra server
442453
Self = self(),
443454
_ = spawn(
444455
fun() ->
445456
try
446-
R = reductions(Name),
447-
rabbit_core_metrics:queue_stats(QName, MR, MU, M, R),
448-
Util = case C of
457+
Reductions = reductions(Name),
458+
rabbit_core_metrics:queue_stats(QName, NumReadyMsgs,
459+
NumCheckedOut, NumMessages,
460+
Reductions),
461+
Util = case NumConsumers of
449462
0 -> 0;
450463
_ -> rabbit_fifo:usage(Name)
451464
end,
452-
Infos = [{consumers, C},
465+
Keys = ?STATISTICS_KEYS -- [consumers,
466+
messages_dlx,
467+
message_bytes_dlx,
468+
single_active_consumer_pid,
469+
single_active_consumer_ctag
470+
],
471+
{SacTag, SacPid} = maps:get(single_active_consumer_id,
472+
Overview, {'', ''}),
473+
MsgBytesDiscarded = DiscardBytes + DiscardCheckoutBytes,
474+
MsgBytes = EnqueueBytes + CheckoutBytes + MsgBytesDiscarded,
475+
Infos = [{consumers, NumConsumers},
453476
{consumer_capacity, Util},
454477
{consumer_utilisation, Util},
455-
{message_bytes_ready, MsgBytesReady},
456-
{message_bytes_unacknowledged, MsgBytesUnack},
457-
{message_bytes, MsgBytesReady + MsgBytesUnack + MsgBytesDiscard},
458-
{message_bytes_persistent, MsgBytesReady + MsgBytesUnack + MsgBytesDiscard},
459-
{messages_persistent, M}
460-
461-
| infos(QName, ?STATISTICS_KEYS -- [consumers])],
478+
{message_bytes_ready, EnqueueBytes},
479+
{message_bytes_unacknowledged, CheckoutBytes},
480+
{message_bytes, MsgBytes},
481+
{message_bytes_persistent, MsgBytes},
482+
{messages_persistent, NumMessages},
483+
{messages_dlx, NumDiscarded + NumDiscardedCheckedOut},
484+
{message_bytes_dlx, MsgBytesDiscarded},
485+
{single_active_consumer_ctag, SacTag},
486+
{single_active_consumer_pid, SacPid}
487+
| infos(QName, Keys)],
462488
rabbit_core_metrics:queue_stats(QName, Infos),
463489
ok = repair_leader_record(QName, Self),
464490
ExpectedNodes = rabbit_nodes:all(),
@@ -884,8 +910,6 @@ deliver(QSs, #delivery{message = #basic_message{content = Content0} = Msg,
884910
state_info(S) ->
885911
#{pending_raft_commands => rabbit_fifo_client:pending_size(S)}.
886912

887-
888-
889913
-spec infos(rabbit_types:r('queue')) -> rabbit_types:infos().
890914
infos(QName) ->
891915
infos(QName, ?STATISTICS_KEYS).
@@ -976,9 +1000,11 @@ cluster_state(Name) ->
9761000
case whereis(Name) of
9771001
undefined -> down;
9781002
_ ->
979-
case ets:lookup(ra_state, Name) of
980-
[{_, recover}] -> recovering;
981-
_ -> running
1003+
case ets_lookup_element(ra_state, Name, 2, undefined) of
1004+
recover ->
1005+
recovering;
1006+
_ ->
1007+
running
9821008
end
9831009
end.
9841010

@@ -1232,7 +1258,8 @@ queue_length(Q) ->
12321258
Name = amqqueue:get_name(Q),
12331259
case ets:lookup(ra_metrics, Name) of
12341260
[] -> 0;
1235-
[{_, _, SnapIdx, _, _, LastIdx, _}] -> LastIdx - SnapIdx
1261+
[{_, _, SnapIdx, _, _, LastIdx, _}] ->
1262+
LastIdx - SnapIdx
12361263
end.
12371264

12381265
get_replicas(Q) ->
@@ -1374,20 +1401,10 @@ i(messages, Q) when ?is_amqqueue(Q) ->
13741401
quorum_messages(QName);
13751402
i(messages_ready, Q) when ?is_amqqueue(Q) ->
13761403
QName = amqqueue:get_name(Q),
1377-
case ets:lookup(queue_coarse_metrics, QName) of
1378-
[{_, MR, _, _, _}] ->
1379-
MR;
1380-
[] ->
1381-
0
1382-
end;
1404+
ets_lookup_element(queue_coarse_metrics, QName, 2, 0);
13831405
i(messages_unacknowledged, Q) when ?is_amqqueue(Q) ->
13841406
QName = amqqueue:get_name(Q),
1385-
case ets:lookup(queue_coarse_metrics, QName) of
1386-
[{_, _, MU, _, _}] ->
1387-
MU;
1388-
[] ->
1389-
0
1390-
end;
1407+
ets_lookup_element(queue_coarse_metrics, QName, 3, 0);
13911408
i(policy, Q) ->
13921409
case rabbit_policy:name(Q) of
13931410
none -> '';
@@ -1405,12 +1422,8 @@ i(effective_policy_definition, Q) ->
14051422
end;
14061423
i(consumers, Q) when ?is_amqqueue(Q) ->
14071424
QName = amqqueue:get_name(Q),
1408-
case ets:lookup(queue_metrics, QName) of
1409-
[{_, M, _}] ->
1410-
proplists:get_value(consumers, M, 0);
1411-
[] ->
1412-
0
1413-
end;
1425+
Consumers = ets_lookup_element(queue_metrics, QName, 2, []),
1426+
proplists:get_value(consumers, Consumers, 0);
14141427
i(memory, Q) when ?is_amqqueue(Q) ->
14151428
{Name, _} = amqqueue:get_pid(Q),
14161429
try
@@ -1429,10 +1442,7 @@ i(state, Q) when ?is_amqqueue(Q) ->
14291442
end;
14301443
i(local_state, Q) when ?is_amqqueue(Q) ->
14311444
{Name, _} = amqqueue:get_pid(Q),
1432-
case ets:lookup(ra_state, Name) of
1433-
[{_, State}] -> State;
1434-
_ -> not_member
1435-
end;
1445+
ets_lookup_element(ra_state, Name, 2, not_member);
14361446
i(garbage_collection, Q) when ?is_amqqueue(Q) ->
14371447
{Name, _} = amqqueue:get_pid(Q),
14381448
try
@@ -1477,27 +1487,9 @@ i(single_active_consumer_ctag, Q) when ?is_amqqueue(Q) ->
14771487
end;
14781488
i(type, _) -> quorum;
14791489
i(messages_ram, Q) when ?is_amqqueue(Q) ->
1480-
QPid = amqqueue:get_pid(Q),
1481-
case ra:local_query(QPid,
1482-
fun rabbit_fifo:query_in_memory_usage/1) of
1483-
{ok, {_, {Length, _}}, _} ->
1484-
Length;
1485-
{error, _} ->
1486-
0;
1487-
{timeout, _} ->
1488-
0
1489-
end;
1490+
0;
14901491
i(message_bytes_ram, Q) when ?is_amqqueue(Q) ->
1491-
QPid = amqqueue:get_pid(Q),
1492-
case ra:local_query(QPid,
1493-
fun rabbit_fifo:query_in_memory_usage/1) of
1494-
{ok, {_, {_, Bytes}}, _} ->
1495-
Bytes;
1496-
{error, _} ->
1497-
0;
1498-
{timeout, _} ->
1499-
0
1500-
end;
1492+
0;
15011493
i(messages_dlx, Q) when ?is_amqqueue(Q) ->
15021494
QPid = amqqueue:get_pid(Q),
15031495
case ra:local_query(QPid,
@@ -1524,11 +1516,10 @@ i(_K, _Q) -> ''.
15241516

15251517
open_files(Name) ->
15261518
case whereis(Name) of
1527-
undefined -> {node(), 0};
1528-
Pid -> case ets:lookup(ra_open_file_metrics, Pid) of
1529-
[] -> {node(), 0};
1530-
[{_, Count}] -> {node(), Count}
1531-
end
1519+
undefined ->
1520+
{node(), 0};
1521+
Pid ->
1522+
{node(), ets_lookup_element(ra_open_file_metrics, Pid, 2, 0)}
15321523
end.
15331524

15341525
leader(Q) when ?is_amqqueue(Q) ->
@@ -1582,12 +1573,7 @@ is_process_alive(Name, Node) ->
15821573
-spec quorum_messages(rabbit_amqqueue:name()) -> non_neg_integer().
15831574

15841575
quorum_messages(QName) ->
1585-
case ets:lookup(queue_coarse_metrics, QName) of
1586-
[{_, _, _, M, _}] ->
1587-
M;
1588-
[] ->
1589-
0
1590-
end.
1576+
ets_lookup_element(queue_coarse_metrics, QName, 4, 0).
15911577

15921578
quorum_ctag(Int) when is_integer(Int) ->
15931579
integer_to_binary(Int);
@@ -1712,3 +1698,11 @@ erpc_timeout(Node, _)
17121698
infinity;
17131699
erpc_timeout(_, Timeout) ->
17141700
Timeout.
1701+
1702+
ets_lookup_element(Tbl, Key, Pos, Default) ->
1703+
try ets:lookup_element(Tbl, Key, Pos) of
1704+
V -> V
1705+
catch
1706+
_:badarg ->
1707+
Default
1708+
end.

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,14 @@ tick_test(C) ->
610610

611611
[{mod_call, rabbit_quorum_queue, handle_tick,
612612
[#resource{},
613-
{?FUNCTION_NAME, 1, 1, 2, 1, 3, 3, 0},
613+
#{config := #{name := ?FUNCTION_NAME},
614+
num_consumers := 1,
615+
num_checked_out := 1,
616+
num_ready_messages := 1,
617+
num_messages := 2,
618+
enqueue_message_bytes := 3,
619+
checkout_message_bytes := 3,
620+
num_discarded := _Discards},
614621
[_Node]
615622
]}] = rabbit_fifo:tick(1, S4),
616623
ok.
@@ -773,6 +780,7 @@ single_active_consumer_basic_get_test(C) ->
773780
{_State, {error, {unsupported, single_active_consumer}}} =
774781
apply(meta(C, 2), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
775782
State1),
783+
776784
ok.
777785

778786
single_active_consumer_revive_test(C) ->
@@ -861,6 +869,10 @@ single_active_consumer_test(C) ->
861869
% the first registered consumer is the active one, the others are waiting
862870
?assertEqual(1, map_size(State1#rabbit_fifo.consumers)),
863871
?assertMatch(#{C1 := _}, State1#rabbit_fifo.consumers),
872+
873+
?assertMatch(#{single_active_consumer_id := C1,
874+
single_active_num_waiting_consumers := 3},
875+
rabbit_fifo:overview(State1)),
864876
?assertEqual(3, length(rabbit_fifo:query_waiting_consumers(State1))),
865877
?assertNotEqual(false, lists:keyfind(C2, 1, rabbit_fifo:query_waiting_consumers(State1))),
866878
?assertNotEqual(false, lists:keyfind(C3, 1, rabbit_fifo:query_waiting_consumers(State1))),
@@ -874,6 +886,9 @@ single_active_consumer_test(C) ->
874886
?assertEqual(1, map_size(State2#rabbit_fifo.consumers)),
875887
?assertMatch(#{C1 := _}, State2#rabbit_fifo.consumers),
876888
% the cancelled consumer has been removed from waiting consumers
889+
?assertMatch(#{single_active_consumer_id := C1,
890+
single_active_num_waiting_consumers := 2},
891+
rabbit_fifo:overview(State2)),
877892
?assertEqual(2, length(rabbit_fifo:query_waiting_consumers(State2))),
878893
?assertNotEqual(false, lists:keyfind(C2, 1, rabbit_fifo:query_waiting_consumers(State2))),
879894
?assertNotEqual(false, lists:keyfind(C4, 1, rabbit_fifo:query_waiting_consumers(State2))),

0 commit comments

Comments
 (0)