Skip to content

Commit 9edd363

Browse files
committed
Wait until the subscription if finished before
on the close and dispose call. If a disposed is called just after the creation it waits. * RawConsumer: Add check if the consumer is open before dispach * Client: Add nextEntityId to have always a sequential ids. It avoids to use the same ids during the recycle * ConnectionPool.FindNextValidId/2:Change the way to give the ids. Given an `nextid` in input the function will give always the next value Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent ae0ea1c commit 9edd363

File tree

9 files changed

+216
-85
lines changed

9 files changed

+216
-85
lines changed

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ internal enum EntityStatus
1818
{
1919
Open,
2020
Closed,
21-
Disposed
21+
Disposed,
22+
Initializing
2223
}
2324

2425
public interface IClosable
@@ -64,7 +65,7 @@ private void MaybeCancelToken()
6465
/// <returns></returns>
6566
protected abstract Task<ResponseCode> DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false);
6667

67-
private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
68+
// private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
6869

6970
/// <summary>
7071
/// Internal close method. It is called by the public Close method.
@@ -76,35 +77,27 @@ private void MaybeCancelToken()
7677
/// <returns></returns>
7778
protected async Task<ResponseCode> Shutdown(EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false)
7879
{
79-
await _writeLock.WaitAsync().ConfigureAwait(false);
80-
try
80+
if (!IsOpen()) // the client is already closed
8181
{
82-
MaybeCancelToken();
83-
84-
if (!IsOpen()) // the client is already closed
85-
{
86-
return ResponseCode.Ok;
87-
}
82+
return ResponseCode.Ok;
83+
}
8884

89-
_status = EntityStatus.Closed;
90-
var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false);
85+
MaybeCancelToken();
9186

92-
if (_client is { IsClosed: true })
93-
{
94-
return result;
95-
}
87+
_status = EntityStatus.Closed;
88+
var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false);
9689

97-
var closed = await _client.MaybeClose($"closing: {EntityId}",
98-
GetStream(), config.Pool)
99-
.ConfigureAwait(false);
100-
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-Entity: {EntityId}");
101-
Logger.LogDebug("{EntityInfo} is closed", DumpEntityConfiguration());
102-
return result;
103-
}
104-
finally
90+
if (_client is { IsClosed: true })
10591
{
106-
_writeLock.Release();
92+
return result;
10793
}
94+
95+
var closed = await _client.MaybeClose($"closing: {EntityId}",
96+
GetStream(), config.Pool)
97+
.ConfigureAwait(false);
98+
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-Entity: {EntityId}");
99+
Logger.LogDebug("{EntityInfo} is closed", DumpEntityConfiguration());
100+
return result;
108101
}
109102

110103
protected void Dispose(bool disposing)

RabbitMQ.Stream.Client/Client.cs

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ public static async Task<Client> Create(ClientParameters parameters, ILogger log
207207
client.connection = await Connection
208208
.Create(parameters.Endpoint, client.HandleIncoming, client.HandleClosed, parameters.Ssl, logger)
209209
.ConfigureAwait(false);
210+
client.connection.ClientId = client.ClientId;
210211
// exchange properties
211212
var peerPropertiesResponse = await client.Request<PeerPropertiesRequest, PeerPropertiesResponse>(corr =>
212213
new PeerPropertiesRequest(corr, parameters.Properties)).ConfigureAwait(false);
@@ -306,7 +307,7 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
306307
Action<(ulong, ResponseCode)[]> errorCallback, ConnectionsPool pool = null)
307308
{
308309
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
309-
var publisherId = ConnectionsPool.FindNextValidId(publishers.Keys.ToList());
310+
var publisherId = ConnectionsPool.FindNextValidId(publishers.Keys.ToList(), IncrementEntityId());
310311
DeclarePublisherResponse response;
311312

312313
try
@@ -320,7 +321,7 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
320321
_poolSemaphore.Release();
321322
}
322323

