Skip to content

Commit 0711e13

Browse files
authored
Fix the Unsubscribe timeout problem (#313)
- The Unsubscribe function sync (like the other commands ). The consumers' list is updated in the right way. The connection is correctly closed since there are no pending consumers. - Fix the handle delivery if the consumer is removed, but there are still chunks on the wire. Add debug logs for this situation. - Add Degug Asserts to validate the buffer size, which can help understand some parse chunk errors. That can be temporary at some point; we can remove them. - Add the cancellation token to the connection class it helps when the reader is blocked and the consumer is closed. - Given this [use case](#310) so short-life consumers, the initial credits should be low to avoid keeping the read handler busy. - Minor change is replace Memory<T> to ReadOnlyMemory<T> it is a bit faster and safe - Uniform the log debug and error messages with consumer info it adds all the consumer info that can be helpful in case of debugging and errors - Fixes #310 --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent a2e06bf commit 0711e13

File tree

7 files changed

+163
-62
lines changed

7 files changed

+163
-62
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -362,13 +362,19 @@ public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId)
362362
{
363363
try
364364
{
365+
// here we reduce a bit the timeout to avoid waiting too much
366+
// if the client is busy with read operations it can take time to process the unsubscribe
367+
// but the subscribe is removed.
365368
var result =
366369
await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
367-
new UnsubscribeRequest(corr, subscriptionId)).ConfigureAwait(false);
370+
new UnsubscribeRequest(corr, subscriptionId), TimeSpan.FromSeconds(5)).ConfigureAwait(false);
371+
_logger.LogDebug("Unsubscribe request : {SubscriptionId}", subscriptionId);
372+
368373
return result;
369374
}
370375
finally
371376
{
377+
_logger.LogDebug("Unsubscribe: {SubscriptionId}", subscriptionId);
372378
// remove consumer after RPC returns, this should avoid uncorrelated data being sent
373379
consumers.Remove(subscriptionId);
374380
}
@@ -436,7 +442,6 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
436442
// so there is no need to send the heartbeat when not necessary
437443
_heartBeatHandler.UpdateHeartBeat();
438444

439-
ConsumerEvents consumerEvents;
440445
switch (tag)
441446
{
442447
case PublishConfirm.Key:
@@ -446,14 +451,29 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
446451
confirmCallback(confirm.PublishingIds);
447452
if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment))
448453
{
449-
ArrayPool<ulong>.Shared.Return(confirmSegment.Array);
454+
if (confirmSegment.Array != null)
455+
ArrayPool<ulong>.Shared.Return(confirmSegment.Array);
450456
}
451457

452458
break;
453459
case Deliver.Key:
454460
Deliver.Read(frame, out var deliver);
455-
consumerEvents = consumers[deliver.SubscriptionId];
456-
await consumerEvents.DeliverHandler(deliver).ConfigureAwait(false);
461+
if (consumers.TryGetValue(deliver.SubscriptionId, out var consumerEvent))
462+
{
463+
await consumerEvent.DeliverHandler(deliver).ConfigureAwait(false);
464+
}
465+
else
466+
{
467+
// the consumer is not found, this can happen when the consumer is closing
468+
// and there are still chunks on the wire to the handler is still processing the chunks
469+
// we can ignore the chunk since the subscription does not exists anymore
470+
_logger?.LogDebug(
471+
"Could not find stream subscription {ID} or subscription closing." +
472+
"A possible cause it that the subscription was closed and the are still chunks on the wire. " +
473+
"Reduce the initial credits can help to avoid this situation",
474+
deliver.SubscriptionId);
475+
}
476+
457477
break;
458478
case PublishError.Key:
459479
PublishError.Read(frame, out var error);
@@ -588,7 +608,8 @@ private void HandleCorrelatedCommand(ushort tag, ref ReadOnlySequence<byte> fram
588608
default:
589609
if (MemoryMarshal.TryGetArray(frame.First, out var segment))
590610
{
591-
ArrayPool<byte>.Shared.Return(segment.Array);
611+
if (segment.Array != null)
612+
ArrayPool<byte>.Shared.Return(segment.Array);
592613
}
593614

594615
throw new ArgumentException($"Unknown or unexpected tag: {tag}", nameof(tag));
@@ -637,19 +658,23 @@ public async Task<CloseResponse> Close(string reason)
637658
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
638659
TimeSpan.FromSeconds(10)).ConfigureAwait(false);
639660

640-
InternalClose();
641-
connection.Dispose();
642661
return result;
643662
}
644-
645-
catch (System.TimeoutException)
663+
catch (TimeoutException)
646664
{
647-
_logger.LogError("Timeout while closing the connection. The connection will be closed anyway");
665+
_logger.LogError(
666+
"Timeout while closing the connection. The connection will be closed anyway");
648667
}
649668
catch (Exception e)
650669
{
651670
_logger.LogError(e, "An error occurred while calling {CalledFunction}", nameof(connection.Dispose));
652671
}
672+
finally
673+
{
674+
// even if the close fails we need to close the connection
675+
InternalClose();
676+
connection.Dispose();
677+
}
653678

654679
return new CloseResponse(0, ResponseCode.Ok);
655680
}

RabbitMQ.Stream.Client/Connection.cs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ public class Connection : IDisposable
2828
private bool _disposedValue;
2929
private readonly ILogger _logger;
3030

31+
// this is used to cancel the socket and the reader/write operations tasks
32+
private readonly CancellationTokenSource _cancelTokenSource = new();
33+
private CancellationToken Token => _cancelTokenSource.Token;
34+
3135
internal int NumFrames => numFrames;
3236

