Skip to content

Commit 3fc85ac

Browse files
committed
* Add cancellation token to Queue recovery
1 parent 49e5d3c commit 3fc85ac

File tree

4 files changed

+40
-40
lines changed

4 files changed

+40
-40
lines changed

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ public async Task ExchangeDeleteAsync(string exchange, bool ifUnused, bool noWai
351351
{
352352
await InnerChannel.ExchangeDeleteAsync(exchange, ifUnused, noWait, cancellationToken)
353353
.ConfigureAwait(false);
354-
await _connection.DeleteRecordedExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false)
354+
await _connection.DeleteRecordedExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false, cancellationToken)
355355
.ConfigureAwait(false);
356356
}
357357

@@ -361,11 +361,11 @@ public async Task ExchangeUnbindAsync(string destination, string source, string
361361
{
362362
ThrowIfDisposed();
363363
var recordedBinding = new RecordedBinding(false, destination, source, routingKey, arguments);
364-
await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false)
364+
await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false, cancellationToken)
365365
.ConfigureAwait(false);
366366
await InnerChannel.ExchangeUnbindAsync(destination, source, routingKey, arguments, noWait, cancellationToken)
367367
.ConfigureAwait(false);
368-
await _connection.DeleteAutoDeleteExchangeAsync(source, recordedEntitiesSemaphoreHeld: false)
368+
await _connection.DeleteAutoDeleteExchangeAsync(source, recordedEntitiesSemaphoreHeld: false, cancellationToken)
369369
.ConfigureAwait(false);
370370
}
371371

@@ -396,7 +396,7 @@ public async Task<QueueDeclareOk> QueueDeclareAsync(string queue, bool durable,
396396
if (false == passive)
397397
{
398398
var recordedQueue = new RecordedQueue(result.QueueName, queue.Length == 0, durable, exclusive, autoDelete, arguments);
399-
await _connection.RecordQueueAsync(recordedQueue, recordedEntitiesSemaphoreHeld: false)
399+
await _connection.RecordQueueAsync(recordedQueue, recordedEntitiesSemaphoreHeld: false, cancellationToken)
400400
.ConfigureAwait(false);
401401
}
402402
return result;
@@ -415,7 +415,7 @@ public async Task<uint> QueueDeleteAsync(string queue, bool ifUnused, bool ifEmp
415415
{
416416
uint result = await InnerChannel.QueueDeleteAsync(queue, ifUnused, ifEmpty, noWait, cancellationToken)
417417
.ConfigureAwait(false);
418-
await _connection.DeleteRecordedQueueAsync(queue, recordedEntitiesSemaphoreHeld: false)
418+
await _connection.DeleteRecordedQueueAsync(queue, recordedEntitiesSemaphoreHeld: false, cancellationToken)
419419
.ConfigureAwait(false);
420420
return result;
421421
}
@@ -429,11 +429,11 @@ public async Task QueueUnbindAsync(string queue, string exchange, string routing
429429
{
430430
ThrowIfDisposed();
431431
var recordedBinding = new RecordedBinding(true, queue, exchange, routingKey, arguments);
432-
await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false)
432+
await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false, cancellationToken)
433433
.ConfigureAwait(false);
434434
await _innerChannel.QueueUnbindAsync(queue, exchange, routingKey, arguments, cancellationToken)
435435
.ConfigureAwait(false);
436-
await _connection.DeleteAutoDeleteExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false)
436+
await _connection.DeleteAutoDeleteExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false, cancellationToken)
437437
.ConfigureAwait(false);
438438
}
439439

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ private void DoRecordExchange(in RecordedExchange exchange)
8383
}
8484

