31
31
32
32
using System ;
33
33
using System . Buffers . Binary ;
34
+ using System . Collections . Concurrent ;
34
35
using System . Collections . Generic ;
35
36
using System . Diagnostics ;
36
37
using System . Diagnostics . CodeAnalysis ;
43
44
using RabbitMQ . Client . Events ;
44
45
using RabbitMQ . Client . Exceptions ;
45
46
using RabbitMQ . Client . Framing ;
47
+ using RabbitMQ . Client . Util ;
46
48
47
49
namespace RabbitMQ . Client . Impl
48
50
{
@@ -62,10 +64,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable
62
64
private bool _publisherConfirmationTrackingEnabled = false ;
63
65
private ulong _nextPublishSeqNo = 0 ;
64
66
private readonly SemaphoreSlim _confirmSemaphore = new ( 1 , 1 ) ;
65
- private readonly LinkedList < ulong > _pendingDeliveryTags = new ( ) ;
66
- private readonly Dictionary < ulong , TaskCompletionSource < bool > > _confirmsTaskCompletionSources = new ( ) ;
67
-
68
- private bool _onlyAcksReceived = true ;
67
+ private readonly ConcurrentDictionary < ulong , TaskCompletionSource < bool > > _confirmsTaskCompletionSources = new ( ) ;
69
68
70
69
private ShutdownEventArgs ? _closeReason ;
71
70
public ShutdownEventArgs ? CloseReason => Volatile . Read ( ref _closeReason ) ;
@@ -505,7 +504,7 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
505
504
. ConfigureAwait ( false ) ;
506
505
try
507
506
{
508
- if ( _confirmsTaskCompletionSources ? . Count > 0 )
507
+ if ( ! _confirmsTaskCompletionSources . IsEmpty )
509
508
{
510
509
var exception = new AlreadyClosedException ( reason ) ;
511
510
foreach ( TaskCompletionSource < bool > confirmsTaskCompletionSource in _confirmsTaskCompletionSources . Values )
@@ -615,7 +614,7 @@ await _basicAcksAsyncWrapper.InvokeAsync(this, args)
615
614
. ConfigureAwait ( false ) ;
616
615
}
617
616
618
- await HandleAckNack ( ack . _deliveryTag , ack . _multiple , false , cancellationToken )
617
+ await HandleAck ( ack . _deliveryTag , ack . _multiple , cancellationToken )
619
618
. ConfigureAwait ( false ) ;
620
619
621
620
return true ;
@@ -633,7 +632,7 @@ await _basicNacksAsyncWrapper.InvokeAsync(this, args)
633
632
. ConfigureAwait ( false ) ;
634
633
}
635
634
636
- await HandleAckNack ( nack . _deliveryTag , nack . _multiple , true , cancellationToken )
635
+ await HandleNack ( nack . _deliveryTag , nack . _multiple , cancellationToken )
637
636
. ConfigureAwait ( false ) ;
638
637
639
638
return true ;
@@ -657,20 +656,14 @@ await _basicReturnAsyncWrapper.InvokeAsync(this, e)
657
656
{
658
657
ulong publishSequenceNumber = 0 ;
659
658
IReadOnlyBasicProperties props = e . BasicProperties ;
660
- if ( props . Headers is not null )
659
+ object ? maybeSeqNum = props . Headers ? [ Constants . PublishSequenceNumberHeader ] ;
660
+ if ( maybeSeqNum != null )
661
661
{
662
- object ? maybeSeqNum = props . Headers [ Constants . PublishSequenceNumberHeader ] ;
663
- if ( maybeSeqNum is not null )
664
- {
665
- publishSequenceNumber = BinaryPrimitives . ReadUInt64BigEndian ( ( byte [ ] ) maybeSeqNum ) ;
666
- }
662
+ publishSequenceNumber = BinaryPrimitives . ReadUInt64BigEndian ( ( byte [ ] ) maybeSeqNum ) ;
667
663
}
668
664
669
- if ( publishSequenceNumber != 0 && _publisherConfirmationTrackingEnabled )
670
- {
671
- await HandleAckNack ( publishSequenceNumber , false , true , cancellationToken )
672
- . ConfigureAwait ( false ) ;
673
- }
665
+ await HandleNack ( publishSequenceNumber , false , cancellationToken )
666
+ . ConfigureAwait ( false ) ;
674
667
}
675
668
676
669
return true ;
@@ -1022,7 +1015,6 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
1022
1015
1023
1016
if ( _publisherConfirmationTrackingEnabled )
1024
1017
{
1025
- _pendingDeliveryTags . AddLast ( publishSequenceNumber ) ;
1026
1018
publisherConfirmationTcs = new ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
1027
1019
_confirmsTaskCompletionSources [ publishSequenceNumber ] = publisherConfirmationTcs ;
1028
1020
}
@@ -1068,9 +1060,9 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
1068
1060
try
1069
1061
{
1070
1062
_nextPublishSeqNo -- ;
1071
- if ( _publisherConfirmationTrackingEnabled && _pendingDeliveryTags is not null )
1063
+ if ( _publisherConfirmationTrackingEnabled )
1072
1064
{
1073
- _pendingDeliveryTags . RemoveLast ( ) ;
1065
+ _confirmsTaskCompletionSources . TryRemove ( publishSequenceNumber , out _ ) ;
1074
1066
}
1075
1067
}
1076
1068
finally
@@ -1119,7 +1111,6 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
1119
1111
1120
1112
if ( _publisherConfirmationTrackingEnabled )
1121
1113
{
1122
- _pendingDeliveryTags . AddLast ( publishSequenceNumber ) ;
1123
1114
publisherConfirmationTcs = new ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
1124
1115
_confirmsTaskCompletionSources [ publishSequenceNumber ] = publisherConfirmationTcs ;
1125
1116
}
@@ -1164,9 +1155,9 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
1164
1155
try
1165
1156
{
1166
1157
_nextPublishSeqNo -- ;
1167
- if ( _publisherConfirmationTrackingEnabled && _pendingDeliveryTags is not null )
1158
+ if ( _publisherConfirmationTrackingEnabled )
1168
1159
{
1169
- _pendingDeliveryTags . RemoveLast ( ) ;
1160
+ _confirmsTaskCompletionSources . TryRemove ( publishSequenceNumber , out _ ) ;
1170
1161
}
1171
1162
}
1172
1163
finally
@@ -1189,6 +1180,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
1189
1180
{
1190
1181
await publisherConfirmationTcs . Task . WaitAsync ( cancellationToken )
1191
1182
. ConfigureAwait ( false ) ;
1183
+
1192
1184
return await publisherConfirmationTcs . Task
1193
1185
. ConfigureAwait ( false ) ;
1194
1186
}
@@ -1770,7 +1762,6 @@ private async Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnablefd
1770
1762
{
1771
1763
if ( _publisherConfirmationTrackingEnabled )
1772
1764
{
1773
- _pendingDeliveryTags . Clear ( ) ;
1774
1765
_confirmsTaskCompletionSources . Clear ( ) ;
1775
1766
}
1776
1767
_nextPublishSeqNo = 1 ;
@@ -1796,84 +1787,57 @@ await ModelSendAsync(in method, k.CancellationToken)
1796
1787
}
1797
1788
}
1798
1789
1799
- // TODO NOTE: this method used to be internal for its use in this test:
1800
- // TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse
1801
- private async Task HandleAckNack ( ulong deliveryTag , bool multiple , bool isNack , CancellationToken cancellationToken = default )
1790
+ private Task HandleAck ( ulong deliveryTag , bool multiple , CancellationToken cancellationToken = default )
1802
1791
{
1803
- bool isAck = false == isNack ;
1804
-
1805
- // Only do this if confirms are enabled *and* the library is tracking confirmations
1806
- if ( _publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled )
1792
+ if ( _publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && ! _confirmsTaskCompletionSources . IsEmpty )
1807
1793
{
1808
- // let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted
1809
- await _confirmSemaphore . WaitAsync ( cancellationToken )
1810
- . ConfigureAwait ( false ) ;
1811
- try
1794
+ if ( multiple )
1812
1795
{
1813
- // No need to do anything if there are no delivery tags in the list
1814
- if ( _pendingDeliveryTags . Count > 0 )
1796
+ foreach ( var pair in _confirmsTaskCompletionSources )
1815
1797
{
1816
- if ( multiple )
1817
- {
1818
- do
1819
- {
1820
- if ( _pendingDeliveryTags . First is null )
1821
- {
1822
- break ;
1823
- }
1824
- else
1825
- {
1826
- ulong pendingDeliveryTag = _pendingDeliveryTags . First . Value ;
1827
- if ( pendingDeliveryTag > deliveryTag )
1828
- {
1829
- break ;
1830
- }
1831
- else
1832
- {
1833
- TaskCompletionSource < bool > tcs = _confirmsTaskCompletionSources [ pendingDeliveryTag ] ;
1834
- tcs . SetResult ( isAck ) ;
1835
- _confirmsTaskCompletionSources . Remove ( pendingDeliveryTag ) ;
1836
- _pendingDeliveryTags . RemoveFirst ( ) ;
1837
- }
1838
- }
1839
- }
1840
- while ( true ) ;
1841
- }
1842
- else
1798
+ if ( pair . Key <= deliveryTag )
1843
1799
{
1844
- /*
1845
- * Note:
1846
- * In the case of `basic.return`, the TCS will have been handled and removed by HandleBasicReturn()
1847
- * RabbitMQ still sends `basic.ack`, so the TCS will not be in the dict, hence, TryGetValue here
1848
- */
1849
- if ( _confirmsTaskCompletionSources . TryGetValue ( deliveryTag , out TaskCompletionSource < bool > ? tcs ) )
1850
- {
1851
- tcs . SetResult ( isAck ) ;
1852
- _confirmsTaskCompletionSources . Remove ( deliveryTag ) ;
1853
- _pendingDeliveryTags . Remove ( deliveryTag ) ;
1854
- }
1800
+ pair . Value . SetResult ( true ) ;
1801
+ _confirmsTaskCompletionSources . Remove ( pair . Key , out _ ) ;
1855
1802
}
1856
1803
}
1804
+ }
1805
+ else
1806
+ {
1807
+ if ( _confirmsTaskCompletionSources . TryRemove ( deliveryTag , out TaskCompletionSource < bool > ? tcs ) )
1808
+ {
1809
+ tcs . SetResult ( true ) ;
1810
+ }
1811
+ }
1812
+ }
1813
+ return Task . CompletedTask ;
1814
+ }
1857
1815
1858
- _onlyAcksReceived = _onlyAcksReceived && isAck ;
1859
-
1860
- if ( _pendingDeliveryTags . Count == 0 && _confirmsTaskCompletionSources . Count > 0 )
1816
+ private Task HandleNack ( ulong deliveryTag , bool multiple , CancellationToken cancellationToken = default )
1817
+ {
1818
+ if ( _publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && ! _confirmsTaskCompletionSources . IsEmpty )
1819
+ {
1820
+ if ( multiple )
1821
+ {
1822
+ foreach ( var pair in _confirmsTaskCompletionSources )
1861
1823
{
1862
- // Done, mark tasks
1863
- foreach ( TaskCompletionSource < bool > tcs in _confirmsTaskCompletionSources . Values )
1824
+ if ( pair . Key <= deliveryTag )
1864
1825
{
1865
- tcs . TrySetResult ( _onlyAcksReceived ) ;
1826
+ pair . Value . SetException ( new Exception ( "TBD" ) ) ;
1827
+ _confirmsTaskCompletionSources . Remove ( pair . Key , out _ ) ;
1866
1828
}
1867
-
1868
- _confirmsTaskCompletionSources . Clear ( ) ;
1869
- _onlyAcksReceived = true ;
1870
1829
}
1871
1830
}
1872
- finally
1831
+ else
1873
1832
{
1874
- _confirmSemaphore . Release ( ) ;
1833
+ if ( _confirmsTaskCompletionSources . Remove ( deliveryTag , out TaskCompletionSource < bool > ? tcs ) )
1834
+ {
1835
+ tcs . SetException ( new Exception ( "TBD" ) ) ;
1836
+ }
1875
1837
}
1876
1838
}
1839
+
1840
+ return Task . CompletedTask ;
1877
1841
}
1878
1842
1879
1843
private BasicProperties ? PopulateBasicPropertiesHeaders < TProperties > ( TProperties basicProperties ,
@@ -1948,7 +1912,9 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
1948
1912
publishSequenceNumberBytes = BitConverter . GetBytes ( publishSequenceNumber ) ;
1949
1913
}
1950
1914
1951
- headers [ Constants . PublishSequenceNumberHeader ] = publishSequenceNumberBytes ;
1915
+ var ulongByte = new byte [ 8 ] ;
1916
+ NetworkOrderSerializer . WriteUInt64 ( ref ulongByte . AsSpan ( ) . GetStart ( ) , publishSequenceNumber ) ;
1917
+ headers [ Constants . PublishSequenceNumberHeader ] = ulongByte ;
1952
1918
}
1953
1919
}
1954
1920
}
0 commit comments