Skip to content

1749 review #1761

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 20 additions & 22 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -48,8 +49,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
private AutorecoveringConnection _connection;
private RecoveryAwareChannel _innerChannel;
private bool _disposed;
private bool _isDisposing;
private readonly object _locker = new();
private int _isDisposing;

private ushort _prefetchCountConsumer;
private ushort _prefetchCountGlobal;
Expand Down Expand Up @@ -254,32 +254,15 @@ await _connection.DeleteRecordedChannelAsync(this,
public override string ToString()
=> InnerChannel.ToString();

public void Dispose()
{
if (_disposed)
{
return;
}

DisposeAsync().AsTask().GetAwaiter().GetResult();
}
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();

public async ValueTask DisposeAsync()
{
if (_disposed)
if (IsDisposing)
{
return;
}

lock (_locker)
{
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

try
{
if (IsOpen)
Expand All @@ -293,7 +276,6 @@ await this.AbortAsync()
finally
{
_disposed = true;
_isDisposing = false;
}
}

Expand Down Expand Up @@ -508,7 +490,23 @@ private void ThrowIfDisposed()
ThrowDisposed();
}

return;

[DoesNotReturn]
static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringChannel).FullName);
}

private bool IsDisposing
{
get
{
if (Interlocked.Exchange(ref _isDisposing, 1) != 0)
{
return true;
}

return false;
}
}
}
}
42 changes: 20 additions & 22 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -50,8 +51,7 @@ internal sealed partial class AutorecoveringConnection : IConnection

private Connection _innerConnection;
private bool _disposed;
private bool _isDisposing;
private readonly object _locker = new();
private int _isDisposing;

private Connection InnerConnection
{
Expand Down Expand Up @@ -270,32 +270,15 @@ await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, ca
return autorecoveringChannel;
}

public void Dispose()
{
if (_disposed)
{
return;
}

DisposeAsync().AsTask().GetAwaiter().GetResult();
}
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();

public async ValueTask DisposeAsync()
{
if (_disposed)
if (IsDisposing)
{
return;
}

lock (_locker)
{
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

try
{
await _innerConnection.DisposeAsync()
Expand All @@ -313,7 +296,6 @@ await _innerConnection.DisposeAsync()
finally
{
_disposed = true;
_isDisposing = false;
}
}

Expand All @@ -328,7 +310,23 @@ private void ThrowIfDisposed()
ThrowDisposed();
}

return;

[DoesNotReturn]
static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringConnection).FullName);
}

private bool IsDisposing
{
get
{
if (Interlocked.Exchange(ref _isDisposing, 1) != 0)
{
return true;
}

return false;
}
}
}
}
58 changes: 25 additions & 33 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ internal partial class Channel : IChannel, IRecoverable
internal readonly IConsumerDispatcher ConsumerDispatcher;

private bool _disposed;
private bool _isDisposing;

private readonly object _locker = new();
private int _isDisposing;

public Channel(ISession session, CreateChannelOptions createChannelOptions)
{
Expand Down Expand Up @@ -531,20 +529,11 @@ void IDisposable.Dispose()

protected virtual void Dispose(bool disposing)
{
if (_disposed)
if (IsDisposing)
{
return;
}

lock (_locker)
{
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

if (disposing)
{
try
Expand All @@ -554,10 +543,7 @@ protected virtual void Dispose(bool disposing)
this.AbortAsync().GetAwaiter().GetResult();
}

if (_serverOriginatedChannelCloseTcs is not null)
{
_serverOriginatedChannelCloseTcs.Task.Wait(TimeSpan.FromSeconds(5));
}
_serverOriginatedChannelCloseTcs?.Task.Wait(TimeSpan.FromSeconds(5));

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
Expand All @@ -567,7 +553,6 @@ protected virtual void Dispose(bool disposing)
finally
{
_disposed = true;
_isDisposing = false;
}
}
}
Expand All @@ -582,20 +567,11 @@ await DisposeAsyncCore()

protected virtual async ValueTask DisposeAsyncCore()
{
if (_disposed)
if (IsDisposing)
{
return;
}

lock (_locker)
{
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

try
{
if (IsOpen)
Expand All @@ -612,6 +588,7 @@ await _serverOriginatedChannelCloseTcs.Task.WaitAsync(TimeSpan.FromSeconds(5))
ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();

if (_outstandingPublisherConfirmationsRateLimiter is not null)
{
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
Expand All @@ -621,7 +598,6 @@ await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
finally
{
_disposed = true;
_isDisposing = false;
}
}

Expand Down Expand Up @@ -718,9 +694,12 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag)

protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
lock (_locker)
TaskCompletionSource<bool>? serverOriginatedChannelCloseTcs = _serverOriginatedChannelCloseTcs;
if (serverOriginatedChannelCloseTcs is null)
{
_serverOriginatedChannelCloseTcs ??= new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
// Attempt to assign the new TCS only if _tcs is still null
_ = Interlocked.CompareExchange(ref _serverOriginatedChannelCloseTcs,
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously), null);
}

try
Expand All @@ -742,12 +721,12 @@ await ModelSendAsync(in method, cancellationToken)
await Session.NotifyAsync(cancellationToken)
.ConfigureAwait(false);

_serverOriginatedChannelCloseTcs.TrySetResult(true);
_serverOriginatedChannelCloseTcs?.TrySetResult(true);
return true;
}
catch (Exception ex)
{
_serverOriginatedChannelCloseTcs.TrySetException(ex);
_serverOriginatedChannelCloseTcs?.TrySetException(ex);
throw;
}
}
Expand Down Expand Up @@ -1669,5 +1648,18 @@ private Task<bool> DispatchCommandAsync(IncomingCommand cmd, CancellationToken c
}
}
}

private bool IsDisposing
{
get
{
if (Interlocked.Exchange(ref _isDisposing, 1) != 0)
{
return true;
}

return false;
}
}
}
}
Loading
Loading