8585
internal async ValueTask DeleteRecordedExchangeAsync(string exchangeName,
86-
bool recordedEntitiesSemaphoreHeld)
86+
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
8787
{
8888
if (_disposed)
8989
{
@@ -92,16 +92,16 @@ internal async ValueTask DeleteRecordedExchangeAsync(string exchangeName,
9292

9393
if (recordedEntitiesSemaphoreHeld)
9494
{
95-
await DoDeleteRecordedExchangeAsync(exchangeName)
95+
await DoDeleteRecordedExchangeAsync(exchangeName, cancellationToken)
9696
.ConfigureAwait(false);
9797
}
9898
else
9999
{
100-
await _recordedEntitiesSemaphore.WaitAsync()
100+
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
101101
.ConfigureAwait(false);
102102
try
103103
{
104-
await DoDeleteRecordedExchangeAsync(exchangeName)
104+
await DoDeleteRecordedExchangeAsync(exchangeName, cancellationToken)
105105
.ConfigureAwait(false);
106106
}
107107
finally
@@ -110,7 +110,7 @@ await DoDeleteRecordedExchangeAsync(exchangeName)
110110
}
111111
}
112112

113-
async Task DoDeleteRecordedExchangeAsync(string exchangeName)
113+
async Task DoDeleteRecordedExchangeAsync(string exchangeName, CancellationToken cancellationToken)
114114
{
115115
_recordedExchanges.Remove(exchangeName);
116116

@@ -120,18 +120,18 @@ async Task DoDeleteRecordedExchangeAsync(string exchangeName)
120120
if (binding.Destination == exchangeName)
121121
{
122122
await DeleteRecordedBindingAsync(binding,
123-
recordedEntitiesSemaphoreHeld: true)
123+
recordedEntitiesSemaphoreHeld: true, cancellationToken)
124124
.ConfigureAwait(false);
125125
await DeleteAutoDeleteExchangeAsync(binding.Source,
126-
recordedEntitiesSemaphoreHeld: true)
126+
recordedEntitiesSemaphoreHeld: true, cancellationToken)
127127
.ConfigureAwait(false);
128128
}
129129
}
130130
}
131131
}
132132

