Skip to content

Commit 7031a0e

Browse files
committed
WIP global counters
TODOs: * rabbitmq_global_publishers * rabbitmq_global_consumers * tests
1 parent 48793d5 commit 7031a0e

File tree

2 files changed

+54
-19
lines changed

2 files changed

+54
-19
lines changed

deps/rabbit/src/rabbit_global_counters.erl

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,15 @@
131131
]).
132132

133133
boot_step() ->
134-
%% Protocol counters
135-
init([{protocol, amqp091}]),
136-
137-
%% Protocol & Queue Type counters
138-
init([{protocol, amqp091}, {queue_type, rabbit_classic_queue}]),
139-
init([{protocol, amqp091}, {queue_type, rabbit_quorum_queue}]),
140-
init([{protocol, amqp091}, {queue_type, rabbit_stream_queue}]),
134+
[begin
135+
%% Protocol counters
136+
init([{protocol, Proto}]),
137+
138+
%% Protocol & Queue Type counters
139+
init([{protocol, Proto}, {queue_type, rabbit_classic_queue}]),
140+
init([{protocol, Proto}, {queue_type, rabbit_quorum_queue}]),
141+
init([{protocol, Proto}, {queue_type, rabbit_stream_queue}])
142+
end || Proto <- [amqp091, amqp10]],
141143

142144
%% Dead Letter counters
143145
%%

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session.erl

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
-include_lib("amqp10_common/include/amqp10_types.hrl").
1414
-include("rabbit_amqp1_0.hrl").
1515

