Skip to content

Commit 9775cd4

Browse files
committed
* Move code to deal with outstanding confirms on channel shutdown.
1 parent 58ac469 commit 9775cd4

File tree

2 files changed

+29
-22
lines changed

2 files changed

+29
-22
lines changed

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,5 +166,31 @@ private void HandleReturn(BasicReturnEventArgs basicReturnEvent)
166166
HandleNack(publishSequenceNumber, multiple: false, isReturn: true);
167167
}
168168
}
169+
170+
private async Task MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync(ShutdownEventArgs reason)
171+
{
172+
if (_publisherConfirmationsEnabled)
173+
{
174+
await _confirmSemaphore.WaitAsync(reason.CancellationToken)
175+
.ConfigureAwait(false);
176+
try
177+
{
178+
if (!_confirmsTaskCompletionSources.IsEmpty)
179+
{
180+
var exception = new AlreadyClosedException(reason);
181+
foreach (TaskCompletionSource<bool> confirmsTaskCompletionSource in _confirmsTaskCompletionSources.Values)
182+
{
183+
confirmsTaskCompletionSource.TrySetException(exception);
184+
}
185+
186+
_confirmsTaskCompletionSources.Clear();
187+
}
188+
}
189+
finally
190+
{
191+
_confirmSemaphore.Release();
192+
}
193+
}
194+
}
169195
}
170196
}

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -482,31 +482,12 @@ internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args)
482482
private async Task OnChannelShutdownAsync(ShutdownEventArgs reason)
483483
{
484484
_continuationQueue.HandleChannelShutdown(reason);
485+
485486
await _channelShutdownAsyncWrapper.InvokeAsync(this, reason)
486487
.ConfigureAwait(false);
487488

488-
if (_publisherConfirmationsEnabled)
489-
{
490-
await _confirmSemaphore.WaitAsync(reason.CancellationToken)
491-
.ConfigureAwait(false);
492-
try
493-
{
494-
if (!_confirmsTaskCompletionSources.IsEmpty)
495-
{
496-
var exception = new AlreadyClosedException(reason);
497-
foreach (TaskCompletionSource<bool> confirmsTaskCompletionSource in _confirmsTaskCompletionSources.Values)
498-
{
499-
confirmsTaskCompletionSource.TrySetException(exception);
500-
}
501-
502-
_confirmsTaskCompletionSources.Clear();
503-
}
504-
}
505-
finally
506-
{
507-
_confirmSemaphore.Release();
508-
}
509-
}
489+
await MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync(reason)
490+
.ConfigureAwait(false);
510491

511492
_flowControlBlock.Set();
512493
}

0 commit comments

Comments
 (0)