Skip to content

Commit 3d46579

Browse files
committed
Add the semaphore to sync the
producers and consumers requests. Remove the way to decide the nextid for producers and consumers now it uses the client lists Make the consumer and producer close idempotent. Add EntityStatus to handle the connection status for producers and consumers Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent c33706b commit 3d46579

File tree

8 files changed

+189
-108
lines changed

8 files changed

+189
-108
lines changed

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,34 @@
66

77
namespace RabbitMQ.Stream.Client
88
{
9+
10+
internal enum EntityStatus
11+
{
12+
Open,
13+
Closed,
14+
Disposed
15+
}
916
public abstract class AbstractEntity
1017
{
1118
private readonly CancellationTokenSource _cancelTokenSource = new();
1219
protected CancellationToken Token => _cancelTokenSource.Token;
1320

21+
internal EntityStatus _status = EntityStatus.Closed;
1422
// here the _cancelTokenSource is disposed and the token is cancelled
1523
// in producer is used to cancel the send task
1624
// in consumer is used to cancel the receive task
1725
protected void MaybeCancelToken()
1826
{
27+
1928
if (!_cancelTokenSource.IsCancellationRequested)
2029
_cancelTokenSource.Cancel();
2130
}
2231

32+
public bool IsOpen()
33+
{
34+
return _status == EntityStatus.Open;
35+
}
36+
2337
protected Client _client;
2438

2539
}

RabbitMQ.Stream.Client/Client.cs

Lines changed: 71 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -109,17 +109,14 @@ public class Client : IClient
109109

110110
private Connection connection;
111111

112-
private readonly IDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>
113-
publishers =
114-
new ConcurrentDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>();
115-
116112
private readonly ConcurrentDictionary<uint, IValueTaskSource> requests = new();
117113

118114
private readonly TaskCompletionSource<TuneResponse> tuneReceived =
119115
new TaskCompletionSource<TuneResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
120116

121-
private readonly List<byte> subscriptionIds = new();
122-
private readonly List<byte> publisherIds = new();
117+
internal readonly IDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>
118+
publishers =
119+
new ConcurrentDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>();
123120

124121
internal readonly IDictionary<byte, ConsumerEvents> consumers =
125122
new ConcurrentDictionary<byte, ConsumerEvents>();
@@ -157,38 +154,11 @@ private Client(ClientParameters parameters, ILogger logger = null)
157154
IsClosed = false;
158155
_logger = logger ?? NullLogger.Instance;
159156
ClientId = Guid.NewGuid().ToString();
160-
}
161-
162-
private byte GetNextSubscriptionId()
163-
{
164-
byte result;
165-
lock (Obj)
166-
{
167-
result = ConnectionsPool.FindMissingConsecutive(subscriptionIds);
168-
subscriptionIds.Add(result);
169-
}
170-
171-
return result;
172-
}
173-
174-
private byte GetNextPublisherId()
175-
{
176-
byte result;
177-
lock (Obj)
178-
{
179-
result = ConnectionsPool.FindMissingConsecutive(publisherIds);
180-
publisherIds.Add(result);
181-
}
182-
183-
return result;
184-
}
185-
186-
internal void RemoveSubscriptionId(byte id)
187-
{
188-
lock (Obj)
157+
AppDomain.CurrentDomain.UnhandledException += (sender, args) =>
189158
{
190-
subscriptionIds.Remove(id);
191-
}
159+
_logger.LogError(args.ExceptionObject as Exception, "Unhandled exception");
160+
Parameters.UnhandledExceptionHandler(args.ExceptionObject as Exception);
161+
};
192162
}
193163

194164
public bool IsClosed
@@ -327,14 +297,23 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
327297
Action<ReadOnlyMemory<ulong>> confirmCallback,
328298
Action<(ulong, ResponseCode)[]> errorCallback)
329299
{
330-
var publisherId = GetNextPublisherId();
331-
publishers.Add(publisherId, (confirmCallback, errorCallback));
332-
return (publisherId, await Request<DeclarePublisherRequest, DeclarePublisherResponse>(corr =>
333-
new DeclarePublisherRequest(corr, publisherId, publisherRef, stream)).ConfigureAwait(false));
300+
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
301+
try
302+
{
303+
var publisherId = ConnectionsPool.FindMissingConsecutive(publishers.Keys.ToList());
304+
publishers.Add(publisherId, (confirmCallback, errorCallback));
305+
return (publisherId, await Request<DeclarePublisherRequest, DeclarePublisherResponse>(corr =>
306+
new DeclarePublisherRequest(corr, publisherId, publisherRef, stream)).ConfigureAwait(false));
307+
}
308+
finally
309+
{
310+
_poolSemaphore.Release();
311+
}
334312
}
335313

