Skip to content

Commit 45b7dfa

Browse files
committed
Extend the use of _confirmSemaphore to the duration of when exceptions could be caught.
1 parent 792096d commit 45b7dfa

File tree

1 file changed

+52
-75
lines changed

1 file changed

+52
-75
lines changed

projects/RabbitMQ.Client/Impl/ChannelBase.cs

Lines changed: 52 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,12 +1005,13 @@ public async ValueTask<bool> BasicPublishAsync<TProperties>(string exchange, str
10051005
{
10061006
TaskCompletionSource<bool>? publisherConfirmationTcs = null;
10071007
ulong publishSequenceNumber = 0;
1008-
if (_publisherConfirmationsEnabled)
1008+
try
10091009
{
1010-
await _confirmSemaphore.WaitAsync(cancellationToken)
1011-
.ConfigureAwait(false);
1012-
try
1010+
if (_publisherConfirmationsEnabled)
10131011
{
1012+
await _confirmSemaphore.WaitAsync(cancellationToken)
1013+
.ConfigureAwait(false);
1014+
10141015
publishSequenceNumber = _nextPublishSeqNo;
10151016

10161017
if (_publisherConfirmationTrackingEnabled)
@@ -1021,14 +1022,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
10211022

10221023
_nextPublishSeqNo++;
10231024
}
1024-
finally
1025-
{
1026-
_confirmSemaphore.Release();
1027-
}
1028-
}
10291025

1030-
try
1031-
{
10321026
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
10331027

10341028
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
@@ -1055,19 +1049,10 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
10551049
{
10561050
if (_publisherConfirmationsEnabled)
10571051
{
1058-
await _confirmSemaphore.WaitAsync(cancellationToken)
1059-
.ConfigureAwait(false);
1060-
try
1061-
{
1062-
_nextPublishSeqNo--;
1063-
if (_publisherConfirmationTrackingEnabled)
1064-
{
1065-
_confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _);
1066-
}
1067-
}
1068-
finally
1052+
_nextPublishSeqNo--;
1053+
if (_publisherConfirmationTrackingEnabled)
10691054
{
1070-
_confirmSemaphore.Release();
1055+
_confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _);
10711056
}
10721057
}
10731058

@@ -1080,6 +1065,13 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
10801065
throw;
10811066
}
10821067
}
1068+
finally
1069+
{
1070+
if (_publisherConfirmationsEnabled)
1071+
{
1072+
_confirmSemaphore.Release();
1073+
}
1074+
}
10831075