3337
public bool IsClosed => isClosed;
@@ -98,21 +102,27 @@ public async ValueTask<bool> Write<T>(T command) where T : struct, ICommand
98102

99103
private async Task WriteCommand<T>(T command) where T : struct, ICommand
100104
{
105+
if (Token.IsCancellationRequested)
106+
{
107+
throw new OperationCanceledException("Token Cancellation Requested Connection");
108+
}
109+
101110
if (isClosed)
102111
{
103112
throw new InvalidOperationException("Connection is closed");
104113
}
114+
105115
// Only one thread should be able to write to the output pipeline at a time.
106-
await _writeLock.WaitAsync().ConfigureAwait(false);
116+
await _writeLock.WaitAsync(Token).ConfigureAwait(false);
107117
try
108118
{
109119
var size = command.SizeNeeded;
110120
var mem = new byte[4 + size]; // + 4 to write the size
111121
WireFormatting.WriteUInt32(mem, (uint)size);
112122
var written = command.Write(mem.AsSpan()[4..]);
113-
await writer.WriteAsync(new ReadOnlyMemory<byte>(mem)).ConfigureAwait(false);
123+
await writer.WriteAsync(new ReadOnlyMemory<byte>(mem), Token).ConfigureAwait(false);
114124
Debug.Assert(size == written);
115-
await writer.FlushAsync().ConfigureAwait(false);
125+
await writer.FlushAsync(Token).ConfigureAwait(false);
116126
}
117127
finally
118128
{
@@ -129,7 +139,7 @@ private async Task ProcessIncomingFrames()
129139
{
130140
if (!reader.TryRead(out var result))
131141
{
132-
result = await reader.ReadAsync().ConfigureAwait(false);
142+
result = await reader.ReadAsync(Token).ConfigureAwait(false);
133143
}
134144

135145
var buffer = result.Buffer;
@@ -219,13 +229,19 @@ public void Dispose()
219229
{
220230
try
221231
{
232+
if (!_cancelTokenSource.IsCancellationRequested)
233+
{
234+
_cancelTokenSource.Cancel();
235+
}
236+
222237
isClosed = true;
223238
writer.Complete();
224239
reader.Complete();
225-
socket.Dispose();
240+
socket.Close();
226241
if (!_incomingFramesTask.Wait(Consts.MidWait))
227242
{
228-
_logger?.LogWarning("ProcessIncomingFrames reader task did not exit in {MidWait}", Consts.MidWait);
243+
_logger?.LogWarning("ProcessIncomingFrames reader task did not exit in {MidWait}",
244+
Consts.MidWait);
229245
}
230246
}
231247
finally

RabbitMQ.Stream.Client/Deliver.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ internal readonly struct SubEntryChunk
5151
private SubEntryChunk(byte compress,
5252
ushort numRecordsInBatch,
5353
uint unCompressedDataSize, uint dataLen,
54-
Memory<byte> data)
54+
ReadOnlyMemory<byte> data)
5555
{
5656
compressValue = compress;
5757
NumRecordsInBatch = numRecordsInBatch;
@@ -67,7 +67,7 @@ private SubEntryChunk(byte compress,
6767
public uint UnCompressedDataSize { get; }
6868

6969
public uint DataLen { get; }
70-
public Memory<byte> Data { get; }
70+
public ReadOnlyMemory<byte> Data { get; }
7171

7272
// This wrapper was added to be used in async methods
7373
// where the SequenceReader is not available
@@ -122,7 +122,7 @@ private Chunk(byte magicVersion,
122122
ulong epoch,
123123
ulong chunkId,
124124
uint crc,
125-
Memory<byte> data)
125+
ReadOnlyMemory<byte> data)
126126
{
127127
MagicVersion = magicVersion;
128128
NumEntries = numEntries;
@@ -142,7 +142,7 @@ private Chunk(byte magicVersion,
142142
public ulong Epoch { get; }
143143
public ulong ChunkId { get; }
144144
public uint Crc { get; }
145-
public Memory<byte> Data { get; }
145+
public ReadOnlyMemory<byte> Data { get; }
146146

147147
internal static int Read(ReadOnlySequence<byte> frame, out Chunk chunk)
148148
{
@@ -175,6 +175,11 @@ internal static int Read(ReadOnlySequence<byte> frame, out Chunk chunk)
175175
$"Chunk: Not enough data, sourceLength: {reader.Length}, memoryLen: {memory.Length}, dataLen: {dataLen}");
176176
}
177177

178+
if (memory.Length != dataLen)
179+
{
180+
throw new Exception($"Chunk: Not enough data, memoryLen: {memory.Length}, dataLen: {dataLen}");
181+
}
182+
178183
chunk = new Chunk(magicVersion, numEntries, numRecords, timestamp, epoch, chunkId, crc, memory);
179184
return offset;
180185
}

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ RabbitMQ.Stream.Client.AuthMechanism.Plain = 0 -> RabbitMQ.Stream.Client.AuthMec
88
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException
99
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException.AuthMechanismNotSupportedException(string s) -> void
1010
RabbitMQ.Stream.Client.Chunk.Crc.get -> uint
11-
RabbitMQ.Stream.Client.Chunk.Data.get -> System.Memory<byte>
11+
RabbitMQ.Stream.Client.Chunk.Data.get -> System.ReadOnlyMemory<byte>
1212
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
1313
RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CommandVersionsResponse>
1414
RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>

0 commit comments

Comments
 (0)