Skip to content

Commit bb5af4c

Browse files
committed
handle the cancellation token
* during the handle deliver the consumer could receive a Token cancellation In this commit the consumer handle it with a log and exit. It will avoid to propagate the error and close the TCP connection * Add a lock around the IsOpen() function to make it thread-safe. In normal situations in does not matter. It is useful when a consumer is created and destroyed in a short time * Handle the Subscribe error. In case there is an error during the init. The error will be raised to the caller but the pool must be consistent Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 78b7e79 commit bb5af4c

File tree

6 files changed

+115
-82
lines changed

6 files changed

+115
-82
lines changed

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
namespace RabbitMQ.Stream.Client
1111
{
12-
1312
public abstract record EntityCommonConfig
1413
{
1514
internal ConnectionsPool Pool { get; set; }
@@ -65,6 +64,8 @@ private void MaybeCancelToken()
6564
/// <returns></returns>
6665
protected abstract Task<ResponseCode> DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false);
6766

67+
private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
68+
6869
/// <summary>
6970
/// Internal close method. It is called by the public Close method.
7071
/// Set the status to closed and remove the producer or consumer from the server ( if it is not already removed )
@@ -75,27 +76,35 @@ private void MaybeCancelToken()
7576
/// <returns></returns>
7677
protected async Task<ResponseCode> Shutdown(EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false)
7778
{
78-
MaybeCancelToken();
79-
80-
if (!IsOpen()) // the client is already closed
79+
await _writeLock.WaitAsync().ConfigureAwait(false);
80+
try
8181
{
82-
return ResponseCode.Ok;
83-
}
82+
MaybeCancelToken();
8483

85-
_status = EntityStatus.Closed;
86-
var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false);
84+
if (!IsOpen()) // the client is already closed
85+
{
86+
return ResponseCode.Ok;
87+
}
8788

88-
if (_client is { IsClosed: true })
89-
{
89+
_status = EntityStatus.Closed;
90+
var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false);
91+
92+
if (_client is { IsClosed: true })
93+
{
94+
return result;
95+
}
96+
97+
var closed = await _client.MaybeClose($"closing: {EntityId}",
98+
GetStream(), config.Pool)
99+
.ConfigureAwait(false);
100+
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-Entity: {EntityId}");
101+
Logger.LogDebug("{EntityInfo} is closed", DumpEntityConfiguration());
90102
return result;
91103
}
92-
93-
var closed = await _client.MaybeClose($"closing: {EntityId}",
94-
GetStream(), config.Pool)
95-
.ConfigureAwait(false);
96-
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-Entity: {EntityId}");
97-
Logger.LogDebug("{EntityInfo} is closed", DumpEntityConfiguration());
98-
return result;
104+
finally
105+
{
106+
_writeLock.Release();
107+
}
99108
}
100109

101110
protected void Dispose(bool disposing)
@@ -120,7 +129,8 @@ protected void Dispose(bool disposing)
120129
}
121130
catch (Exception e)
122131
{
123-
Logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", DumpEntityConfiguration(), e.Message);
132+
Logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", DumpEntityConfiguration(),
133+
e.Message);
124134
}
125135
finally
126136
{

RabbitMQ.Stream.Client/Client.cs

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,6 @@ public static async Task<Client> Create(ClientParameters parameters, ILogger log
207207
client.connection = await Connection
208208
.Create(parameters.Endpoint, client.HandleIncoming, client.HandleClosed, parameters.Ssl, logger)
209209
.ConfigureAwait(false);
210-
211210
// exchange properties
212211
var peerPropertiesResponse = await client.Request<PeerPropertiesRequest, PeerPropertiesResponse>(corr =>
213212
new PeerPropertiesRequest(corr, parameters.Properties)).ConfigureAwait(false);
@@ -374,31 +373,41 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
374373
{
375374
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
376375
var subscriptionId = ConnectionsPool.FindNextValidId(consumers.Keys.ToList());
377-
SubscribeResponse response;
376+
378377
try
379378
{
380-
consumers.Add(subscriptionId,
381-
new ConsumerEvents(
382-
deliverHandler,
383-
consumerUpdateHandler));
379+
SubscribeResponse response;
380+
try
381+
{
382+
consumers.Add(subscriptionId,
383+
new ConsumerEvents(
384+
deliverHandler,
385+
consumerUpdateHandler));
386+
387+
response = await Request<SubscribeRequest, SubscribeResponse>(corr =>
388+
new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit,
389+
properties)).ConfigureAwait(false);
390+
}
391+
finally
392+
{
393+
_poolSemaphore.Release();
394+
}
395+
396+
if (response.ResponseCode == ResponseCode.Ok)
397+
return (subscriptionId, response);
384398

385-
response = await Request<SubscribeRequest, SubscribeResponse>(corr =>
386-
new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit,
387-
properties)).ConfigureAwait(false);
399+
ClientExceptions.MaybeThrowException(response.ResponseCode, $"Error while creating consumer for stream {config.Stream}");
388400
}
389-
finally
401+
catch (Exception e)
390402
{
391-
_poolSemaphore.Release();
403+
// if the response code is not ok we need to remove the subscription
404+
// and close the connection if necessary.
405+
consumers.Remove(subscriptionId);
406+
await MaybeClose("Create Consumer Exception", config.Stream, config.Pool).ConfigureAwait(false);
407+
throw new CreateConsumerException($"Error while creating consumer for stream {config.Stream}, error: {e.Message}");
392408
}
393409

394-
if (response.ResponseCode == ResponseCode.Ok)
395-
return (subscriptionId, response);
396-
397-
// if the response code is not ok we need to remove the subscription
398-
// and close the connection if necessary.
399-
consumers.Remove(subscriptionId);
400-
await MaybeClose("Create Consumer Exception", config.Stream, config.Pool).ConfigureAwait(false);
401-
return (subscriptionId, response);
410+
return (subscriptionId, new SubscribeResponse(subscriptionId, ResponseCode.InternalError));
402411
}
403412

