Skip to content

Commit a7f61e0

Browse files
committed
* Remove commented-out code related to WaitForConfirms... methods.
1 parent 3bb4665 commit a7f61e0

20 files changed

+29
-280
lines changed

projects/RabbitMQ.Client/IChannel.cs

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -443,34 +443,6 @@ Task QueueUnbindAsync(string queue, string exchange, string routingKey,
443443
/// <param name="cancellationToken">The cancellation token.</param>
444444
Task TxSelectAsync(CancellationToken cancellationToken = default);
445445

446-
#if REMOVING_WAIT_FOR_CONFIRMS
447-
/// <summary>
448-
/// Asynchronously wait until all published messages on this channel have been confirmed.
449-
/// </summary>
450-
/// <returns>True if no nacks were received within the timeout, otherwise false.</returns>
451-
/// <param name="cancellationToken">The cancellation token.</param>
452-
/// <remarks>
453-
/// Waits until all messages published on this channel since the last call have
454-
/// been either ack'd or nack'd by the server. Returns whether
455-
/// all the messages were ack'd (and none were nack'd).
456-
/// Throws an exception when called on a channel
457-
/// that does not have publisher confirms enabled.
458-
/// </remarks>
459-
Task<bool> WaitForConfirmsAsync(CancellationToken cancellationToken = default);
460-
461-
/// <summary>
462-
/// Wait until all published messages on this channel have been confirmed.
463-
/// </summary>
464-
/// <param name="cancellationToken">The cancellation token.</param>
465-
/// <remarks>
466-
/// Waits until all messages published on this channel since the last call have
467-
/// been ack'd by the server. If a nack is received or the timeout
468-
/// elapses, throws an IOException exception immediately and closes
469-
/// the channel.
470-
/// </remarks>
471-
Task WaitForConfirmsOrDieAsync(CancellationToken cancellationToken = default);
472-
#endif
473-
474446
/// <summary>
475447
/// Amount of time protocol operations (e.g. <code>queue.declare</code>) are allowed to take before
476448
/// timing out.

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -494,14 +494,6 @@ public Task TxSelectAsync(CancellationToken cancellationToken)
494494
return InnerChannel.TxSelectAsync(cancellationToken);
495495
}
496496

497-
#if REMOVING_WAIT_FOR_CONFIRMS
498-
public Task<bool> WaitForConfirmsAsync(CancellationToken token = default)
499-
=> InnerChannel.WaitForConfirmsAsync(token);
500-
501-
public Task WaitForConfirmsOrDieAsync(CancellationToken token = default)
502-
=> InnerChannel.WaitForConfirmsOrDieAsync(token);
503-
#endif
504-
505497
[MethodImpl(MethodImplOptions.AggressiveInlining)]
506498
private void ThrowIfDisposed()
507499
{

projects/RabbitMQ.Client/Impl/ChannelBase.cs

Lines changed: 0 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -1775,135 +1775,6 @@ await ModelSendAsync(in method, k.CancellationToken)
17751775
}
17761776
}
17771777

