Skip to content

Commit 58ac469

Browse files
committed
* Move code for handling ack, nack, and return into Channel.PublisherConfirms.cs
1 parent 64fae9a commit 58ac469

File tree

2 files changed

+77
-73
lines changed

2 files changed

+77
-73
lines changed

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,15 @@
2929
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System.Buffers.Binary;
3233
using System.Collections.Concurrent;
34+
using System.Collections.Generic;
3335
using System.Diagnostics;
36+
using System.Runtime.CompilerServices;
3437
using System.Threading;
3538
using System.Threading.Tasks;
39+
using RabbitMQ.Client.Events;
40+
using RabbitMQ.Client.Exceptions;
3641
using RabbitMQ.Client.Framing;
3742

3843
namespace RabbitMQ.Client.Impl
@@ -92,5 +97,74 @@ await ModelSendAsync(in method, k.CancellationToken)
9297
}
9398
}
9499
}
100+
101+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
102+
private void HandleAck(ulong deliveryTag, bool multiple)
103+
{
104+
if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty)
105+
{
106+
if (multiple)
107+
{
108+
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources)
109+
{
110+
if (pair.Key <= deliveryTag)
111+
{
112+
pair.Value.SetResult(true);
113+
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
114+
}
115+
}
116+
}
117+
else
118+
{
119+
if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource<bool>? tcs))
120+
{
121+
tcs.SetResult(true);
122+
}
123+
}
124+
}
125+
}
126+
127+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
128+
private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn)
129+
{
130+
if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty)
131+
{
132+
if (multiple)
133+
{
134+
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources)
135+
{
136+
if (pair.Key <= deliveryTag)
137+
{
138+
pair.Value.SetException(new PublishException(pair.Key, isReturn));
139+
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
140+
}
141+
}
142+
}
143+
else
144+
{
145+
if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource<bool>? tcs))
146+
{
147+
tcs.SetException(new PublishException(deliveryTag, isReturn));
148+
}
149+
}
150+
}
151+
}
152+
153+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
154+
private void HandleReturn(BasicReturnEventArgs basicReturnEvent)
155+
{
156+
if (_publisherConfirmationsEnabled)
157+
{
158+
ulong publishSequenceNumber = 0;
159+
IReadOnlyBasicProperties props = basicReturnEvent.BasicProperties;
160+
object? maybeSeqNum = props.Headers?[Constants.PublishSequenceNumberHeader];
161+
if (maybeSeqNum != null)
162+
{
163+
publishSequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum);
164+
}
165+
166+
HandleNack(publishSequenceNumber, multiple: false, isReturn: true);
167+
}
168+
}
95169
}
96170
}

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 3 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Buffers.Binary;
3433
using System.Collections.Generic;
3534
using System.Diagnostics;
3635
using System.Diagnostics.CodeAnalysis;
@@ -602,8 +601,7 @@ await _basicAcksAsyncWrapper.InvokeAsync(this, args)
602601
.ConfigureAwait(false);
603602
}
604603

605-
await HandleAck(ack._deliveryTag, ack._multiple, cancellationToken)
606-
.ConfigureAwait(false);
604+
HandleAck(ack._deliveryTag, ack._multiple);
607605

608606
return true;
609607
}
@@ -620,8 +618,7 @@ await _basicNacksAsyncWrapper.InvokeAsync(this, args)
620618
.ConfigureAwait(false);
621619
}
622620

623-
await HandleNack(nack._deliveryTag, nack._multiple, false, cancellationToken)
624-
.ConfigureAwait(false);
621+
HandleNack(nack._deliveryTag, nack._multiple, false);
625622

626623
return true;
627624
}
@@ -640,19 +637,7 @@ await _basicReturnAsyncWrapper.InvokeAsync(this, e)
640637
.ConfigureAwait(false);
641638
}
642639

643-
if (_publisherConfirmationsEnabled)
644-
{
645-
ulong publishSequenceNumber = 0;
646-
IReadOnlyBasicProperties props = e.BasicProperties;
647-
object? maybeSeqNum = props.Headers?[Constants.PublishSequenceNumberHeader];
648-
if (maybeSeqNum != null)
649-
{
650-
publishSequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum);
651-
}
652-
653-
await HandleNack(publishSequenceNumber, multiple: false, isReturn: true, cancellationToken)
654-
.ConfigureAwait(false);
655-
}
640+
HandleReturn(e);
656641

657642
return true;
658643
}
@@ -1716,61 +1701,6 @@ await ModelSendAsync(in method, k.CancellationToken)
17161701
}
17171702
}
17181703

1719-
private Task HandleAck(ulong deliveryTag, bool multiple, CancellationToken cancellationToken = default)
1720-
{
1721-
if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty)
1722-
{
1723-
if (multiple)
1724-
{
1725-
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources)
1726-
{
1727-
if (pair.Key <= deliveryTag)
1728-
{
1729-
pair.Value.SetResult(true);
1730-
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
1731-
}
1732-
}
1733-
}
1734-
else
1735-
{
1736-
if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource<bool>? tcs))
1737-
{
1738-
tcs.SetResult(true);
1739-
}
1740-
}
1741-
}
1742-
1743-
return Task.CompletedTask;
1744-
}
1745-
1746-
private Task HandleNack(ulong deliveryTag, bool multiple, bool isReturn,
1747-
CancellationToken cancellationToken = default)
1748-
{
1749-
if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty)
1750-
{
1751-
if (multiple)
1752-
{
1753-
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources)
1754-
{
1755-
if (pair.Key <= deliveryTag)
1756-
{
1757-
pair.Value.SetException(new PublishException(pair.Key, isReturn));
1758-
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
1759-
}
1760-
}
1761-
}
1762-
else
1763-
{
1764-
if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource<bool>? tcs))
1765-
{
1766-
tcs.SetException(new PublishException(deliveryTag, isReturn));
1767-
}
1768-
}
1769-
}
1770-
1771-
return Task.CompletedTask;
1772-
}
1773-
17741704
private BasicProperties? PopulateBasicPropertiesHeaders<TProperties>(TProperties basicProperties,
17751705
Activity? sendActivity, ulong publishSequenceNumber)
17761706
where TProperties : IReadOnlyBasicProperties, IAmqpHeader

0 commit comments

Comments
 (0)