323-
if (response.ResponseCode == ResponseCode.Ok || pool == null)
324+
if (response.ResponseCode == ResponseCode.Ok)
324325
return (publisherId, response);
325326

326327
// if the response code is not ok we need to remove the subscription
@@ -357,59 +358,65 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
357358
public async Task<(byte, SubscribeResponse)> Subscribe(string stream, IOffsetType offsetType,
358359
ushort initialCredit,
359360
Dictionary<string, string> properties, Func<Deliver, Task> deliverHandler,
360-
Func<bool, Task<IOffsetType>> consumerUpdateHandler = null)
361+
Func<bool, Task<IOffsetType>> consumerUpdateHandler = null, ConnectionsPool pool = null)
361362
{
362-
return await Subscribe(new RawConsumerConfig(stream) { OffsetSpec = offsetType },
363+
return await Subscribe(new RawConsumerConfig(stream) { OffsetSpec = offsetType, Pool = pool },
363364
initialCredit,
364365
properties,
365366
deliverHandler,
366367
consumerUpdateHandler).ConfigureAwait(false);
367368
}
368369

370+
private byte _nextEntityId = 0;
371+
372+
// the entity id is a byte so we need to increment it and reset it when it reaches the max value
373+
// to avoid to use always the same ids when producers and consumers are created
374+
// so even there is a connection with one producer or consumer we need to increment the id
375+
private byte IncrementEntityId()
376+
{
377+
lock (Obj)
378+
{
379+
var current = _nextEntityId;
380+
_nextEntityId++;
381+
if (_nextEntityId != byte.MaxValue)
382+
return current;
383+
_nextEntityId = 0;
384+
return _nextEntityId;
385+
}
386+
}
387+
369388
public async Task<(byte, SubscribeResponse)> Subscribe(RawConsumerConfig config,
370389
ushort initialCredit,
371390
Dictionary<string, string> properties, Func<Deliver, Task> deliverHandler,
372391
Func<bool, Task<IOffsetType>> consumerUpdateHandler)
373392
{
374393
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
375-
var subscriptionId = ConnectionsPool.FindNextValidId(consumers.Keys.ToList());
376-
394+
var subscriptionId = ConnectionsPool.FindNextValidId(consumers.Keys.ToList(), IncrementEntityId());
395+
SubscribeResponse response;
377396
try
378397
{
379-
SubscribeResponse response;
380-
try
381-
{
382-
consumers.Add(subscriptionId,
383-
new ConsumerEvents(
384-
deliverHandler,
385-
consumerUpdateHandler));
386-
387-
response = await Request<SubscribeRequest, SubscribeResponse>(corr =>
388-
new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit,
389-
properties)).ConfigureAwait(false);
390-
}
391-
finally
392-
{
393-
_poolSemaphore.Release();
394-
}
395-
396-
if (response.ResponseCode == ResponseCode.Ok)
397-
return (subscriptionId, response);
398+
consumers.Add(subscriptionId,
399+
new ConsumerEvents(
400+
deliverHandler,
401+
consumerUpdateHandler));
398402

399-
ClientExceptions.MaybeThrowException(response.ResponseCode,
400-
$"Error while creating consumer for stream {config.Stream}");
403+
response = await Request<SubscribeRequest, SubscribeResponse>(corr =>
404+
new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit,
405+
properties)).ConfigureAwait(false);
401406
}
402-
catch (Exception e)
407+
finally
403408
{
404-
// if the response code is not ok we need to remove the subscription
405-
// and close the connection if necessary.
406-
consumers.Remove(subscriptionId);
407-
await MaybeClose("Create Consumer Exception", config.Stream, config.Pool).ConfigureAwait(false);
408-
throw new CreateConsumerException(
409-
$"Error while creating consumer for stream {config.Stream}, error: {e.Message}");
409+
_poolSemaphore.Release();
410410
}
411411

