Skip to content

Commit cb8de07

Browse files
Merge branch 'stable'
Conflicts: src/rabbit_channel.erl src/rabbit_control_main.erl
2 parents e9d1588 + 2545c1b commit cb8de07

File tree

1 file changed

+42
-30
lines changed

1 file changed

+42
-30
lines changed

src/rabbit_channel.erl

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,21 @@
183183

184184
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
185185

186-
-define(INCR_STATS(Incs, Measure, State),
186+
-define(INCR_STATS(Type, Key, Inc, Measure, State),
187187
case rabbit_event:stats_level(State, #ch.stats_timer) of
188-
fine -> incr_stats(Incs, Measure);
189-
_ -> ok
188+
fine ->
189+
rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc),
190+
%% Keys in the process dictionary are used to clean up the core metrics
191+
put({Type, Key}, none);
192+
_ ->
193+
ok
194+
end).
195+
196+
-define(INCR_STATS(Type, Key, Inc, Measure),
197+
begin
198+
rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc),
199+
%% Keys in the process dictionary are used to clean up the core metrics
200+
put({Type, Key}, none)
190201
end).
191202

192203
%%----------------------------------------------------------------------------
@@ -1611,7 +1622,7 @@ basic_return(#basic_message{exchange_name = ExchangeName,
16111622
content = Content},
16121623
State = #ch{protocol = Protocol, writer_pid = WriterPid},
16131624
Reason) ->
1614-
?INCR_STATS([{exchange_stats, ExchangeName, 1}], return_unroutable, State),
1625+
?INCR_STATS(exchange_stats, ExchangeName, 1, return_unroutable, State),
16151626
{_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
16161627
ok = rabbit_writer:send_command(
16171628
WriterPid,
@@ -1648,14 +1659,14 @@ record_sent(ConsumerTag, AckRequired,
16481659
user = #user{username = Username},
16491660
conn_name = ConnName,
16501661
channel = ChannelNum}) ->
1651-
?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
1652-
{none, true} -> get;
1653-
{none, false} -> get_no_ack;
1654-
{_ , true} -> deliver;
1655-
{_ , false} -> deliver_no_ack
1656-
end, State),
1662+
?INCR_STATS(queue_stats, QName, 1, case {ConsumerTag, AckRequired} of
1663+
{none, true} -> get;
1664+
{none, false} -> get_no_ack;
1665+
{_ , true} -> deliver;
1666+
{_ , false} -> deliver_no_ack
1667+
end, State),
16571668
case Redelivered of
1658-
true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State);
1669+
true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State);
16591670
false -> ok
16601671
end,
16611672
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
@@ -1700,11 +1711,11 @@ ack(Acked, State = #ch{queue_names = QNames}) ->
17001711
foreach_per_queue(
17011712
fun (QPid, MsgIds) ->
17021713
ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
1703-
?INCR_STATS(case maps:find(QPid, QNames) of
1704-
{ok, QName} -> Count = length(MsgIds),
1705-
[{queue_stats, QName, Count}];
1706-
error -> []
1707-
end, ack, State)
1714+
case maps:find(QPid, QNames) of
1715+
{ok, QName} -> Count = length(MsgIds),
1716+
?INCR_STATS(queue_stats, QName, Count, ack, State);
1717+
error -> ok
1718+
end
17081719
end, Acked),
17091720
ok = notify_limiter(State#ch.limiter, Acked).
17101721

@@ -1769,7 +1780,7 @@ deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
17691780
confirm = false,
17701781
mandatory = false},
17711782
[]}, State) -> %% optimisation
1772-
?INCR_STATS([{exchange_stats, XName, 1}], publish, State),
1783+
?INCR_STATS(exchange_stats, XName, 1, publish, State),
17731784
State;
17741785
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
17751786
exchange_name = XName},
@@ -1806,11 +1817,15 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
18061817
Message, State1),
18071818
State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo,
18081819
XName, State2),
1809-
?INCR_STATS([{exchange_stats, XName, 1} |
1810-
[{queue_exchange_stats, {QName, XName}, 1} ||
1811-
QPid <- DeliveredQPids,
1812-
{ok, QName} <- [maps:find(QPid, QNames1)]]],
1813-
publish, State3),
1820+
case rabbit_event:stats_level(State3, #ch.stats_timer) of
1821+
fine ->
1822+
?INCR_STATS(exchange_stats, XName, 1, publish),
1823+
[?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) ||
1824+
QPid <- DeliveredQPids,
1825+
{ok, QName} <- [maps:find(QPid, QNames1)]];
1826+
_ ->
1827+
ok
1828+
end,
18141829
State3.
18151830

18161831
process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) ->
@@ -1853,7 +1868,7 @@ send_confirms(State = #ch{tx = none, confirmed = C}) ->
18531868
ok -> MsgSeqNos =
18541869
lists:foldl(
18551870
fun ({MsgSeqNo, XName}, MSNs) ->
1856-
?INCR_STATS([{exchange_stats, XName, 1}],
1871+
?INCR_STATS(exchange_stats, XName, 1,
18571872
confirm, State),
18581873
[MsgSeqNo | MSNs]
18591874
end, [], lists:append(C)),
@@ -1953,17 +1968,14 @@ i(Item, _) ->
19531968
name(#ch{conn_name = ConnName, channel = Channel}) ->
19541969
list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
19551970

1956-
incr_stats(Incs, Measure) ->
1957-
[begin
1958-
rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc),
1959-
%% Keys in the process dictionary are used to clean up the core metrics
1960-
put({Type, Key}, none)
1961-
end || {Type, Key, Inc} <- Incs].
1962-
19631971
emit_stats(State) -> emit_stats(State, []).
19641972

19651973
emit_stats(State, Extra) ->
19661974
[{reductions, Red} | Coarse0] = infos(?STATISTICS_KEYS, State),
1975+
%% First metric must be `idle_since` (if available), as expected by
1976+
%% `rabbit_mgmt_format:format_channel_stats`. This is a performance
1977+
%% optimisation that avoids traversing the whole list when only
1978+
%% one element has to be formatted.
19671979
rabbit_core_metrics:channel_stats(self(), Extra ++ Coarse0),
19681980
rabbit_core_metrics:channel_stats(reductions, self(), Red).
19691981

0 commit comments

Comments
 (0)