336314
public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId)
337315
{
316+
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
338317
try
339318
{
340319
var result =
@@ -346,6 +325,7 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
346325
finally
347326
{
348327
publishers.Remove(publisherId);
328+
_poolSemaphore.Release();
349329
}
350330
}
351331

@@ -366,21 +346,29 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
366346
Dictionary<string, string> properties, Func<Deliver, Task> deliverHandler,
367347
Func<bool, Task<IOffsetType>> consumerUpdateHandler)
368348
{
369-
var subscriptionId = GetNextSubscriptionId();
370-
371-
consumers.Add(subscriptionId,
372-
new ConsumerEvents(
373-
deliverHandler,
374-
consumerUpdateHandler));
349+
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
350+
try
351+
{
352+
var subscriptionId = ConnectionsPool.FindMissingConsecutive(consumers.Keys.ToList());
353+
consumers.Add(subscriptionId,
354+
new ConsumerEvents(
355+
deliverHandler,
356+
consumerUpdateHandler));
375357

376-
return (subscriptionId,
377-
await Request<SubscribeRequest, SubscribeResponse>(corr =>
378-
new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit,
379-
properties)).ConfigureAwait(false));
358+
return (subscriptionId,
359+
await Request<SubscribeRequest, SubscribeResponse>(corr =>
360+
new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit,
361+
properties)).ConfigureAwait(false));
362+
}
363+
finally
364+
{
365+
_poolSemaphore.Release();
366+
}
380367
}
381368

382369
public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId)
383370
{
371+
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
384372
try
385373
{
386374
// here we reduce a bit the timeout to avoid waiting too much
@@ -398,6 +386,7 @@ await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
398386
_logger.LogDebug("Unsubscribe: {SubscriptionId}", subscriptionId);
399387
// remove consumer after RPC returns, this should avoid uncorrelated data being sent
400388
consumers.Remove(subscriptionId);
389+
_poolSemaphore.Release();
401390
}
402391
}
403392

@@ -681,6 +670,7 @@ public async Task<CloseResponse> Close(string reason)
681670
return new CloseResponse(0, ResponseCode.Ok);
682671
}
683672

673+
InternalClose();
684674
try
685675
{
686676
var result =
@@ -700,41 +690,54 @@ public async Task<CloseResponse> Close(string reason)
700690
}
701691
finally
702692
{
703-
// even if the close fails we need to close the connection
704-
InternalClose();
705693
connection.Dispose();
706694
}
707695

708696
return new CloseResponse(0, ResponseCode.Ok);
709697
}
710698

699+
// _poolSemaphore is introduced here: https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/328
700+
// the MaybeClose can be called in different threads so we need to protect the pool
701+
// the pool itself is thread safe but we need to protect the flow to be sure that the
702+
// connection is released only once
703+
private readonly SemaphoreSlim _poolSemaphore = new(1, 1);
704+
711705
// Safe close
712706
// the client can be closed only if HasEntities is false
713707
// if the client has entities (publishers or consumers) it will be released from the pool
714708
// Release will decrement the active ids for the connection
715709
// if the active ids are 0 the connection will be closed
710+
716711
internal async Task<CloseResponse> MaybeClose(string reason, string stream, ConnectionsPool pool)
717712
{
718-
if (!string.IsNullOrEmpty(ClientId))
719-
{
720-
_logger.LogInformation("Releasing connection {Connection}", ClientId);
721-
pool.Release(ClientId, stream);
722-
}
723-
724-
if (!HasEntities())
713+
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
714+
try
725715
{
726716
if (!string.IsNullOrEmpty(ClientId))
727717
{
728-
_logger.LogInformation("Close connection {Connection}", ClientId);
729-
// pool.remove(ClientId) is a duplicate call here but it is ok
730-
// the client can be closed in an unexpected way so we need to remove it from the pool
731-
// so you will find pool.remove(ClientId) also to the disconnect event
732-
await Close(reason).ConfigureAwait(false);
718+
_logger.LogInformation("Releasing connection {Connection}", ClientId);
719+
pool.Release(ClientId, stream);
720+
}
721+
722+
if (!HasEntities())
723+
{
724+
if (!string.IsNullOrEmpty(ClientId))
725+
{
726+
_logger.LogInformation("Close connection {Connection}", ClientId);
727+
// pool.remove(ClientId) is a duplicate call here but it is ok
728+
// the client can be closed in an unexpected way so we need to remove it from the pool
729+
// so you will find pool.remove(ClientId) also to the disconnect event
730+
await Close(reason).ConfigureAwait(false);
731+
}
733732
}
734-
}
735733

736-
var result = new CloseResponse(0, ResponseCode.Ok);
737-
return result;
734+
var result = new CloseResponse(0, ResponseCode.Ok);
735+
return result;
736+
}
737+
finally
738+
{
739+
_poolSemaphore.Release();
740+
}
738741
}
739742

740743
public string ClientId { get; init; }

