Skip to content

Commit a99471e

Browse files
committed
* Move closed TCS into AmqpManagement and AmqpConnection.
1 parent 3585a9d commit a99471e

File tree

4 files changed

+13
-12
lines changed

4 files changed

+13
-12
lines changed

RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,6 @@ public abstract class AbstractLifeCycle : ILifeCycle
1212
{
1313
protected bool _disposed;
1414

15-
// TODO this should not be part of AbstractLifeCycle
16-
// wait until the close operation is completed
17-
protected readonly TaskCompletionSource<bool> _connectionCloseTaskCompletionSource =
18-
new(TaskCreationOptions.RunContinuationsAsynchronously);
19-
2015
public virtual Task OpenAsync()
2116
{
2217
OnNewStatus(State.Open, null);

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
4242
internal ConcurrentDictionary<Guid, IPublisher> Publishers { get; } = new();
4343
internal ConcurrentDictionary<Guid, IConsumer> Consumers { get; } = new();
4444

45+
private readonly TaskCompletionSource _connectionClosedTcs =
46+
new(TaskCreationOptions.RunContinuationsAsynchronously);
47+
4548
public ReadOnlyCollection<IPublisher> GetPublishers()
4649
{
4750
return Publishers.Values.ToList().AsReadOnly();
@@ -131,7 +134,8 @@ await _nativeConnection.CloseAsync()
131134
_semaphoreClose.Release();
132135
}
133136

134-
await _connectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(10))
137+
// TODO configurable timeout?
138+
await _connectionClosedTcs.Task.WaitAsync(TimeSpan.FromSeconds(10))
135139
.ConfigureAwait(false);
136140

137141
OnNewStatus(State.Closed, null);
@@ -344,7 +348,7 @@ void DoClose(Error? argError = null)
344348
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is closed");
345349
OnNewStatus(State.Closed, err);
346350
ChangeEntitiesStatus(State.Closed, err);
347-
_connectionCloseTaskCompletionSource.SetResult(true);
351+
_connectionClosedTcs.SetResult();
348352
}
349353

350354
if (error is null)
@@ -449,10 +453,10 @@ await ReconnectEntitiesAsync()
449453
}
450454
catch
451455
{
452-
// TODO set states to Closed? Error?
456+
// TODO set states to Closed? Error? Log exception? Set exception on TCS?
453457
// This will be skipped if reconnection succeeds, but if there
454458
// is an exception, it's important that this be called.
455-
_connectionCloseTaskCompletionSource.SetResult(true);
459+
_connectionClosedTcs.SetResult();
456460
throw;
457461
}
458462
finally

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ public class AmqpManagement : AbstractLifeCycle, IManagement, IManagementTopolog
4343
internal const string Delete = "DELETE";
4444
private const string ReplyTo = "$me";
4545

46+
protected readonly TaskCompletionSource _managementSessionClosedTcs =
47+
new(TaskCreationOptions.RunContinuationsAsynchronously);
48+
4649
internal AmqpManagement(AmqpManagementParameters amqpManagementParameters)
4750
{
4851
_amqpManagementParameters = amqpManagementParameters;
@@ -165,7 +168,7 @@ private void OnManagementSessionClosed(IAmqpObject sender, Amqp.Framing.Error er
165168
OnNewStatus(State.Closed, Utils.ConvertError(error));
166169

167170
// Note: TrySetResult *must* be used here
168-
_connectionCloseTaskCompletionSource.TrySetResult(true);
171+
_managementSessionClosedTcs.TrySetResult();
169172
}
170173

171174
private async Task ProcessResponses()
@@ -472,7 +475,7 @@ public override async Task CloseAsync()
472475
await _managementSession.CloseAsync(closeSpan)
473476
.ConfigureAwait(false);
474477

475-
await _connectionCloseTaskCompletionSource.Task.WaitAsync(closeSpan)
478+
await _managementSessionClosedTcs.Task.WaitAsync(closeSpan)
476479
.ConfigureAwait(false);
477480

478481
_managementSession = null;

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,6 @@ RabbitMQ.AMQP.Client.StreamOffsetSpecification
555555
RabbitMQ.AMQP.Client.StreamOffsetSpecification.First = 0 -> RabbitMQ.AMQP.Client.StreamOffsetSpecification
556556
RabbitMQ.AMQP.Client.StreamOffsetSpecification.Last = 1 -> RabbitMQ.AMQP.Client.StreamOffsetSpecification
557557
RabbitMQ.AMQP.Client.StreamOffsetSpecification.Next = 2 -> RabbitMQ.AMQP.Client.StreamOffsetSpecification
558-
readonly RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle._connectionCloseTaskCompletionSource -> System.Threading.Tasks.TaskCompletionSource<bool>!
559558
static RabbitMQ.AMQP.Client.ByteCapacity.B(long bytes) -> RabbitMQ.AMQP.Client.ByteCapacity!
560559
static RabbitMQ.AMQP.Client.ByteCapacity.From(string! value) -> RabbitMQ.AMQP.Client.ByteCapacity!
561560
static RabbitMQ.AMQP.Client.ByteCapacity.Gb(long gigabytes) -> RabbitMQ.AMQP.Client.ByteCapacity!

0 commit comments

Comments
 (0)