404413
public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false)
@@ -715,7 +724,7 @@ private async ValueTask<bool> SendHeartBeat()
715724
private void InternalClose()
716725
{
717726
_heartBeatHandler.Close();
718-
IsClosed = true;
727+
// IsClosed = true;
719728
}
720729

721730
private bool HasEntities()
@@ -738,7 +747,6 @@ public async Task<CloseResponse> Close(string reason)
738747
return new CloseResponse(0, ResponseCode.Ok);
739748
}
740749

741-
InternalClose();
742750
try
743751
{
744752
var result =
@@ -761,6 +769,8 @@ public async Task<CloseResponse> Close(string reason)
761769
connection.Dispose();
762770
}
763771

772+
InternalClose();
773+
764774
return new CloseResponse(0, ResponseCode.Ok);
765775
}
766776

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,14 @@ internal void Validate()
8787

8888
switch (ConsumerFilter)
8989
{
90-
case {PostFilter: null}:
90+
case { PostFilter: null }:
9191
throw new ArgumentException("PostFilter must be provided when Filter is set");
92-
case {Values.Count: 0}:
92+
case { Values.Count: 0 }:
9393
throw new ArgumentException("Values must be provided when Filter is set");
9494
}
9595
}
9696

97-
internal bool IsFiltering => ConsumerFilter is {Values.Count: > 0};
97+
internal bool IsFiltering => ConsumerFilter is { Values.Count: > 0 };
9898