10841076
if (publisherConfirmationTcs is not null)
10851077
{
@@ -1101,12 +1093,13 @@ public async ValueTask<bool> BasicPublishAsync<TProperties>(CachedString exchang
11011093
{
11021094
TaskCompletionSource<bool>? publisherConfirmationTcs = null;
11031095
ulong publishSequenceNumber = 0;
1104-
if (_publisherConfirmationsEnabled)
1096+
try
11051097
{
1106-
await _confirmSemaphore.WaitAsync(cancellationToken)
1107-
.ConfigureAwait(false);
1108-
try
1098+
if (_publisherConfirmationsEnabled)
11091099
{
1100+
await _confirmSemaphore.WaitAsync(cancellationToken)
1101+
.ConfigureAwait(false);
1102+
11101103
publishSequenceNumber = _nextPublishSeqNo;
11111104

11121105
if (_publisherConfirmationTrackingEnabled)
@@ -1117,14 +1110,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
11171110

11181111
_nextPublishSeqNo++;
11191112
}
1120-
finally
1121-
{
1122-
_confirmSemaphore.Release();
1123-
}
1124-
}
11251113

1126-
try
1127-
{
11281114
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
11291115
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
11301116
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
@@ -1150,19 +1136,10 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
11501136
{
11511137
if (_publisherConfirmationsEnabled)
11521138
{
1153-
await _confirmSemaphore.WaitAsync(cancellationToken)
1154-
.ConfigureAwait(false);
1155-
try
1156-
{
1157-
_nextPublishSeqNo--;
1158-
if (_publisherConfirmationTrackingEnabled)
1159-
{
1160-
_confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _);
1161-
}
1162-
}
1163-
finally
1139+
_nextPublishSeqNo--;
1140+
if (_publisherConfirmationTrackingEnabled)
11641141
{
1165-
_confirmSemaphore.Release();
1142+
_confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _);
11661143
}
11671144
}
11681145

@@ -1175,6 +1152,13 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
11751152
throw;
11761153
}
11771154
}
1155+
finally
1156+
{
1157+
if (_publisherConfirmationsEnabled)
1158+
{
1159+
_confirmSemaphore.Release();
1160+
}
1161+
}
11781162

11791163
if (publisherConfirmationTcs is not null)
11801164
{
@@ -1793,7 +1777,7 @@ private Task HandleAck(ulong deliveryTag, bool multiple, CancellationToken cance
17931777
{
17941778
if (multiple)
17951779
{
1796-
foreach (var pair in _confirmsTaskCompletionSources)
1780+
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources)
17971781
{
17981782
if (pair.Key <= deliveryTag)
17991783
{
@@ -1810,6 +1794,7 @@ private Task HandleAck(ulong deliveryTag, bool multiple, CancellationToken cance
18101794
}
18111795
}
18121796
}
1797+
18131798
return Task.CompletedTask;
18141799
}
18151800

@@ -1819,11 +1804,11 @@ private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken canc
18191804
{
18201805
if (multiple)
18211806
{
1822-
foreach (var pair in _confirmsTaskCompletionSources)
1807+
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources)
18231808
{
18241809
if (pair.Key <= deliveryTag)
18251810
{
1826-
pair.Value.SetException(new Exception("TBD"));
1811+
pair.Value.SetException(new Exception("TODO"));
18271812
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
18281813
}
18291814
}
@@ -1832,7 +1817,7 @@ private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken canc
18321817
{
18331818
if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource<bool>? tcs))
18341819
{
1835-
tcs.SetException(new Exception("TBD"));
1820+
tcs.SetException(new Exception("TODO"));
18361821
}
18371822
}
18381823
}
@@ -1844,39 +1829,31 @@ private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken canc
18441829
Activity? sendActivity, ulong publishSequenceNumber)
18451830
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
18461831
{
1847-
if (sendActivity is null && false == _publisherConfirmationsEnabled)
1832+
/*
1833+
* Note: there is nothing to do in this method if *both* of these
1834+
* conditions are true:
1835+
*
1836+
* sendActivity is null - there is no activity to add as a header
1837+
* publisher confirmations are NOT enabled
1838+
*/
1839+
if (sendActivity is null && !_publisherConfirmationsEnabled)
18481840
{
18491841
return null;
18501842
}
18511843

1852-
var newHeaders = new Dictionary<string, object?>();
1853-
MaybeAddActivityToHeaders(newHeaders, basicProperties.CorrelationId, sendActivity);
1854-
MaybeAddPublishSequenceNumberToHeaders(newHeaders);
1844+
IDictionary<string, object?>? headers = basicProperties.Headers;
1845+
headers ??= new Dictionary<string, object?>();
1846+
MaybeAddActivityToHeaders(headers, basicProperties.CorrelationId, sendActivity);
1847+
MaybeAddPublishSequenceNumberToHeaders(headers);
18551848

18561849
switch (basicProperties)
18571850
{
18581851
case BasicProperties writableProperties:
1859-
MergeHeaders(newHeaders, writableProperties);
18601852
return null;
18611853
case EmptyBasicProperty:
1862-
return new BasicProperties { Headers = newHeaders };
1854+
return new BasicProperties { Headers = headers };
18631855
default:
1864-
return new BasicProperties(basicProperties) { Headers = newHeaders };
1865-
}
1866-
1867-
static void MergeHeaders(IDictionary<string, object?> newHeaders, BasicProperties props)
1868-
{
1869-
if (props.Headers is null)
1870-
{
1871-
props.Headers = newHeaders;
1872-
}
1873-
else
1874-
{
1875-
foreach (KeyValuePair<string, object?> val in newHeaders)
1876-
{
1877-
props.Headers[val.Key] = val.Value;
1878-
}
1879-
}
1856+
return new BasicProperties(basicProperties) { Headers = headers };
18801857
}
18811858

18821859
void MaybeAddActivityToHeaders(IDictionary<string, object?> headers,
@@ -1902,9 +1879,9 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
19021879
{
19031880
if (_publisherConfirmationsEnabled)
19041881
{
1905-
var publishSequenceNumberBytes = new byte[8];
1906-
NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.AsSpan().GetStart(), publishSequenceNumber);
1907-
headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes;
1882+
Span<byte> publishSequenceNumberBytes = stackalloc byte[8];
1883+
NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.GetStart(), publishSequenceNumber);
1884+
headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes.ToArray();
19081885
}
19091886
}
19101887
}

0 commit comments

Comments
 (0)