1778-
#if REMOVING_WAIT_FOR_CONFIRMS
1779-
public async Task<bool> WaitForConfirmsAsync(CancellationToken cancellationToken = default)
1780-
{
1781-
if (false == _publisherConfirmationsEnabled)
1782-
{
1783-
throw new InvalidOperationException("Confirms not selected");
1784-
}
1785-
1786-
if (false == _publisherConfirmationTrackingEnabled)
1787-
{
1788-
throw new InvalidOperationException("Confirmation tracking is not enabled");
1789-
}
1790-
1791-
if (_pendingDeliveryTags is null)
1792-
{
1793-
throw new InvalidOperationException(InternalConstants.BugFound);
1794-
}
1795-
1796-
TaskCompletionSource<bool> tcs;
1797-
await _confirmSemaphore.WaitAsync(cancellationToken)
1798-
.ConfigureAwait(false);
1799-
try
1800-
{
1801-
if (_pendingDeliveryTags.Count == 0)
1802-
{
1803-
if (_onlyAcksReceived == false)
1804-
{
1805-
_onlyAcksReceived = true;
1806-
return false;
1807-
}
1808-
1809-
return true;
1810-
}
1811-
1812-
tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
1813-
_confirmsTaskCompletionSources.Add(tcs);
1814-
}
1815-
finally
1816-
{
1817-
_confirmSemaphore.Release();
1818-
}
1819-
1820-
bool rv;
1821-
1822-
if (false == cancellationToken.CanBeCanceled)
1823-
{
1824-
rv = await tcs.Task.ConfigureAwait(false);
1825-
}
1826-
else
1827-
{
1828-
rv = await WaitForConfirmsWithTokenAsync(tcs, cancellationToken)
1829-
.ConfigureAwait(false);
1830-
}
1831-
1832-
return rv;
1833-
}
1834-
1835-
public async Task WaitForConfirmsOrDieAsync(CancellationToken token = default)
1836-
{
1837-
if (false == _publisherConfirmationsEnabled)
1838-
{
1839-
throw new InvalidOperationException("Confirms not selected");
1840-
}
1841-
1842-
if (false == _publisherConfirmationTrackingEnabled)
1843-
{
1844-
throw new InvalidOperationException("Confirmation tracking is not enabled");
1845-
}
1846-
1847-
if (_pendingDeliveryTags is null)
1848-
{
1849-
throw new InvalidOperationException(InternalConstants.BugFound);
1850-
}
1851-
1852-
try
1853-
{
1854-
bool onlyAcksReceived = await WaitForConfirmsAsync(token)
1855-
.ConfigureAwait(false);
1856-
1857-
if (onlyAcksReceived)
1858-
{
1859-
return;
1860-
}
1861-
1862-
var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.ReplySuccess, "Nacks Received", new IOException("nack received"));
1863-
1864-
await CloseAsync(ea, false, token)
1865-
.ConfigureAwait(false);
1866-
}
1867-
catch (OperationCanceledException ex)
1868-
{
1869-
const string msg = "timed out waiting for acks";
1870-
var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.ReplySuccess, msg, ex);
1871-
1872-
await CloseAsync(ea, false, token)
1873-
.ConfigureAwait(false);
1874-
1875-
throw;
1876-
}
1877-
}
1878-
1879-
private static async Task<bool> WaitForConfirmsWithTokenAsync(TaskCompletionSource<bool> tcs,
1880-
CancellationToken cancellationToken)
1881-
{
1882-
CancellationTokenRegistration tokenRegistration =
1883-
#if NET6_0_OR_GREATER
1884-
cancellationToken.UnsafeRegister(
1885-
state => ((TaskCompletionSource<bool>)state!).TrySetCanceled(), tcs);
1886-
#else
1887-
cancellationToken.Register(
1888-
state => ((TaskCompletionSource<bool>)state!).TrySetCanceled(),
1889-
state: tcs, useSynchronizationContext: false);
1890-
#endif
1891-
try
1892-
{
1893-
return await tcs.Task.ConfigureAwait(false);
1894-
}
1895-
finally
1896-
{
1897-
#if NET6_0_OR_GREATER
1898-
await tokenRegistration.DisposeAsync()
1899-
.ConfigureAwait(false);
1900-
#else
1901-
tokenRegistration.Dispose();
1902-
#endif
1903-
}
1904-
}
1905-
#endif
1906-
19071778
// NOTE: this method is internal for its use in this test:
19081779
// TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse
19091780
internal async Task HandleAckNack(ulong deliveryTag, bool multiple, bool isNack, CancellationToken cancellationToken = default)

projects/Test/Applications/MassPublish/Program.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,6 @@ await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: Routi
149149
Interlocked.Increment(ref s_messagesSent);
150150
}
151151

