@@ -1004,26 +1004,13 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
1004
1004
CancellationToken cancellationToken = default )
1005
1005
where TProperties : IReadOnlyBasicProperties , IAmqpHeader
1006
1006
{
1007
- TaskCompletionSource < bool > ? publisherConfirmationTcs = null ;
1008
- ulong publishSequenceNumber = 0 ;
1007
+ PublisherConfirmationInfo ? publisherConfirmationInfo = null ;
1009
1008
try
1010
1009
{
1011
- if ( _publisherConfirmationsEnabled )
1012
- {
1013
- await _confirmSemaphore . WaitAsync ( cancellationToken )
1010
+ publisherConfirmationInfo =
1011
+ await MaybeStartPublisherConfirmationTracking ( cancellationToken )
1014
1012
. ConfigureAwait ( false ) ;
1015
1013
1016
- publishSequenceNumber = _nextPublishSeqNo ;
1017
-
1018
- if ( _publisherConfirmationTrackingEnabled )
1019
- {
1020
- publisherConfirmationTcs = new ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
1021
- _confirmsTaskCompletionSources [ publishSequenceNumber ] = publisherConfirmationTcs ;
1022
- }
1023
-
1024
- _nextPublishSeqNo ++ ;
1025
- }
1026
-
1027
1014
await EnforceFlowControlAsync ( cancellationToken )
1028
1015
. ConfigureAwait ( false ) ;
1029
1016
@@ -1033,6 +1020,12 @@ await EnforceFlowControlAsync(cancellationToken)
1033
1020
? RabbitMQActivitySource . Send ( routingKey . Value , exchange . Value , body . Length )
1034
1021
: default ;
1035
1022
1023
+ ulong publishSequenceNumber = 0 ;
1024
+ if ( publisherConfirmationInfo is not null )
1025
+ {
1026
+ publishSequenceNumber = publisherConfirmationInfo . PublishSequenceNumber ;
1027
+ }
1028
+
1036
1029
BasicProperties ? props = PopulateBasicPropertiesHeaders ( basicProperties , sendActivity , publishSequenceNumber ) ;
1037
1030
if ( props is null )
1038
1031
{
@@ -1047,35 +1040,16 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
1047
1040
}
1048
1041
catch ( Exception ex )
1049
1042
{
1050
- if ( _publisherConfirmationsEnabled )
1051
- {
1052
- _nextPublishSeqNo -- ;
1053
- if ( _publisherConfirmationTrackingEnabled )
1054
- {
1055
- _confirmsTaskCompletionSources . TryRemove ( publishSequenceNumber , out _ ) ;
1056
- }
1057
- }
1058
-
1059
- if ( publisherConfirmationTcs is not null )
1060
- {
1061
- publisherConfirmationTcs . SetException ( ex ) ;
1062
- }
1063
- else
1043
+ bool exceptionWasHandled =
1044
+ MaybeHandleExceptionWithEnabledPublisherConfirmations ( publisherConfirmationInfo , ex ) ;
1045
+ if ( ! exceptionWasHandled )
1064
1046
{
1065
1047
throw ;
1066
1048
}
1067
1049
}
1068
1050
finally
1069
1051
{
1070
- if ( _publisherConfirmationsEnabled )
1071
- {
1072
- _confirmSemaphore . Release ( ) ;
1073
- }
1074
- }
1075
-
1076
- if ( publisherConfirmationTcs is not null )
1077
- {
1078
- await publisherConfirmationTcs . Task . WaitAsync ( cancellationToken )
1052
+ await MaybeEndPublisherConfirmationTracking ( publisherConfirmationInfo , cancellationToken )
1079
1053
. ConfigureAwait ( false ) ;
1080
1054
}
1081
1055
}
0 commit comments