@@ -1005,12 +1005,13 @@ public async ValueTask<bool> BasicPublishAsync<TProperties>(string exchange, str
1005
1005
{
1006
1006
TaskCompletionSource < bool > ? publisherConfirmationTcs = null ;
1007
1007
ulong publishSequenceNumber = 0 ;
1008
- if ( _publisherConfirmationsEnabled )
1008
+ try
1009
1009
{
1010
- await _confirmSemaphore . WaitAsync ( cancellationToken )
1011
- . ConfigureAwait ( false ) ;
1012
- try
1010
+ if ( _publisherConfirmationsEnabled )
1013
1011
{
1012
+ await _confirmSemaphore . WaitAsync ( cancellationToken )
1013
+ . ConfigureAwait ( false ) ;
1014
+
1014
1015
publishSequenceNumber = _nextPublishSeqNo ;
1015
1016
1016
1017
if ( _publisherConfirmationTrackingEnabled )
@@ -1021,14 +1022,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
1021
1022
1022
1023
_nextPublishSeqNo ++ ;
1023
1024
}
1024
- finally
1025
- {
1026
- _confirmSemaphore . Release ( ) ;
1027
- }
1028
- }
1029
1025
1030
- try
1031
- {
1032
1026
var cmd = new BasicPublish ( exchange , routingKey , mandatory , default ) ;
1033
1027
1034
1028
using Activity ? sendActivity = RabbitMQActivitySource . PublisherHasListeners
@@ -1055,19 +1049,10 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
1055
1049
{
1056
1050
if ( _publisherConfirmationsEnabled )
1057
1051
{
1058
- await _confirmSemaphore . WaitAsync ( cancellationToken )
1059
- . ConfigureAwait ( false ) ;
1060
- try
1061
- {
1062
- _nextPublishSeqNo -- ;
1063
- if ( _publisherConfirmationTrackingEnabled )
1064
- {
1065
- _confirmsTaskCompletionSources . TryRemove ( publishSequenceNumber , out _ ) ;
1066
- }
1067
- }
1068
- finally
1052
+ _nextPublishSeqNo -- ;
1053
+ if ( _publisherConfirmationTrackingEnabled )
1069
1054
{
1070
- _confirmSemaphore . Release ( ) ;
1055
+ _confirmsTaskCompletionSources . TryRemove ( publishSequenceNumber , out _ ) ;
1071
1056
}
1072
1057
}
1073
1058
@@ -1080,6 +1065,13 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
1080
1065
throw ;
1081
1066
}
1082
1067
}
1068
+ finally
1069
+ {
1070
+ if ( _publisherConfirmationsEnabled )
1071
+ {
1072
+ _confirmSemaphore . Release ( ) ;
1073
+ }
1074
+ }
1083
1075
1084
1076
if ( publisherConfirmationTcs is not null )
1085
1077
{
@@ -1101,12 +1093,13 @@ public async ValueTask<bool> BasicPublishAsync<TProperties>(CachedString exchang
1101
1093
{
1102
1094
TaskCompletionSource < bool > ? publisherConfirmationTcs = null ;
1103
1095
ulong publishSequenceNumber = 0 ;
1104
- if ( _publisherConfirmationsEnabled )
1096
+ try
1105
1097
{
1106
- await _confirmSemaphore . WaitAsync ( cancellationToken )
1107
- . ConfigureAwait ( false ) ;
1108
- try
1098
+ if ( _publisherConfirmationsEnabled )
1109
1099
{
1100
+ await _confirmSemaphore . WaitAsync ( cancellationToken )
1101
+ . ConfigureAwait ( false ) ;
1102
+
1110
1103
publishSequenceNumber = _nextPublishSeqNo ;
1111
1104
1112
1105
if ( _publisherConfirmationTrackingEnabled )
@@ -1117,14 +1110,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
1117
1110
1118
1111
_nextPublishSeqNo ++ ;
1119
1112
}
1120
- finally
1121
- {
1122
- _confirmSemaphore . Release ( ) ;
1123
- }
1124
- }
1125
1113
1126
- try
1127
- {
1128
1114
var cmd = new BasicPublishMemory ( exchange . Bytes , routingKey . Bytes , mandatory , default ) ;
1129
1115
using Activity ? sendActivity = RabbitMQActivitySource . PublisherHasListeners
1130
1116
? RabbitMQActivitySource . Send ( routingKey . Value , exchange . Value , body . Length )
@@ -1150,19 +1136,10 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
1150
1136
{
1151
1137
if ( _publisherConfirmationsEnabled )
1152
1138
{
1153
- await _confirmSemaphore . WaitAsync ( cancellationToken )
1154
- . ConfigureAwait ( false ) ;
1155
- try
1156
- {
1157
- _nextPublishSeqNo -- ;
1158
- if ( _publisherConfirmationTrackingEnabled )
1159
- {
1160
- _confirmsTaskCompletionSources . TryRemove ( publishSequenceNumber , out _ ) ;
1161
- }
1162
- }
1163
- finally
1139
+ _nextPublishSeqNo -- ;
1140
+ if ( _publisherConfirmationTrackingEnabled )
1164
1141
{
1165
- _confirmSemaphore . Release ( ) ;
1142
+ _confirmsTaskCompletionSources . TryRemove ( publishSequenceNumber , out _ ) ;
1166
1143
}
1167
1144
}
1168
1145
@@ -1175,6 +1152,13 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
1175
1152
throw ;
1176
1153
}
1177
1154
}
1155
+ finally
1156
+ {
1157
+ if ( _publisherConfirmationsEnabled )
1158
+ {
1159
+ _confirmSemaphore . Release ( ) ;
1160
+ }
1161
+ }
1178
1162
1179
1163
if ( publisherConfirmationTcs is not null )
1180
1164
{
@@ -1793,7 +1777,7 @@ private Task HandleAck(ulong deliveryTag, bool multiple, CancellationToken cance
1793
1777
{
1794
1778
if ( multiple )
1795
1779
{
1796
- foreach ( var pair in _confirmsTaskCompletionSources )
1780
+ foreach ( KeyValuePair < ulong , TaskCompletionSource < bool > > pair in _confirmsTaskCompletionSources )
1797
1781
{
1798
1782
if ( pair . Key <= deliveryTag )
1799
1783
{
@@ -1810,6 +1794,7 @@ private Task HandleAck(ulong deliveryTag, bool multiple, CancellationToken cance
1810
1794
}
1811
1795
}
1812
1796
}
1797
+
1813
1798
return Task . CompletedTask ;
1814
1799
}
1815
1800
@@ -1819,11 +1804,11 @@ private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken canc
1819
1804
{
1820
1805
if ( multiple )
1821
1806
{
1822
- foreach ( var pair in _confirmsTaskCompletionSources )
1807
+ foreach ( KeyValuePair < ulong , TaskCompletionSource < bool > > pair in _confirmsTaskCompletionSources )
1823
1808
{
1824
1809
if ( pair . Key <= deliveryTag )
1825
1810
{
1826
- pair . Value . SetException ( new Exception ( "TBD " ) ) ;
1811
+ pair . Value . SetException ( new Exception ( "TODO " ) ) ;
1827
1812
_confirmsTaskCompletionSources . Remove ( pair . Key , out _ ) ;
1828
1813
}
1829
1814
}
@@ -1832,7 +1817,7 @@ private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken canc
1832
1817
{
1833
1818
if ( _confirmsTaskCompletionSources . Remove ( deliveryTag , out TaskCompletionSource < bool > ? tcs ) )
1834
1819
{
1835
- tcs . SetException ( new Exception ( "TBD " ) ) ;
1820
+ tcs . SetException ( new Exception ( "TODO " ) ) ;
1836
1821
}
1837
1822
}
1838
1823
}
@@ -1844,39 +1829,31 @@ private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken canc
1844
1829
Activity ? sendActivity , ulong publishSequenceNumber )
1845
1830
where TProperties : IReadOnlyBasicProperties , IAmqpHeader
1846
1831
{
1847
- if ( sendActivity is null && false == _publisherConfirmationsEnabled )
1832
+ /*
1833
+ * Note: there is nothing to do in this method if *both* of these
1834
+ * conditions are true:
1835
+ *
1836
+ * sendActivity is null - there is no activity to add as a header
1837
+ * publisher confirmations are NOT enabled
1838
+ */
1839
+ if ( sendActivity is null && ! _publisherConfirmationsEnabled )
1848
1840
{
1849
1841
return null ;
1850
1842
}
1851
1843
1852
- var newHeaders = new Dictionary < string , object ? > ( ) ;
1853
- MaybeAddActivityToHeaders ( newHeaders , basicProperties . CorrelationId , sendActivity ) ;
1854
- MaybeAddPublishSequenceNumberToHeaders ( newHeaders ) ;
1844
+ IDictionary < string , object ? > ? headers = basicProperties . Headers ;
1845
+ headers ??= new Dictionary < string , object ? > ( ) ;
1846
+ MaybeAddActivityToHeaders ( headers , basicProperties . CorrelationId , sendActivity ) ;
1847
+ MaybeAddPublishSequenceNumberToHeaders ( headers ) ;
1855
1848
1856
1849
switch ( basicProperties )
1857
1850
{
1858
1851
case BasicProperties writableProperties :
1859
- MergeHeaders ( newHeaders , writableProperties ) ;
1860
1852
return null ;
1861
1853
case EmptyBasicProperty :
1862
- return new BasicProperties { Headers = newHeaders } ;
1854
+ return new BasicProperties { Headers = headers } ;
1863
1855
default :
1864
- return new BasicProperties ( basicProperties ) { Headers = newHeaders } ;
1865
- }
1866
-
1867
- static void MergeHeaders ( IDictionary < string , object ? > newHeaders , BasicProperties props )
1868
- {
1869
- if ( props . Headers is null )
1870
- {
1871
- props . Headers = newHeaders ;
1872
- }
1873
- else
1874
- {
1875
- foreach ( KeyValuePair < string , object ? > val in newHeaders )
1876
- {
1877
- props . Headers [ val . Key ] = val . Value ;
1878
- }
1879
- }
1856
+ return new BasicProperties ( basicProperties ) { Headers = headers } ;
1880
1857
}
1881
1858
1882
1859
void MaybeAddActivityToHeaders ( IDictionary < string , object ? > headers ,
@@ -1902,9 +1879,9 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
1902
1879
{
1903
1880
if ( _publisherConfirmationsEnabled )
1904
1881
{
1905
- var publishSequenceNumberBytes = new byte [ 8 ] ;
1906
- NetworkOrderSerializer . WriteUInt64 ( ref publishSequenceNumberBytes . AsSpan ( ) . GetStart ( ) , publishSequenceNumber ) ;
1907
- headers [ Constants . PublishSequenceNumberHeader ] = publishSequenceNumberBytes ;
1882
+ Span < byte > publishSequenceNumberBytes = stackalloc byte [ 8 ] ;
1883
+ NetworkOrderSerializer . WriteUInt64 ( ref publishSequenceNumberBytes . GetStart ( ) , publishSequenceNumber ) ;
1884
+ headers [ Constants . PublishSequenceNumberHeader ] = publishSequenceNumberBytes . ToArray ( ) ;
1908
1885
}
1909
1886
}
1910
1887
}
0 commit comments