RabbitMQ.Stream.Client/ConnectionsPool.cs

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -84,23 +84,28 @@ public bool Available
8484
/// </summary>
8585
public class ConnectionsPool
8686
{
87+
private static readonly object s_lock = new();
88+
8789
internal static byte FindMissingConsecutive(List<byte> ids)
8890
{
89-
if (ids.Count == 0)
91+
lock (s_lock)
9092
{
91-
return 0;
92-
}
93+
if (ids.Count == 0)
94+
{
95+
return 0;
96+
}
9397

94-
ids.Sort();
95-
for (var i = 0; i < ids.Count - 1; i++)
96-
{
97-
if (ids[i + 1] - ids[i] > 1)
98+
ids.Sort();
99+
for (var i = 0; i < ids.Count - 1; i++)
98100
{
99-
return (byte)(ids[i] + 1);
101+
if (ids[i + 1] - ids[i] > 1)
102+
{
103+
return (byte)(ids[i] + 1);
104+
}
100105
}
101-
}
102106

103-
return (byte)(ids[^1] + 1);
107+
return (byte)(ids[^1] + 1);
108+
}
104109
}
105110

106111
private readonly int _maxConnections;
@@ -233,9 +238,16 @@ public void Remove(string clientId)
233238

234239
public int ActiveIdsCount => Connections.Values.Sum(x => x.StreamIds.Values.Sum(y => y.Count));
235240

236-
public int ActiveIdsCountForStream(string stream) => Connections.Values.Sum(x => x.StreamIds.TryGetValue(stream, out var streamIds) ? streamIds.Count : 0);
241+
public int ActiveIdsCountForStream(string stream) => Connections.Values.Sum(x =>
242+
x.StreamIds.TryGetValue(stream, out var streamIds) ? streamIds.Count : 0);
237243

238-
public int ActiveIdsCountForClient(string clientId) => Connections.TryGetValue(clientId, out var connectionItem) ? connectionItem.StreamIds.Values.Sum(y => y.Count) : 0;
244+
public int ActiveIdsCountForClient(string clientId) => Connections.TryGetValue(clientId, out var connectionItem)
245+
? connectionItem.StreamIds.Values.Sum(y => y.Count)
246+
: 0;
239247

240-
public int ActiveIdsCountForClientAndStream(string clientId, string stream) => Connections.TryGetValue(clientId, out var connectionItem) && connectionItem.StreamIds.TryGetValue(stream, out var streamIds) ? streamIds.Count : 0;
248+
public int ActiveIdsCountForClientAndStream(string clientId, string stream) =>
249+
Connections.TryGetValue(clientId, out var connectionItem) &&
250+
connectionItem.StreamIds.TryGetValue(stream, out var streamIds)
251+
? streamIds.Count
252+
: 0;
241253
}

RabbitMQ.Stream.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,6 @@ RabbitMQ.Stream.Client.RawProducer.ConfirmFrames.get -> int
547547
RabbitMQ.Stream.Client.RawProducer.Dispose() -> void
548548
RabbitMQ.Stream.Client.RawProducer.GetLastPublishingId() -> System.Threading.Tasks.Task<ulong>
549549
RabbitMQ.Stream.Client.RawProducer.IncomingFrames.get -> int
550-
RabbitMQ.Stream.Client.RawProducer.IsOpen() -> bool
551550
RabbitMQ.Stream.Client.RawProducer.MessagesSent.get -> int
552551
RabbitMQ.Stream.Client.RawProducer.PendingCount.get -> int
553552
RabbitMQ.Stream.Client.RawProducer.PublishCommandsSent.get -> int

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
22
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
33
override RabbitMQ.Stream.Client.Broker.ToString() -> string
4+
RabbitMQ.Stream.Client.AbstractEntity.IsOpen() -> bool
45
RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void
56
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
67
RabbitMQ.Stream.Client.AuthMechanism

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ internal void Validate()
115115

116116
public class RawConsumer : AbstractEntity, IConsumer, IDisposable
117117
{
118-
private bool _disposed;
119118
private readonly RawConsumerConfig _config;
120119
private byte _subscriberId;
121120
private readonly ILogger _logger;
@@ -555,6 +554,7 @@ private async Task Init()
555554
if (response.ResponseCode == ResponseCode.Ok)
556555
{
557556
_subscriberId = consumerId;
557+
_status = EntityStatus.Open;
558558
return;
559559
}
560560

@@ -568,7 +568,7 @@ public async Task<ResponseCode> Close()
568568
// see DispatchMessage method where the token is used
569569
MaybeCancelToken();
570570

571-
if (_client.IsClosed)
571+
if (!IsOpen())
572572
{
573573
return ResponseCode.Ok;
574574
}
@@ -596,8 +596,6 @@ public async Task<ResponseCode> Close()
596596
_subscriberId, ConsumerInfo());
597597
}
598598

599-
_client.RemoveSubscriptionId(_subscriberId);
600-
601599
var closed = await _client.MaybeClose($"_client-close-subscriber: {_subscriberId}",
602600
_config.Stream, _config.Pool)
603601
.ConfigureAwait(false);
@@ -614,7 +612,7 @@ private void Dispose(bool disposing)
614612
return;
615613
}
616614

617-
if (_disposed)
615+
if (_status == EntityStatus.Disposed)
618616
{
619617
return;
620618
}
@@ -632,7 +630,7 @@ private void Dispose(bool disposing)
632630
}
633631
finally
634632
{
635-
_disposed = true;
633+
_status = EntityStatus.Disposed;
636634
}
637635
}
638636

0 commit comments

Comments
 (0)