Skip to content

Commit a91a4fa

Browse files
authored
Merge pull request #1762 from rabbitmq/refactor-consumer-tag
Refactor consumer tag in record_sent
2 parents 84fb288 + 4ee1117 commit a91a4fa

File tree

1 file changed

+9
-10
lines changed

1 file changed

+9
-10
lines changed

src/rabbit_channel.erl

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1811,28 +1811,27 @@ internal_reject(Requeue, Acked, Limiter,
18111811
ok = notify_limiter(Limiter, Acked),
18121812
State#ch{queue_states = QueueStates}.
18131813

1814-
record_sent(ConsumerTag, AckRequired,
1814+
record_sent(Type, Tag, AckRequired,
18151815
Msg = {QName, QPid, MsgId, Redelivered, _Message},
18161816
State = #ch{unacked_message_q = UAMQ,
18171817
next_tag = DeliveryTag,
18181818
trace_state = TraceState,
18191819
user = #user{username = Username},
18201820
conn_name = ConnName,
18211821
channel = ChannelNum}) ->
1822-
?INCR_STATS(queue_stats, QName, 1, case {ConsumerTag, AckRequired} of
1823-
{_, true} when is_integer(ConsumerTag) -> get;
1824-
{_, false} when is_integer(ConsumerTag) -> get_no_ack;
1825-
%% Authentic consumer tag, this is a delivery
1826-
{_ , true} -> deliver;
1827-
{_ , false} -> deliver_no_ack
1822+
?INCR_STATS(queue_stats, QName, 1, case {Type, AckRequired} of
1823+
{get, true} -> get;
1824+
{get, false} -> get_no_ack;
1825+
{deliver, true} -> deliver;
1826+
{deliver, false} -> deliver_no_ack
18281827
end, State),
18291828
case Redelivered of
18301829
true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State);
18311830
false -> ok
18321831
end,
18331832
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
18341833
UAMQ1 = case AckRequired of
1835-
true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
1834+
true -> queue:in({DeliveryTag, Tag, {QPid, MsgId}},
18361835
UAMQ);
18371836
false -> UAMQ
18381837
end,
@@ -2457,7 +2456,7 @@ handle_deliver(ConsumerTag, AckRequired,
24572456
ok = rabbit_writer:send_command(WriterPid, Deliver, Content)
24582457
end,
24592458
rabbit_basic:maybe_gc_large_msg(Content),
2460-
record_sent(ConsumerTag, AckRequired, Msg, State).
2459+
record_sent(deliver, ConsumerTag, AckRequired, Msg, State).
24612460

24622461
handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
24632462
Msg = {QName, QPid, _MsgId, Redelivered,
@@ -2473,7 +2472,7 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
24732472
message_count = MessageCount},
24742473
Content),
24752474
State1 = track_delivering_queue(NoAck, QPid, QName, State),
2476-
{noreply, record_sent(DeliveryTag, not(NoAck), Msg, State1)}.
2475+
{noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}.
24772476

24782477
init_queue_cleanup_timer(State) ->
24792478
{ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval),

0 commit comments

Comments
 (0)