412-
return (subscriptionId, new SubscribeResponse(subscriptionId, ResponseCode.InternalError));
412+
if (response.ResponseCode == ResponseCode.Ok)
413+
return (subscriptionId, response);
414+
415+
// if the response code is not ok we need to remove the subscription
416+
// and close the connection if necessary.
417+
consumers.Remove(subscriptionId);
418+
await MaybeClose("Create Consumer Exception", config.Stream, config.Pool).ConfigureAwait(false);
419+
return (subscriptionId, response);
413420
}
414421

415422
public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false)
@@ -799,7 +806,6 @@ internal async Task<CloseResponse> MaybeClose(string reason, string stream, Conn
799806
_logger.LogInformation("Close connection for the {ClientId}", ClientId);
800807
// the client can be closed in an unexpected way so we need to remove it from the pool
801808
// so you will find pool.remove(ClientId) also to the disconnect event
802-
// pool.remove(ClientId) is a duplicate call here but it is ok. The pool is idempotent
803809
pool.Remove(ClientId);
804810
await Close(reason).ConfigureAwait(false);
805811
}

RabbitMQ.Stream.Client/Connection.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class Connection : IDisposable
3333
private CancellationToken Token => _cancelTokenSource.Token;
3434

3535
internal int NumFrames => numFrames;
36-
36+
internal string ClientId { get; set; }
3737
public bool IsClosed => isClosed;
3838