152-
// await publishChannel.WaitForConfirmsOrDieAsync();
153-
154152
if (s_debug)
155153
{
156154
Console.WriteLine("[DEBUG] channel {0} done publishing and waiting for confirms", publishChannel.ChannelNumber);

projects/Test/Common/TestConnectionRecoveryBase.cs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,6 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName)
205205
{
206206
using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true))
207207
{
208-
// Note: no need to enable publisher confirmations as they are
209-
// automatically enabled for channels
210-
211208
for (ushort i = 0; i < TotalMessageCount; i++)
212209
{
213210
if (i == CloseAtCount)
@@ -216,7 +213,6 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName)
216213
}
217214

218215
await publishingChannel.BasicPublishAsync(string.Empty, queueName, _messageBody);
219-
// await publishingChannel.WaitForConfirmsOrDieAsync();
220216
}
221217

222218
await publishingChannel.CloseAsync();
@@ -238,16 +234,6 @@ protected static TaskCompletionSource<bool> PrepareForShutdown(IConnection conn)
238234
return tcs;
239235
}
240236

241-
#if REMOVING_WAIT_FOR_CONFIRMS
242-
protected static Task<bool> WaitForConfirmsWithCancellationAsync(IChannel channel)
243-
{
244-
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4)))
245-
{
246-
return channel.WaitForConfirmsAsync(cts.Token);
247-
}
248-
}
249-
#endif
250-
251237
protected Task WaitForShutdownAsync()
252238
{
253239
TaskCompletionSource<bool> tcs = PrepareForShutdown(_conn);
@@ -371,8 +357,6 @@ protected static async Task<bool> SendAndConsumeMessageAsync(IConnection conn, s
371357
await ch.BasicPublishAsync(exchange: exchange, routingKey: routingKey,
372358
body: _encoding.GetBytes("test message"), mandatory: true);
373359

374-
// await ch.WaitForConfirmsOrDieAsync();
375-
376360
try
377361
{
378362
await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5));

projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ public async Task TestExchangeToExchangeBindingRecovery()
7171
await CloseAndWaitForRecoveryAsync();
7272
Assert.True(_channel.IsOpen);
7373
await _channel.BasicPublishAsync(ex_source, "", body: _encoding.GetBytes("msg"), mandatory: true);
74-
// await _channel.WaitForConfirmsOrDieAsync();
7574
await AssertMessageCountAsync(q, 1);
7675
}
7776
finally

projects/Test/Integration/TestAsyncConsumer.cs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -227,13 +227,14 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
227227
return Task.CompletedTask;
228228
};
229229

230+
var publishTasks = new List<Task>();
230231
for (int i = 0; i < publish_total; i++)
231232
{
232-
await publishChannel.BasicPublishAsync(string.Empty, queueName, body1);
233-
await publishChannel.BasicPublishAsync(string.Empty, queueName, body2);
234-
// await publishChannel.WaitForConfirmsOrDieAsync();
233+
publishTasks.Add(publishChannel.BasicPublishAsync(string.Empty, queueName, body1).AsTask());
234+
publishTasks.Add(publishChannel.BasicPublishAsync(string.Empty, queueName, body2).AsTask());
235235
}
236236

237+
await Task.WhenAll(publishTasks).WaitAsync(WaitSpan);
237238
await publishChannel.CloseAsync();
238239
}
239240

@@ -488,7 +489,6 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
488489
{
489490
byte[] _body = _encoding.GetBytes(Guid.NewGuid().ToString());
490491
await _channel.BasicPublishAsync(string.Empty, queueName, _body);
491-
// await _channel.WaitForConfirmsOrDieAsync();
492492
}
493493

