Skip to content

Commit 7468d90

Browse files
committed
* Rename _disposedValue to _disposed
* Check for `_disposed` before releasing semaphore
1 parent 5514cec commit 7468d90

File tree

5 files changed

+28
-17
lines changed

5 files changed

+28
-17
lines changed

RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@ public class AmqpNotOpenException(string message) : Exception(message);
66

77
public abstract class AbstractLifeCycle : ILifeCycle
88
{
9-
private bool _disposedValue;
9+
protected bool _disposed;
10+
11+
// TODO this should not be part of AbstractLifeCycle
12+
// wait until the close operation is completed
13+
protected readonly TaskCompletionSource<bool> _connectionCloseTaskCompletionSource =
14+
new(TaskCreationOptions.RunContinuationsAsynchronously);
1015

1116
public virtual Task OpenAsync()
1217
{
@@ -44,10 +49,6 @@ protected void ThrowIfClosed()
4449
}
4550
}
4651

47-
// wait until the close operation is completed
48-
protected readonly TaskCompletionSource<bool> ConnectionCloseTaskCompletionSource =
49-
new(TaskCreationOptions.RunContinuationsAsynchronously);
50-
5152
protected void OnNewStatus(State newState, Error? error)
5253
{
5354
if (State == newState)
@@ -62,7 +63,7 @@ protected void OnNewStatus(State newState, Error? error)
6263

6364
protected virtual void Dispose(bool disposing)
6465
{
65-
if (!_disposedValue)
66+
if (false == _disposed)
6667
{
6768
if (disposing)
6869
{
@@ -71,7 +72,7 @@ protected virtual void Dispose(bool disposing)
7172

7273
// TODO: free unmanaged resources (unmanaged objects) and override finalizer
7374
// TODO: set large fields to null
74-
_disposedValue = true;
75+
_disposed = true;
7576
}
7677
}
7778

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ await _nativeConnection.CloseAsync()
128128
_semaphoreClose.Release();
129129
}
130130

131-
await ConnectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(10))
131+
await _connectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(10))
132132
.ConfigureAwait(false);
133133

134134
OnNewStatus(State.Closed, null);
@@ -305,6 +305,11 @@ private ClosedCallback BuildClosedCallback()
305305
{
306306
return async (sender, error) =>
307307
{
308+
if (_disposed)
309+
{
310+
return;
311+
}
312+
308313
await _semaphoreClose.WaitAsync()
309314
.ConfigureAwait(false);
310315
try
@@ -318,7 +323,7 @@ void DoClose(Error? argError = null)
318323
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is closed");
319324
OnNewStatus(State.Closed, err);
320325
ChangeEntitiesStatus(State.Closed, err);
321-
ConnectionCloseTaskCompletionSource.SetResult(true);
326+
_connectionCloseTaskCompletionSource.SetResult(true);
322327
}
323328

324329
if (error is null)
@@ -424,12 +429,16 @@ await _recordingTopologyListener.Accept(visitor)
424429
// TODO set states to Closed? Error?
425430
// This will be skipped if reconnection succeeds, but if there
426431
// is an exception, it's important that this be called.
427-
ConnectionCloseTaskCompletionSource.SetResult(true);
432+
_connectionCloseTaskCompletionSource.SetResult(true);
428433
throw;
429434
}
430435
finally
431436
{
432-
_semaphoreClose.Release();
437+
// TODO it is odd to have to add this check
438+
if (false == _disposed)
439+
{
440+
_semaphoreClose.Release();
441+
}
433442
}
434443
};
435444
}

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ await EnsureReceiverLinkAsync()
129129

130130
OnNewStatus(State.Closed, Utils.ConvertError(error));
131131
// Note: TrySetResult *must* be used here
132-
ConnectionCloseTaskCompletionSource.TrySetResult(true);
132+
_connectionCloseTaskCompletionSource.TrySetResult(true);
133133
};
134134

135135
await base.OpenAsync()
@@ -442,7 +442,7 @@ public override async Task CloseAsync()
442442
await _managementSession.CloseAsync(closeSpan)
443443
.ConfigureAwait(false);
444444

445-
await ConnectionCloseTaskCompletionSource.Task.WaitAsync(closeSpan)
445+
await _connectionCloseTaskCompletionSource.Task.WaitAsync(closeSpan)
446446
.ConfigureAwait(false);
447447

448448
_managementSession = null;

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System.Diagnostics;
2-
using Amqp;
1+
using Amqp;
32
using Amqp.Framing;
43
using Trace = Amqp.Trace;
54
using TraceLevel = Amqp.TraceLevel;
@@ -98,7 +97,8 @@ public async Task<PublishResult> PublishAsync(IMessage message, CancellationToke
9897

9998
try
10099
{
101-
Debug.Assert(false == _senderLink.IsClosed);
100+
// TODO anything special to do if the _senderLink is closed?
101+
// Debug.Assert(false == _senderLink.IsClosed);
102102
Message nativeMessage = ((AmqpMessage)message).NativeMessage;
103103
await _senderLink.SendAsync(nativeMessage)
104104
.ConfigureAwait(false);

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.Dispose() -> void
171171
RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.OnNewStatus(RabbitMQ.AMQP.Client.State newState, RabbitMQ.AMQP.Client.Error? error) -> void
172172
RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.State.get -> RabbitMQ.AMQP.Client.State
173173
RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.ThrowIfClosed() -> void
174+
RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle._disposed -> bool
174175
RabbitMQ.AMQP.Client.Impl.AbstractReconnectLifeCycle
175176
RabbitMQ.AMQP.Client.Impl.AbstractReconnectLifeCycle.AbstractReconnectLifeCycle() -> void
176177
RabbitMQ.AMQP.Client.Impl.AddressBuilder
@@ -567,7 +568,7 @@ RabbitMQ.AMQP.Client.StreamOffsetSpecification
567568
RabbitMQ.AMQP.Client.StreamOffsetSpecification.First = 0 -> RabbitMQ.AMQP.Client.StreamOffsetSpecification
568569
RabbitMQ.AMQP.Client.StreamOffsetSpecification.Last = 1 -> RabbitMQ.AMQP.Client.StreamOffsetSpecification
569570
RabbitMQ.AMQP.Client.StreamOffsetSpecification.Next = 2 -> RabbitMQ.AMQP.Client.StreamOffsetSpecification
570-
readonly RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.ConnectionCloseTaskCompletionSource -> System.Threading.Tasks.TaskCompletionSource<bool>!
571+
readonly RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle._connectionCloseTaskCompletionSource -> System.Threading.Tasks.TaskCompletionSource<bool>!
571572
static RabbitMQ.AMQP.Client.ByteCapacity.B(long bytes) -> RabbitMQ.AMQP.Client.ByteCapacity!
572573
static RabbitMQ.AMQP.Client.ByteCapacity.From(string! value) -> RabbitMQ.AMQP.Client.ByteCapacity!
573574
static RabbitMQ.AMQP.Client.ByteCapacity.Gb(long gigabytes) -> RabbitMQ.AMQP.Client.ByteCapacity!

0 commit comments

Comments
 (0)