3939
private static System.IO.Stream MaybeTcpUpgrade(NetworkStream networkStream, SslOption sslOption)
@@ -191,6 +191,8 @@ private async Task ProcessIncomingFrames()
191191
finally
192192
{
193193
isClosed = true;
194+
_logger?.LogDebug("TCP Connection Closed ClientId: {ClientId} is IsCancellationRequested {Token} ",
195+
ClientId, Token.IsCancellationRequested);
194196
// Mark the PipeReader as complete
195197
await reader.CompleteAsync(caught).ConfigureAwait(false);
196198
var t = closedCallback?.Invoke("TCP Connection Closed")!;

RabbitMQ.Stream.Client/ConnectionsPool.cs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -100,30 +100,35 @@ public class ConnectionsPool
100100
{
101101
private static readonly object s_lock = new();
102102

103-
internal static byte FindNextValidId(List<byte> ids)
103+
internal static byte FindNextValidId(List<byte> ids, byte nextId = 0)
104104
{
105105
lock (s_lock)
106106
{
107-
if (ids.Count == 0)
108-
{
109-
return 0;
110-
}
111-
112107
// // we start with the recycle when we reach the max value
113108
// // in this way we can avoid to recycle the same ids in a short time
114109
ids.Sort();
115-
if (ids[^1] != byte.MaxValue)
116-
return (byte)(ids[^1] + 1);
110+
var l = ids.Where(b => b >= nextId).ToList();
111+
l.Sort();
112+
if (l.Count == 0)
113+
{
114+
// not necessary to start from 0 because the ids are recycled
115+
// nextid is passed as parameter to avoid to start from 0
116+
// see client:IncrementEntityId/0
117+
return nextId;
118+
}
119+
120+
if (l[^1] != byte.MaxValue)
121+
return (byte)(l[^1] + 1);
117122

118123
for (var i = 0; i < ids.Count - 1; i++)
119124
{
120-
if (ids[i + 1] - ids[i] > 1)
125+
if (l[i + 1] - l[i] > 1)
121126
{
122-
return (byte)(ids[i] + 1);
127+
return (byte)(l[i] + 1);
123128
}
124129
}
125130

126-
return (byte)(ids[^1] + 1);
131+
return (byte)(l[^1] + 1);
127132
}
128133
}
129134

RabbitMQ.Stream.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,6 @@ RabbitMQ.Stream.Client.Client.QueryPublisherSequence(string publisherRef, string
186186
RabbitMQ.Stream.Client.Client.StoreOffset(string reference, string stream, ulong offsetValue) -> System.Threading.Tasks.ValueTask<bool>
187187
RabbitMQ.Stream.Client.Client.StreamExists(string stream) -> System.Threading.Tasks.Task<bool>
188188
RabbitMQ.Stream.Client.Client.Subscribe(RabbitMQ.Stream.Client.RawConsumerConfig config, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
189-
RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IOffsetType offsetType, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
190189
RabbitMQ.Stream.Client.ClientParameters
191190
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.get -> RabbitMQ.Stream.Client.AddressResolver
192191
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.set -> void

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ RabbitMQ.Stream.Client.Client.DeletePublisher(byte publisherId, bool ignoreIfAlr
3333
RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CommandVersionsResponse>
3434
RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>
3535
RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.StreamStatsResponse>
36+
RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IOffsetType offsetType, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler = null, RabbitMQ.Stream.Client.ConnectionsPool pool = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
3637
RabbitMQ.Stream.Client.Client.Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.UnsubscribeResponse>
3738
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
3839
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,20 @@ async Task DispatchMessage(Message message, ulong i)
266266
// it is useful only in single active consumer
267267
if (IsPromotedAsActive)
268268
{
269-
var canDispatch = true;
269+
if (_status != EntityStatus.Open)
270+
{
271+
Logger?.LogDebug(
272+
"{EntityInfo} is not active. message won't dispatched",
273+
DumpEntityConfiguration());
274+
}
275+
276+
// can dispatch only if the consumer is active
277+
// it usually at this point the consumer is active
278+
// but in rare case where the consumer is closed and open in a short
279+
// time the ids could be the same to not problem we need just to skip the message
280+
// Given the way how the ids are generated it is very rare to have the same ids
281+
// it is just a safety check
282+
var canDispatch = _status == EntityStatus.Open;
270283

271284
if (_config.IsFiltering)
272285
{
@@ -280,7 +293,7 @@ async Task DispatchMessage(Message message, ulong i)
280293
}
281294
catch (Exception e)
282295
{
283-
Logger.LogError(e,
296+
Logger?.LogError(e,
284297
"Error while filtering message. Message with offset {MessageOffset} won't be dispatched."
285298
+ "Suggestion: review the PostFilter value function"
286299
+ "{EntityInfo}",
@@ -394,7 +407,7 @@ private void ProcessChunks()
394407
{
395408
// need to wait the subscription is completed
396409
// else the _subscriberId could be incorrect
397-
await _completeSubscription.Task.ConfigureAwait(false);
410+
_completeSubscription.Task.Wait();
398411
try
399412
{
400413
while (!Token.IsCancellationRequested &&
@@ -500,6 +513,7 @@ private async Task Init()
500513
var chunkConsumed = 0;
501514
// this the default value for the consumer.
502515
_config.StoredOffsetSpec = _config.OffsetSpec;
516+
_status = EntityStatus.Initializing;
503517
(EntityId, var response) = await _client.Subscribe(
504518
_config,
505519
_initialCredits,
@@ -580,8 +594,9 @@ private async Task Init()
580594

581595
if (response.ResponseCode == ResponseCode.Ok)
582596
{
583-
_completeSubscription.SetResult();
584597
_status = EntityStatus.Open;
598+
// the subscription is completed so the parsechunk can start to process the chunks
599+
_completeSubscription.SetResult();
585600
return;
586601
}
587602

@@ -653,12 +668,13 @@ public override async Task<ResponseCode> Close()
653668
// when the consumer is closed we must be sure that the
654669
// the subscription is completed to avoid problems with the connection
655670
// It could happen when the closing is called just after the creation
656-
await _completeSubscription.Task.ConfigureAwait(false);
671+
_completeSubscription.Task.Wait();
657672
return await Shutdown(_config).ConfigureAwait(false);
658673
}
659674

660675
public void Dispose()
661676
{
677+
_completeSubscription.Task.Wait();
662678
try
663679
{
664680
Dispose(true);

0 commit comments

Comments
 (0)