494494
return true;
@@ -650,7 +650,6 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
650650
await innerChannel.BasicPublishAsync(exchangeName, queue2Name,
651651
mandatory: true,
652652
body: Encoding.ASCII.GetBytes(nameof(TestCreateChannelWithinAsyncConsumerCallback_GH650)));
653-
// await innerChannel.WaitForConfirmsOrDieAsync();
654653
await innerChannel.CloseAsync();
655654
};
656655
await _channel.BasicConsumeAsync(queue1Name, autoAck: true, consumer1);
@@ -663,10 +662,7 @@ await innerChannel.BasicPublishAsync(exchangeName, queue2Name,
663662
};
664663
await _channel.BasicConsumeAsync(queue2Name, autoAck: true, consumer2);
665664

666-
// Note: no need to enable publisher confirmations as they are
667-
// automatically enabled for channels
668665
await _channel.BasicPublishAsync(exchangeName, queue1Name, body: GetRandomBody(1024));
669-
// await _channel.WaitForConfirmsOrDieAsync();
670666

671667
Assert.True(await tcs.Task);
672668
}

projects/Test/Integration/TestBasicPublishAsync.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ public async Task TestQueuePurgeAsync()
5959
{
6060
await _channel.BasicPublishAsync(string.Empty, q, body);
6161
}
62-
// await _channel.WaitForConfirmsOrDieAsync();
6362
publishSyncSource.SetResult(true);
6463
});
6564

projects/Test/Integration/TestConfirmSelect.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength)
8080
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
8181
await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
8282
mandatory: false, basicProperties: properties, body: body);
83-
// await _channel.WaitForConfirmsOrDieAsync();
8483

8584
try
8685
{
@@ -90,7 +89,6 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
9089
};
9190
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
9291
await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body);
93-
// await _channel.WaitForConfirmsOrDieAsync();
9492
}
9593
catch
9694
{
@@ -100,7 +98,6 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
10098
properties = new BasicProperties();
10199
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
102100
await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body);
103-
// await _channel.WaitForConfirmsOrDieAsync();
104101
// _output.WriteLine("I'm done...");
105102
}
106103
}

projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,6 @@ public async Task TestTopologyRecoveryConsumerFilter()
325325
Assert.True(ch.IsOpen);
326326
await ch.BasicPublishAsync(exchange, binding1, _encoding.GetBytes("test message"));
327327
await ch.BasicPublishAsync(exchange, binding2, _encoding.GetBytes("test message"));
328-
// await WaitForConfirmsWithCancellationAsync(ch);
329328

330329
await consumerRecoveryTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
331330
Assert.True(await consumerRecoveryTcs.Task);

projects/Test/Integration/TestConnectionTopologyRecovery.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,6 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities()
314314

315315
var pt1 = ch.BasicPublishAsync(exchange, binding1, true, _encoding.GetBytes("test message"));
316316
var pt2 = ch.BasicPublishAsync(exchange, binding2, true, _encoding.GetBytes("test message"));
317-
// await WaitForConfirmsWithCancellationAsync(ch);
318317
await Task.WhenAll(pt1.AsTask(), pt2.AsTask()).WaitAsync(WaitSpan);
319318

320319
await Task.WhenAll(consumerReceivedTcs1.Task, consumerReceivedTcs2.Task).WaitAsync(WaitSpan);

projects/Test/Integration/TestExtensions.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,10 @@ public async Task TestExchangeBinding()
6464
await _channel.QueueBindAsync(queue, "dest", string.Empty);
6565

6666
await _channel.BasicPublishAsync("src", string.Empty, Array.Empty<byte>());
67-
// await _channel.WaitForConfirmsAsync();
6867
Assert.NotNull(await _channel.BasicGetAsync(queue, true));
6968

7069
await _channel.ExchangeUnbindAsync("dest", "src", string.Empty);
7170
await _channel.BasicPublishAsync("src", string.Empty, Array.Empty<byte>());
72-
// await _channel.WaitForConfirmsAsync();
7371

7472
Assert.Null(await _channel.BasicGetAsync(queue, true));
7573

0 commit comments

Comments
 (0)