9999
// it is needed to be able to add the subscriptions arguments
100100
// see consumerProperties["super-stream"] = SuperStream;
@@ -406,6 +406,8 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
406406
// we request the credit before process the check to keep the network busy
407407
try
408408
{
409+
if (Token.IsCancellationRequested)
410+
break;
409411
await _client.Credit(EntityId, 1).ConfigureAwait(false);
410412
}
411413
catch (InvalidOperationException)
@@ -504,43 +506,43 @@ private async Task Init()
504506
consumerProperties,
505507
async deliver =>
506508
{
507-
chunkConsumed++;
508-
// Send the chunk to the _chunksBuffer
509-
// in this way the chunks are processed in a separate thread
510-
// this wont' block the socket thread
511-
// introduced https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250
512-
if (Token.IsCancellationRequested)
509+
try
513510
{
514-
// the consumer is closing from the user but some chunks are still in the buffer
515-
// simply skip the chunk
516-
Logger?.LogTrace(
517-
"CancellationToken requested. The {EntityInfo} " +
518-
"The chunk won't be processed",
519-
DumpEntityConfiguration());
520-
return;
521-
}
511+
chunkConsumed++;
512+
// Send the chunk to the _chunksBuffer
513+
// in this way the chunks are processed in a separate thread
514+
// this wont' block the socket thread
515+
// introduced https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250
516+
if (Token.IsCancellationRequested)
517+
{
518+
// the consumer is closing from the user but some chunks are still in the buffer
519+
// simply skip the chunk
520+
Logger?.LogTrace(
521+
"CancellationToken requested. The {EntityInfo} " +
522+
"The chunk won't be processed",
523+
DumpEntityConfiguration());
524+
return;
525+
}
522526

523-
if (_config.Crc32 is not null)
524-
{
525-
var crcCalculated = BitConverter.ToUInt32(
526-
_config.Crc32.Hash(deliver.Chunk.Data.ToArray())
527-
);
528-
if (crcCalculated != deliver.Chunk.Crc)
527+
if (_config.Crc32 is not null)
529528
{
530-
Logger?.LogError(
531-
"CRC32 does not match, server crc: {Crc}, local crc: {CrcCalculated}, {EntityInfo}, " +
532-
"Chunk Consumed {ChunkConsumed}", deliver.Chunk.Crc, crcCalculated,
533-
DumpEntityConfiguration(),
534-
chunkConsumed);
535-
536-
throw new CrcException(
537-
$"CRC32 does not match, server crc: {deliver.Chunk.Crc}, local crc: {crcCalculated}, {DumpEntityConfiguration()}, " +
538-
$"Chunk Consumed {chunkConsumed}");
529+
var crcCalculated = BitConverter.ToUInt32(
530+
_config.Crc32.Hash(deliver.Chunk.Data.ToArray())
531+
);
532+
if (crcCalculated != deliver.Chunk.Crc)
533+
{
534+
Logger?.LogError(
535+
"CRC32 does not match, server crc: {Crc}, local crc: {CrcCalculated}, {EntityInfo}, " +
536+
"Chunk Consumed {ChunkConsumed}", deliver.Chunk.Crc, crcCalculated,
537+
DumpEntityConfiguration(),
538+
chunkConsumed);
539+
540+
throw new CrcException(
541+
$"CRC32 does not match, server crc: {deliver.Chunk.Crc}, local crc: {crcCalculated}, {DumpEntityConfiguration()}, " +
542+
$"Chunk Consumed {chunkConsumed}");
543+
}
539544
}
540-
}
541545

542-
try
543-
{
544546
await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false);
545547
}
546548
catch (OperationCanceledException)
@@ -575,6 +577,7 @@ private async Task Init()
575577
return _config.StoredOffsetSpec;
576578
}
577579
).ConfigureAwait(false);
580+
578581
if (response.ResponseCode == ResponseCode.Ok)
579582
{
580583
_completeSubscription.SetResult();

RabbitMQ.Stream.Client/Reliable/Consumer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,8 @@ public override async Task Close()
211211

212212
public override string ToString()
213213
{
214-
return $"Consumer reference: {_consumerConfig.Reference}, stream: {_consumerConfig.Stream} ";
214+
return $"Consumer reference: {_consumerConfig.Reference}, stream: {_consumerConfig.Stream}, " +
215+
$"client name: {_consumerConfig.ClientProvidedName} ";
215216
}
216217

217218
public ConsumerInfo Info { get; }

RabbitMQ.Stream.Client/Reliable/ReliableBase.cs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
// Copyright (c) 2007-2023 VMware, Inc.
44

55
using System;
6-
76
using System.Threading;
87
using System.Threading.Tasks;
98
using Microsoft.Extensions.Logging;
@@ -37,7 +36,7 @@ protected ReliableConfig(StreamSystem streamSystem, string stream)
3736
public abstract class ReliableBase
3837
{
3938
protected readonly SemaphoreSlim SemaphoreSlim = new(1);
40-
39+
private readonly object _lock = new();
4140
protected bool _isOpen;
4241
protected bool _inReconnection;
4342

@@ -61,8 +60,11 @@ private async Task Init(bool boot, IReconnectStrategy reconnectStrategy)
6160
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
6261
try
6362
{
64-
_isOpen = true;
6563
await CreateNewEntity(boot).ConfigureAwait(false);
64+
lock (_lock)
65+
{
66+
_isOpen = true;
67+
}
6668
}
6769

6870
catch (Exception e)
@@ -73,7 +75,11 @@ private async Task Init(bool boot, IReconnectStrategy reconnectStrategy)
7375
{
7476
// We consider the client as closed
7577
// since the exception is raised to the caller
76-
_isOpen = false;
78+
lock (_lock)
79+
{
80+
_isOpen = false;
81+
}
82+
7783
throw;
7884
}
7985
}
@@ -107,7 +113,7 @@ protected async Task TryToReconnect(IReconnectStrategy reconnectStrategy)
107113
_inReconnection = true;
108114
try
109115
{
110-
switch (await reconnectStrategy.WhenDisconnected(ToString()).ConfigureAwait(false) && _isOpen)
116+
switch (await reconnectStrategy.WhenDisconnected(ToString()).ConfigureAwait(false) && IsOpen())
111117
{
112118
case true:
113119
BaseLogger.LogInformation("{Identity} is disconnected. Client will try reconnect", ToString());
@@ -195,6 +201,9 @@ private void LogException(Exception exception)
195201

196202
public bool IsOpen()
197203
{
198-
return _isOpen;
204+
lock (_lock)
205+
{
206+
return _isOpen;
207+
}
199208
}
200209
}

RabbitMQ.Stream.Client/Subscribe.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public int Write(Span<byte> span)
134134
private readonly uint correlationId;
135135
private readonly ResponseCode responseCode;
136136

137-
private SubscribeResponse(uint correlationId, ResponseCode responseCode)
137+
internal SubscribeResponse(uint correlationId, ResponseCode responseCode)
138138
{
139139
this.correlationId = correlationId;
140140
this.responseCode = responseCode;

0 commit comments

Comments
 (0)