Skip to content

Commit 49e5d3c

Browse files
committed
More CancellationToken todos
* Add cancellation token to methods in `IChannelExtensions` * Add cancellation token to Exchange recovery
1 parent a9f331e commit 49e5d3c

File tree

4 files changed

+26
-18
lines changed

4 files changed

+26
-18
lines changed

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -937,13 +937,13 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
937937
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandlerAsync.set -> void
938938
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.get -> System.Func<RabbitMQ.Client.IRecordedQueue, System.Exception, RabbitMQ.Client.IConnection, System.Threading.Tasks.Task>
939939
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.set -> void
940-
~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel) -> System.Threading.Tasks.Task
940+
~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
941941
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.IBasicConsumer consumer, string queue, bool autoAck = false, string consumerTag = "", bool noLocal = false, bool exclusive = false, System.Collections.Generic.IDictionary<string, object> arguments = null) -> System.Threading.Tasks.Task<string>
942942
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
943943
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
944944
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary<string, object> arguments, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
945-
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel) -> System.Threading.Tasks.Task
946-
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText) -> System.Threading.Tasks.Task
945+
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
946+
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
947947
~static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false) -> System.Threading.Tasks.Task
948948
~static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel channel, string queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk>
949949
~static RabbitMQ.Client.IChannelExtensions.QueueDeleteAsync(this RabbitMQ.Client.IChannel channel, string queue, bool ifUnused = false, bool ifEmpty = false) -> System.Threading.Tasks.Task<uint>

projects/RabbitMQ.Client/client/api/IChannelExtensions.cs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Threading;
3435
using System.Threading.Tasks;
3536
using RabbitMQ.Client.client.impl;
3637

@@ -145,12 +146,13 @@ public static Task QueueUnbindAsync(this IChannel channel, string queue, string
145146
/// method does nothing but wait for the in-progress close
146147
/// operation to complete. This method will not return to the
147148
/// caller until the shutdown is complete.
148-
/// In comparison to normal <see cref="CloseAsync(IChannel)"/> method, <see cref="AbortAsync(IChannel)"/> will not throw
149+
/// In comparison to normal <see cref="CloseAsync(IChannel, CancellationToken)"/> method, <see cref="AbortAsync(IChannel, CancellationToken)"/> will not throw
149150
/// <see cref="Exceptions.AlreadyClosedException"/> or <see cref="System.IO.IOException"/> or any other <see cref="Exception"/> during closing channel.
150151
/// </remarks>
151-
public static Task AbortAsync(this IChannel channel)
152+
public static Task AbortAsync(this IChannel channel, CancellationToken cancellationToken = default)
152153
{
153-
return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", true);
154+
return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", true,
155+
cancellationToken);
154156
}
155157

156158
/// <summary>Asynchronously close this session.</summary>
@@ -160,9 +162,10 @@ public static Task AbortAsync(this IChannel channel)
160162
/// operation to complete. This method will not return to the
161163
/// caller until the shutdown is complete.
162164
/// </remarks>
163-
public static Task CloseAsync(this IChannel channel)
165+
public static Task CloseAsync(this IChannel channel, CancellationToken cancellationToken = default)
164166
{
165-
return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", false);
167+
return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", false,
168+
cancellationToken);
166169
}
167170

168171
/// <summary>
@@ -171,6 +174,7 @@ public static Task CloseAsync(this IChannel channel)
171174
/// <param name="channel">The channel.</param>
172175
/// <param name="replyCode">The reply code.</param>
173176
/// <param name="replyText">The reply text.</param>
177+
/// <param name="cancellationToken">The cancellation token.</param>
174178
/// <remarks>
175179
/// The method behaves in the same way as Close(), with the only
176180
/// difference that the channel is closed with the given channel
@@ -181,9 +185,10 @@ public static Task CloseAsync(this IChannel channel)
181185
/// A message indicating the reason for closing the channel
182186
/// </para>
183187
/// </remarks>
184-
public static Task CloseAsync(this IChannel channel, ushort replyCode, string replyText)
188+
public static Task CloseAsync(this IChannel channel, ushort replyCode, string replyText,
189+
CancellationToken cancellationToken = default)
185190
{
186-
return channel.CloseAsync(replyCode, replyText, false);
191+
return channel.CloseAsync(replyCode, replyText, false, cancellationToken);
187192
}
188193
}
189194
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
177177
// 3. Recover bindings
178178
// 4. Recover consumers
179179
// TODO cancellation token
180-
await RecoverExchangesAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true)
180+
await RecoverExchangesAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true, cancellationToken)
181181
.ConfigureAwait(false);
182182
// TODO cancellation token
183183
await RecoverQueuesAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true)
@@ -258,7 +258,7 @@ await _innerConnection.OpenAsync(cancellationToken)
258258
}
259259

260260
private async ValueTask RecoverExchangesAsync(IConnection connection,
261-
bool recordedEntitiesSemaphoreHeld = false)
261+
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
262262
{
263263
if (_disposed)
264264
{
@@ -274,11 +274,11 @@ private async ValueTask RecoverExchangesAsync(IConnection connection,
274274
{
275275
try
276276
{
277-
using (IChannel ch = await connection.CreateChannelAsync().ConfigureAwait(false))
277+
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false))
278278
{
279-
await recordedExchange.RecoverAsync(ch)
279+
await recordedExchange.RecoverAsync(ch, cancellationToken)
280280
.ConfigureAwait(false);
281-
await ch.CloseAsync()
281+
await ch.CloseAsync(cancellationToken)
282282
.ConfigureAwait(false);
283283
}
284284
}
@@ -290,12 +290,13 @@ await ch.CloseAsync()
290290
try
291291
{
292292
_recordedEntitiesSemaphore.Release();
293+
// TODO maybe cancellation token
293294
await _config.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandlerAsync(recordedExchange, ex, this)
294295
.ConfigureAwait(false);
295296
}
296297
finally
297298
{
298-
await _recordedEntitiesSemaphore.WaitAsync()
299+
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
299300
.ConfigureAwait(false);
300301
}
301302
}

projects/RabbitMQ.Client/client/impl/RecordedExchange.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System.Collections.Generic;
33+
using System.Threading;
3334
using System.Threading.Tasks;
3435

3536
namespace RabbitMQ.Client.Impl
@@ -58,10 +59,11 @@ public RecordedExchange(string name, string type, bool durable, bool autoDelete,
5859
_arguments = arguments;
5960
}
6061

61-
public Task RecoverAsync(IChannel channel)
62+
public Task RecoverAsync(IChannel channel, CancellationToken cancellationToken)
6263
{
6364
return channel.ExchangeDeclareAsync(exchange: Name, type: _type, passive: false,
64-
durable: _durable, autoDelete: AutoDelete, noWait: false, arguments: _arguments);
65+
durable: _durable, autoDelete: AutoDelete, noWait: false, arguments: _arguments,
66+
cancellationToken: cancellationToken);
6567
}
6668

6769
public override string ToString()

0 commit comments

Comments
 (0)