133133
internal async ValueTask DeleteAutoDeleteExchangeAsync(string exchangeName,
134-
bool recordedEntitiesSemaphoreHeld)
134+
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
135135
{
136136
if (_disposed)
137137
{
@@ -144,7 +144,7 @@ internal async ValueTask DeleteAutoDeleteExchangeAsync(string exchangeName,
144144
}
145145
else
146146
{
147-
await _recordedEntitiesSemaphore.WaitAsync()
147+
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
148148
.ConfigureAwait(false);
149149
try
150150
{
@@ -185,7 +185,7 @@ bool AnyBindingsOnExchange(string exchange)
185185
internal int RecordedQueuesCount => _recordedQueues.Count;
186186

187187
internal async ValueTask RecordQueueAsync(RecordedQueue queue,
188-
bool recordedEntitiesSemaphoreHeld)
188+
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
189189
{
190190
if (_disposed)
191191
{
@@ -198,7 +198,7 @@ internal async ValueTask RecordQueueAsync(RecordedQueue queue,
198198
}
199199
else
200200
{
201-
await _recordedEntitiesSemaphore.WaitAsync()
201+
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
202202
.ConfigureAwait(false);
203203
try
204204
{
@@ -217,7 +217,7 @@ private void DoRecordQueue(RecordedQueue queue)
217217
}
218218

219219
internal async ValueTask DeleteRecordedQueueAsync(string queueName,
220-
bool recordedEntitiesSemaphoreHeld)
220+
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
221221
{
222222
if (_disposed)
223223
{
@@ -226,16 +226,16 @@ internal async ValueTask DeleteRecordedQueueAsync(string queueName,
226226

227227
if (recordedEntitiesSemaphoreHeld)
228228
{
229-
await DoDeleteRecordedQueueAsync(queueName)
229+
await DoDeleteRecordedQueueAsync(queueName, cancellationToken)
230230
.ConfigureAwait(false);
231231
}
232232
else
233233
{
234-
await _recordedEntitiesSemaphore.WaitAsync()
234+
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
235235
.ConfigureAwait(false);
236236
try
237237
{
238-
await DoDeleteRecordedQueueAsync(queueName)
238+
await DoDeleteRecordedQueueAsync(queueName, cancellationToken)
239239
.ConfigureAwait(false);
240240
}
241241
finally
@@ -244,7 +244,7 @@ await DoDeleteRecordedQueueAsync(queueName)
244244
}
245245
}
246246

247-
async ValueTask DoDeleteRecordedQueueAsync(string queueName)
247+
async ValueTask DoDeleteRecordedQueueAsync(string queueName, CancellationToken cancellationToken)
248248
{
249249
_recordedQueues.Remove(queueName);
250250

@@ -254,10 +254,10 @@ async ValueTask DoDeleteRecordedQueueAsync(string queueName)
254254
if (binding.Destination == queueName)
255255
{
256256
await DeleteRecordedBindingAsync(binding,
257-
recordedEntitiesSemaphoreHeld: true)
257+
recordedEntitiesSemaphoreHeld: true, cancellationToken)
258258
.ConfigureAwait(false);
259259
await DeleteAutoDeleteExchangeAsync(binding.Source,
260-
recordedEntitiesSemaphoreHeld: true)
260+
recordedEntitiesSemaphoreHeld: true, cancellationToken)
261261
.ConfigureAwait(false);
262262
}
263263
}
@@ -298,7 +298,7 @@ private void DoRecordBinding(in RecordedBinding binding)
298298
}
299299

300300
internal async ValueTask DeleteRecordedBindingAsync(RecordedBinding rb,
301-
bool recordedEntitiesSemaphoreHeld)
301+
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
302302
{
303303
if (_disposed)
304304
{
@@ -311,7 +311,7 @@ internal async ValueTask DeleteRecordedBindingAsync(RecordedBinding rb,
311311
}
312312
else
313313
{
314-
await _recordedEntitiesSemaphore.WaitAsync()
314+
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
315315
.ConfigureAwait(false);
316316
try
317317
{

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ private static void HandleTopologyRecoveryException(TopologyRecoveryException e)
153153
ESLog.Info($"Will not retry recovery because of {e.InnerException?.GetType().FullName}: it's not a known problem with connectivity, ignoring it", e);
154154
}
155155

156-
// TODO cancellation token
157156
private async ValueTask<bool> TryPerformAutomaticRecoveryAsync(CancellationToken cancellationToken)
158157
{
159158
ESLog.Info("Performing automatic recovery");
@@ -176,11 +175,9 @@ await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
176175
// 2. Recover queues
177176
// 3. Recover bindings
178177
// 4. Recover consumers
179-
// TODO cancellation token
180178
await RecoverExchangesAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true, cancellationToken)
181179
.ConfigureAwait(false);
182-
// TODO cancellation token
183-
await RecoverQueuesAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true)
180+
await RecoverQueuesAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true, cancellationToken)
184181
.ConfigureAwait(false);
185182
// TODO cancellation token
186183
await RecoverBindingsAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true)
@@ -309,7 +306,7 @@ await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
309306
}
310307

311308
private async Task RecoverQueuesAsync(IConnection connection,
312-
bool recordedEntitiesSemaphoreHeld = false)
309+
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
313310
{
314311
if (_disposed)
315312
{
@@ -326,11 +323,11 @@ private async Task RecoverQueuesAsync(IConnection connection,
326323
try
327324
{
328325
string newName = string.Empty;
329-
using (IChannel ch = await connection.CreateChannelAsync().ConfigureAwait(false))
326+
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false))
330327
{
331-
newName = await recordedQueue.RecoverAsync(ch)
328+
newName = await recordedQueue.RecoverAsync(ch, cancellationToken)
332329
.ConfigureAwait(false);
333-
await ch.CloseAsync()
330+
await ch.CloseAsync(cancellationToken)
334331
.ConfigureAwait(false);
335332
}
336333
string oldName = recordedQueue.Name;
@@ -348,12 +345,12 @@ await ch.CloseAsync()
348345
if (recordedQueue.IsServerNamed)
349346
{
350347
await DeleteRecordedQueueAsync(oldName,
351-
recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld)
348+
recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld, cancellationToken)
352349
.ConfigureAwait(false);
353350
}
354351

355352
await RecordQueueAsync(new RecordedQueue(newName, recordedQueue),
356-
recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld)
353+
recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld, cancellationToken)
357354
.ConfigureAwait(false);
358355

359356
if (!_queueNameChangedAfterRecoveryWrapper.IsEmpty)
@@ -365,7 +362,7 @@ await RecordQueueAsync(new RecordedQueue(newName, recordedQueue),
365362
}
366363
finally
367364
{
368-
await _recordedEntitiesSemaphore.WaitAsync()
365+
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
369366
.ConfigureAwait(false);
370367
}
371368
}
@@ -379,6 +376,7 @@ await _recordedEntitiesSemaphore.WaitAsync()
379376
try
380377
{
381378
_recordedEntitiesSemaphore.Release();
379+
// TODO maybe cancellation token
382380
await _config.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync(recordedQueue, ex, this)
383381
.ConfigureAwait(false);
384382
}

projects/RabbitMQ.Client/client/impl/RecordedQueue.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System.Collections.Generic;
33+
using System.Threading;
3334
using System.Threading.Tasks;
3435

3536
namespace RabbitMQ.Client.Impl
@@ -71,11 +72,12 @@ public RecordedQueue(string newName, in RecordedQueue old)
7172
_arguments = old._arguments;
7273
}
7374

74-
public Task<QueueDeclareOk> RecoverAsync(IChannel channel)
75+
public Task<QueueDeclareOk> RecoverAsync(IChannel channel, CancellationToken cancellationToken)
7576
{
7677
string queueName = IsServerNamed ? string.Empty : Name;
7778
return channel.QueueDeclareAsync(queue: queueName, passive: false,
78-
durable: _durable, exclusive: _exclusive, autoDelete: AutoDelete, arguments: _arguments);
79+
durable: _durable, exclusive: _exclusive, autoDelete: AutoDelete, arguments: _arguments,
80+
cancellationToken: cancellationToken);
7981
}
8082

8183
public override string ToString()

0 commit comments

Comments
 (0)