Skip to content

Commit 0c35b34

Browse files
committed
* Make ConfirmSelectAsync private and assume that semaphore is held.
1 parent 9280a0f commit 0c35b34

File tree

2 files changed

+57
-75
lines changed

2 files changed

+57
-75
lines changed

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,6 @@ await newChannel.BasicQosAsync(0, _prefetchCountGlobal, true, cancellationToken)
181181
.ConfigureAwait(false);
182182
}
183183

184-
if (_publisherConfirmationsEnabled)
185-
{
186-
await newChannel.ConfirmSelectAsync(_publisherConfirmationTrackingEnabled, cancellationToken)
187-
.ConfigureAwait(false);
188-
}
189-
190184
if (_usesTransactions)
191185
{
192186
await newChannel.TxSelectAsync(cancellationToken)
@@ -350,18 +344,6 @@ public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global,
350344
return _innerChannel.BasicQosAsync(prefetchSize, prefetchCount, global, cancellationToken);
351345
}
352346

353-
public Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnabled = false, CancellationToken cancellationToken = default)
354-
{
355-
/*
356-
* Note:
357-
* No need to pass this on to InnerChannel, as confirms will have already
358-
* been enabled
359-
*/
360-
_publisherConfirmationsEnabled = true;
361-
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
362-
return Task.CompletedTask;
363-
}
364-
365347
public async Task ExchangeBindAsync(string destination, string source, string routingKey,
366348
IDictionary<string, object?>? arguments, bool noWait,
367349
CancellationToken cancellationToken)

projects/RabbitMQ.Client/Impl/ChannelBase.cs

Lines changed: 57 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -386,13 +386,21 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
386386
try
387387
{
388388
enqueued = Enqueue(k);
389+
if (enqueued)
390+
{
391+
var method = new ChannelOpen();
392+
await ModelSendAsync(in method, k.CancellationToken)
393+
.ConfigureAwait(false);
389394

390-
var method = new ChannelOpen();
391-
await ModelSendAsync(in method, k.CancellationToken)
392-
.ConfigureAwait(false);
395+
bool result = await k;
396+
Debug.Assert(result);
393397

394-
bool result = await k;
395-
Debug.Assert(result);
398+
if (_publisherConfirmationsEnabled)
399+
{
400+
await ConfirmSelectAsync(publisherConfirmationTrackingEnabled, cancellationToken)
401+
.ConfigureAwait(false);
402+
}
403+
}
396404
}
397405
finally
398406
{
@@ -403,13 +411,6 @@ await ModelSendAsync(in method, k.CancellationToken)
403411
_rpcSemaphore.Release();
404412
}
405413

406-
if (_publisherConfirmationsEnabled)
407-
{
408-
// TODO bring this back within RPC semaphore
409-
await ConfirmSelectAsync(publisherConfirmationTrackingEnabled, cancellationToken)
410-
.ConfigureAwait(false);
411-
}
412-
413414
return this;
414415
}
415416

@@ -1244,51 +1245,6 @@ await ModelSendAsync(in method, k.CancellationToken)
12441245
}
12451246
}
12461247

1247-
// TODO internal
1248-
// TODO rpc semaphore held
1249-
public async Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnablefd = false,
1250-
CancellationToken cancellationToken = default)
1251-
{
1252-
_publisherConfirmationsEnabled = true;
1253-
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnablefd;
1254-
1255-
bool enqueued = false;
1256-
var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
1257-
1258-
await _rpcSemaphore.WaitAsync(k.CancellationToken)
1259-
.ConfigureAwait(false);
1260-
try
1261-
{
1262-
if (_nextPublishSeqNo == 0UL)
1263-
{
1264-
if (_publisherConfirmationTrackingEnabled)
1265-
{
1266-
_pendingDeliveryTags.Clear();
1267-
_confirmsTaskCompletionSources.Clear();
1268-
}
1269-
_nextPublishSeqNo = 1;
1270-
}
1271-
1272-
enqueued = Enqueue(k);
1273-
1274-
var method = new ConfirmSelect(false);
1275-
await ModelSendAsync(in method, k.CancellationToken)
1276-
.ConfigureAwait(false);
1277-
1278-
bool result = await k;
1279-
Debug.Assert(result);
1280-
return;
1281-
}
1282-
finally
1283-
{
1284-
if (false == enqueued)
1285-
{
1286-
k.Dispose();
1287-
}
1288-
_rpcSemaphore.Release();
1289-
}
1290-
}
1291-
12921248
public async Task ExchangeBindAsync(string destination, string source, string routingKey,
12931249
IDictionary<string, object?>? arguments, bool noWait,
12941250
CancellationToken cancellationToken)
@@ -1773,6 +1729,50 @@ await ModelSendAsync(in method, k.CancellationToken)
17731729
}
17741730
}
17751731

1732+
// NOTE: _rpcSemaphore is held
1733+
private async Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnablefd = false,
1734+
CancellationToken cancellationToken = default)
1735+
{
1736+
_publisherConfirmationsEnabled = true;
1737+
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnablefd;
1738+
1739+
bool enqueued = false;
1740+
var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
1741+
1742+
try
1743+
{
1744+
if (_nextPublishSeqNo == 0UL)
1745+
{
1746+
if (_publisherConfirmationTrackingEnabled)
1747+
{
1748+
_pendingDeliveryTags.Clear();
1749+
_confirmsTaskCompletionSources.Clear();
1750+
}
1751+
_nextPublishSeqNo = 1;
1752+
}
1753+
1754+
enqueued = Enqueue(k);
1755+
if (enqueued)
1756+
{
1757+
var method = new ConfirmSelect(false);
1758+
await ModelSendAsync(in method, k.CancellationToken)
1759+
.ConfigureAwait(false);
1760+
1761+
bool result = await k;
1762+
Debug.Assert(result);
1763+
}
1764+
1765+
return;
1766+
}
1767+
finally
1768+
{
1769+
if (false == enqueued)
1770+
{
1771+
k.Dispose();
1772+
}
1773+
}
1774+
}
1775+
17761776
// NOTE: this method is internal for its use in this test:
17771777
// TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse
17781778
private async Task HandleAckNack(ulong deliveryTag, bool multiple, bool isNack, CancellationToken cancellationToken = default)

0 commit comments

Comments
 (0)