16+
-define(PROTOCOL, amqp10).
1617
-define(HIBERNATE_AFTER, 6_000).
1718
-define(CREDIT_REPLY_TIMEOUT, 30_000).
1819
-define(UINT_OUTGOING_WINDOW, {uint, ?UINT_MAX}).
@@ -89,7 +90,8 @@
8990
-record(outgoing_link, {
9091
%% Although the source address of a link might be an exchange name and binding key
9192
%% or a topic filter, an outgoing link will always consume from a queue.
92-
queue :: rabbit_misc:resource_name(),
93+
queue_name :: rabbit_misc:resource_name(),
94+
queue_type :: rabbit_queue_type:queue_type(),
9395
send_settled :: boolean(),
9496
max_message_size :: unlimited | pos_integer(),
9597
%% When credit API v1 is used, our session process holds the delivery-count
@@ -435,6 +437,7 @@ send_delivery_state_changes(State0 = #state{cfg = #cfg{writer_pid = Writer,
435437
send_dispositions(ReleasedIds, #'v1_0.released'{}, Writer, ChannelNum),
436438
AcceptedIds = AcceptedIds1 ++ AcceptedIds0,
437439
send_dispositions(AcceptedIds, #'v1_0.accepted'{}, Writer, ChannelNum),
440+
rabbit_global_counters:messages_confirmed(?PROTOCOL, length(AcceptedIds)),
438441
%% Send DETACH frames after DISPOSITION frames such that
439442
%% clients can handle DISPOSITIONs before closing their links.
440443
lists:foreach(fun(Frame) ->
@@ -624,7 +627,7 @@ destroy_incoming_link(Handle, #incoming_link{queue = QNameBin}, QNameBin, {Frame
624627
destroy_incoming_link(_, _, _, Acc) ->
625628
Acc.
626629

627-
destroy_outgoing_link(Handle, #outgoing_link{queue = QNameBin}, QNameBin, {Frames, Unsettled0, Links}) ->
630+
destroy_outgoing_link(Handle, #outgoing_link{queue_name = QNameBin}, QNameBin, {Frames, Unsettled0, Links}) ->
628631
{Unsettled, _RemovedMsgIds} = remove_link_from_outgoing_unsettled_map(Handle, Unsettled0),
629632
{[detach(Handle, ?V_1_0_AMQP_ERROR_RESOURCE_DELETED) | Frames],
630633
Unsettled,
@@ -744,6 +747,7 @@ handle_control(#'v1_0.attach'{role = ?RECV_ROLE,
744747
case rabbit_amqqueue:with(
745748
QName,
746749
fun(Q) ->
750+
QType = amqqueue:get_type(Q),
747751
%% Whether credit API v1 or v2 is used is decided only here at link attachment time.
748752
%% This decision applies to the whole life time of the link.
749753
%% This means even when feature flag credit_api_v2 will be enabled later, this consumer will
@@ -756,7 +760,7 @@ handle_control(#'v1_0.attach'{role = ?RECV_ROLE,
756760
%% flow control state. Hence, credit API mixed version isn't an issue for streams.
757761
{Mode,
758762
DeliveryCount} = case rabbit_feature_flags:is_enabled(credit_api_v2) orelse
759-
amqqueue:get_type(Q) =:= rabbit_stream_queue of
763+
QType =:= rabbit_stream_queue of
760764
true ->
761765
{{credited, ?INITIAL_DELIVERY_COUNT}, credit_api_v2};
762766
false ->
@@ -799,7 +803,8 @@ handle_control(#'v1_0.attach'{role = ?RECV_ROLE,
799803
%% maximum size imposed by the link endpoint."
800804
unlimited
801805
end,
802-
Link = #outgoing_link{queue = QNameBin,
806+
Link = #outgoing_link{queue_name = QNameBin,
807+
queue_type = QType,
803808
send_settled = SndSettled,
804809
max_message_size = MaxMessageSize,
805810
delivery_count = DeliveryCount},
@@ -914,7 +919,7 @@ handle_control(#'v1_0.detach'{handle = Handle = ?UINT(HandleInt),
914919
%% TODO keep the state around depending on the lifetime
915920
{QStates, Unsettled, OutgoingLinks}
916921
= case maps:take(HandleInt, OutgoingLinks0) of
917-
{#outgoing_link{queue = QNameBin}, OutgoingLinks1} ->
922+
{#outgoing_link{queue_name = QNameBin}, OutgoingLinks1} ->
918923
QName = rabbit_misc:r(Vhost, queue, QNameBin),
919924
case rabbit_amqqueue:lookup(QName) of
920925
{ok, Q} ->
@@ -1041,6 +1046,7 @@ handle_control(#'v1_0.disposition'{role = ?RECV_ROLE,
10411046
fun({QName, Ctag}, MsgIds, {QS0, ActionsAcc}) ->
10421047
case rabbit_queue_type_settle(QName, SettleOp, Ctag, MsgIds, QS0) of
10431048
{ok, QS, Actions0} ->
1049+
messages_acknowledged(SettleOp, QName, QS, MsgIds),
10441050
{QS, ActionsAcc ++ Actions0};
10451051
{protocol_error, _ErrorType, Reason, ReasonArgs} ->
10461052
protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
@@ -1377,7 +1383,8 @@ handle_deliver(ConsumerTag, AckRequired,
13771383
cfg = #cfg{outgoing_max_frame_size = MaxFrameSize}}) ->
13781384
Handle = ctag_to_handle(ConsumerTag),
13791385
case OutgoingLinks of
1380-
#{Handle := #outgoing_link{send_settled = SendSettled,
1386+
#{Handle := #outgoing_link{queue_type = QType,
1387+
send_settled = SendSettled,
13811388
max_message_size = MaxMessageSize}} ->
13821389
%% "The delivery-tag MUST be unique amongst all deliveries that could be
13831390
%% considered unsettled by either end of the link." [2.6.12]
@@ -1418,6 +1425,7 @@ handle_deliver(ConsumerTag, AckRequired,
14181425
TLen = iolist_size(amqp10_framing:encode_bin(Transfer)),
14191426
encode_frames(Transfer, Sections, MaxFrameSize - TLen, [])
14201427
end,
1428+
messages_delivered(Redelivered, QType),
14211429
Del = #outgoing_unsettled{
14221430
msg_id = MsgId,
14231431
consumer_tag = ConsumerTag,
@@ -1533,7 +1541,7 @@ incoming_link_transfer(
15331541
RoutingKeys = mc:get_annotation(routing_keys, Mc),
15341542
RoutingKey = routing_key(RoutingKeys, XName),
15351543
% Mc1 = rabbit_message_interceptor:intercept(Mc),
1536-
% rabbit_global_counters:messages_received(ProtoVer, 1),
1544+
messages_received(Settled),
15371545
case rabbit_exchange:lookup(XName) of
15381546
{ok, Exchange} ->
15391547
check_write_permitted_on_topic(Exchange, User, RoutingKey),
@@ -1551,7 +1559,6 @@ incoming_link_transfer(
15511559
case rabbit_queue_type:deliver(Qs, Mc, Opts, QStates0) of
15521560
{ok, QStates, Actions} ->
15531561
State1 = State0#state{queue_states = QStates},
1554-
% rabbit_global_counters:messages_routed(ProtoVer, length(Qs)),
15551562
%% Confirms must be registered before processing actions
15561563
%% because actions may contain rejections of publishes.
15571564
{U, Reply0} = process_routing_confirm(
@@ -1582,16 +1589,18 @@ incoming_link_transfer(
15821589
end.
15831590

15841591
process_routing_confirm([], _SenderSettles = true, _, U) ->
1585-
% rabbit_global_counters:messages_unroutable_dropped(ProtoVer, 1),
1592+
rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1),
15861593
{U, []};
15871594
process_routing_confirm([], _SenderSettles = false, DeliveryId, U) ->
1588-
% rabbit_global_counters:messages_unroutable_returned(ProtoVer, 1),
1595+
rabbit_global_counters:messages_unroutable_returned(?PROTOCOL, 1),
15891596
Disposition = released(DeliveryId),
15901597
{U, [Disposition]};
15911598
process_routing_confirm([_|_] = Qs, SenderSettles, DeliveryId, U0) ->
15921599
QNames = rabbit_amqqueue:queue_names(Qs),
15931600
false = maps:is_key(DeliveryId, U0),
1594-
U = U0#{DeliveryId => {maps:from_keys(QNames, ok), SenderSettles, false}},
1601+
Map = maps:from_keys(QNames, ok),
1602+
U = U0#{DeliveryId => {Map, SenderSettles, false}},
1603+
rabbit_global_counters:messages_routed(?PROTOCOL, map_size(Map)),
15951604
{U, []}.
15961605

15971606
released(DeliveryId) ->
@@ -1656,7 +1665,7 @@ ensure_target(#'v1_0.target'{address = Address,
16561665
end.
16571666

16581667
handle_outgoing_link_flow_control(
1659-
#outgoing_link{queue = QNameBin,
1668+
#outgoing_link{queue_name = QNameBin,
16601669
delivery_count = MaybeDeliveryCountSnd},
16611670
#'v1_0.flow'{handle = ?UINT(HandleInt),
16621671
delivery_count = MaybeDeliveryCountRcv,
@@ -2065,6 +2074,30 @@ routing_key(undefined, XName) ->
20652074
routing_key([RoutingKey], _XName) ->
20662075
RoutingKey.
20672076

2077+
messages_received(Settled) ->
2078+
rabbit_global_counters:messages_received(?PROTOCOL, 1),
2079+
case Settled of
2080+
true -> ok;
2081+
false -> rabbit_global_counters:messages_received_confirm(?PROTOCOL, 1)
2082+
end.
2083+
2084+
messages_delivered(Redelivered, QueueType) ->
2085+
rabbit_global_counters:messages_delivered(?PROTOCOL, QueueType, 1),
2086+
case Redelivered of
2087+
true -> rabbit_global_counters:messages_redelivered(?PROTOCOL, QueueType, 1);
2088+
false -> ok
2089+
end.
2090+
2091+
messages_acknowledged(complete, QName, QS, MsgIds) ->
2092+
case rabbit_queue_type:module(QName, QS) of
2093+
{ok, QType} ->
2094+
rabbit_global_counters:messages_acknowledged(?PROTOCOL, QType, length(MsgIds));
2095+
_ ->
2096+
ok
2097+
end;
2098+
messages_acknowledged(_, _, _, _) ->
2099+
ok.
2100+
20682101
check_internal_exchange(#exchange{internal = true,
20692102
name = XName}) ->
20702103
protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,

0 commit comments

Comments
 (0)