Skip to content

Commit 15fe9dc

Browse files
committed
Fix race condition during rawconsumer creation
The processCheck now waits the subscription is completed before send the credits Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 0833ec2 commit 15fe9dc

File tree

4 files changed

+49
-20
lines changed

4 files changed

+49
-20
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -713,23 +713,27 @@ internal async Task<CloseResponse> MaybeClose(string reason, string stream, Conn
713713
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
714714
try
715715
{
716-
if (!string.IsNullOrEmpty(ClientId))
717-
{
718-
_logger.LogInformation("Releasing ids for the client id {ClientId}", ClientId);
719-
pool.Release(ClientId, stream);
720-
}
721-
722716
if (!HasEntities())
723717
{
724718
if (!string.IsNullOrEmpty(ClientId))
725719
{
726720
_logger.LogInformation("Close connection for the {ClientId}", ClientId);
727-
// pool.remove(ClientId) is a duplicate call here but it is ok
728721
// the client can be closed in an unexpected way so we need to remove it from the pool
729722
// so you will find pool.remove(ClientId) also to the disconnect event
723+
// pool.remove(ClientId) is a duplicate call here but it is ok. The pool is idempotent
724+
pool.Remove(ClientId);
730725
await Close(reason).ConfigureAwait(false);
731726
}
732727
}
728+
else
729+
{
730+
// we remove an id reference from the client
731+
// in case there are still active ids from the client and the stream
732+
if (!string.IsNullOrEmpty(ClientId))
733+
{
734+
pool.Release(ClientId, stream);
735+
}
736+
}
733737

734738
var result = new CloseResponse(0, ResponseCode.Ok);
735739
return result;

RabbitMQ.Stream.Client/Connection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,14 @@ private async Task ProcessIncomingFrames()
145145
var buffer = result.Buffer;
146146
if (buffer.Length == 0)
147147
{
148-
Debug.WriteLine("TCP Connection Closed");
148+
Debug.WriteLine("TCP Connection Closed!");
149149
// We're not going to receive any more bytes from the connection.
150150
break;
151151
}
152152

153153
// Let's try to read some frames!
154154

155-
while (TryReadFrame(ref buffer, out var frame))
155+
while (TryReadFrame(ref buffer, out var frame) && !isClosed)
156156
{
157157
// Let's rent some memory to copy the frame from the network stream. This memory will be reclaimed once the frame has been handled.
158158

RabbitMQ.Stream.Client/ConnectionsPool.cs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,17 +96,25 @@ internal static byte FindNextValidId(List<byte> ids)
9696
return 0;
9797
}
9898

99-
// we start with the recycle when we reach the max value
100-
// in this way we can avoid to recycle the same ids in a short time
99+
// // we start with the recycle when we reach the max value
100+
// // in this way we can avoid to recycle the same ids in a short time
101101
ids.Sort();
102-
if (ids[^1] == byte.MaxValue)
102+
103+
/* Unmerged change from project 'RabbitMQ.Stream.Client(net7.0)'
104+
Before:
105+
if (ids[^1] != byte.MaxValue) return (byte)(ids[^1] + 1);
106+
After:
107+
if (ids[^1] != byte.MaxValue)
108+
return (byte)(ids[^1] + 1);
109+
*/
110+
if (ids[^1] != byte.MaxValue)
111+
return (byte)(ids[^1] + 1);
112+
113+
for (var i = 0; i < ids.Count - 1; i++)
103114
{
104-
for (var i = 0; i < ids.Count - 1; i++)
115+
if (ids[i + 1] - ids[i] > 1)
105116
{
106-
if (ids[i + 1] - ids[i] > 1)
107-
{
108-
return (byte)(ids[i] + 1);
109-
}
117+
return (byte)(ids[i] + 1);
110118
}
111119
}
112120

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ public class RawConsumer : AbstractEntity, IConsumer, IDisposable
120120
private readonly ILogger _logger;
121121
private readonly Channel<Chunk> _chunksBuffer;
122122
private readonly ushort _initialCredits;
123+
124+
// _completeSubscription is used to notify the ProcessChunks task
125+
// that the subscription is completed and so it can start to process the chunks
126+
// this is needed because the socket starts to receive the chunks before the subscription_id is
127+
// assigned.
128+
private readonly TaskCompletionSource _completeSubscription = new ();
123129

124130
private string ConsumerInfo()
125131
{
@@ -192,6 +198,7 @@ public static async Task<IConsumer> Create(
192198
var client = await RoutingHelper<Routing>
193199
.LookupRandomConnection(clientParameters, metaStreamInfo, config.Pool, logger)
194200
.ConfigureAwait(false);
201+
logger?.LogInformation("Creating consumer {ConsumerInfo}", client.ClientId);
195202
var consumer = new RawConsumer((Client)client, config, logger);
196203
await consumer.Init().ConfigureAwait(false);
197204
return consumer;
@@ -304,6 +311,14 @@ await _config.MessageHandler(this,
304311
}
305312
catch (Exception e)
306313
{
314+
if (Token.IsCancellationRequested)
315+
{
316+
_logger?.LogDebug(
317+
"Dispatching {ConsumerInfo}, Cancellation Requested, the consumer is closing. ",
318+
ConsumerInfo());
319+
return;
320+
}
321+
307322
_logger?.LogError(e,
308323
"Error while Dispatching message, ChunkId : {ChunkId} {ConsumerInfo}",
309324
chunk.ChunkId, ConsumerInfo());
@@ -373,6 +388,7 @@ private void ProcessChunks()
373388
{
374389
Task.Run(async () =>
375390
{
391+
await _completeSubscription.Task.ConfigureAwait(false);
376392
try
377393
{
378394
while (!Token.IsCancellationRequested &&
@@ -384,6 +400,7 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
384400
// we request the credit before process the check to keep the network busy
385401
try
386402
{
403+
387404
await _client.Credit(_subscriberId, 1).ConfigureAwait(false);
388405
}
389406
catch (InvalidOperationException)
@@ -488,7 +505,7 @@ private async Task Init()
488505
var chunkConsumed = 0;
489506
// this the default value for the consumer.
490507
_config.StoredOffsetSpec = _config.OffsetSpec;
491-
var (consumerId, response) = await _client.Subscribe(
508+
(_subscriberId, var response) = await _client.Subscribe(
492509
_config,
493510
_initialCredits,
494511
consumerProperties,
@@ -553,7 +570,7 @@ private async Task Init()
553570
).ConfigureAwait(false);
554571
if (response.ResponseCode == ResponseCode.Ok)
555572
{
556-
_subscriberId = consumerId;
573+
_completeSubscription.SetResult();
557574
_status = EntityStatus.Open;
558575
return;
559576
}
@@ -567,12 +584,12 @@ public override async Task<ResponseCode> Close()
567584
// this unlock the consumer if it is waiting for a message
568585
// see DispatchMessage method where the token is used
569586
MaybeCancelToken();
570-
571587
if (!IsOpen())
572588
{
573589
return ResponseCode.Ok;
574590
}
575591

592+
_status = EntityStatus.Closed;
576593
var result = ResponseCode.Ok;
577594
try
578595
{

0 commit comments

Comments
 (0)