Skip to content

Commit 8b26520

Browse files
committed
* Add cancellation token to binding recovery
1 parent 3fc85ac commit 8b26520

File tree

2 files changed

+13
-10
lines changed

2 files changed

+13
-10
lines changed

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,7 @@ await RecoverExchangesAsync(_innerConnection, recordedEntitiesSemaphoreHeld: tru
179179
.ConfigureAwait(false);
180180
await RecoverQueuesAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true, cancellationToken)
181181
.ConfigureAwait(false);
182-
// TODO cancellation token
183-
await RecoverBindingsAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true)
182+
await RecoverBindingsAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true, cancellationToken)
184183
.ConfigureAwait(false);
185184

186185
}
@@ -418,7 +417,7 @@ void UpdateConsumerQueue(string oldName, string newName)
418417
}
419418

420419
private async ValueTask RecoverBindingsAsync(IConnection connection,
421-
bool recordedEntitiesSemaphoreHeld = false)
420+
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
422421
{
423422
if (_disposed)
424423
{
@@ -434,11 +433,11 @@ private async ValueTask RecoverBindingsAsync(IConnection connection,
434433
{
435434
try
436435
{
437-
using (IChannel ch = await connection.CreateChannelAsync().ConfigureAwait(false))
436+
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false))
438437
{
439-
await binding.RecoverAsync(ch)
438+
await binding.RecoverAsync(ch, cancellationToken)
440439
.ConfigureAwait(false);
441-
await ch.CloseAsync()
440+
await ch.CloseAsync(cancellationToken)
442441
.ConfigureAwait(false);
443442
}
444443
}
@@ -450,12 +449,13 @@ await ch.CloseAsync()
450449
try
451450
{
452451
_recordedEntitiesSemaphore.Release();
452+
// TODO maybe cancellation token
453453
await _config.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandlerAsync(binding, ex, this)
454454
.ConfigureAwait(false);
455455
}
456456
finally
457457
{
458-
await _recordedEntitiesSemaphore.WaitAsync()
458+
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
459459
.ConfigureAwait(false);
460460
}
461461
}

projects/RabbitMQ.Client/client/impl/RecordedBinding.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Threading;
3435
using System.Threading.Tasks;
3536

3637
namespace RabbitMQ.Client.Impl
@@ -67,15 +68,17 @@ public RecordedBinding(string destination, in RecordedBinding old)
6768
_arguments = old._arguments;
6869
}
6970

70-
public Task RecoverAsync(IChannel channel)
71+
public Task RecoverAsync(IChannel channel, CancellationToken cancellationToken)
7172
{
7273
if (_isQueueBinding)
7374
{
74-
return channel.QueueBindAsync(_destination, _source, _routingKey, _arguments, false);
75+
return channel.QueueBindAsync(_destination, _source, _routingKey, _arguments, false,
76+
cancellationToken);
7577
}
7678
else
7779
{
78-
return channel.ExchangeBindAsync(_destination, _source, _routingKey, _arguments, false);
80+
return channel.ExchangeBindAsync(_destination, _source, _routingKey, _arguments, false,
81+
cancellationToken);
7982
}
8083
}
8184

0 commit comments

Comments
 (0)