Skip to content

Fix the Unsubscribe timeout problem #313

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 36 additions & 11 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,19 @@ public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId)
{
try
{
// here we reduce a bit the timeout to avoid waiting too much
// if the client is busy with read operations it can take time to process the unsubscribe
// but the subscribe is removed.
var result =
await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
new UnsubscribeRequest(corr, subscriptionId)).ConfigureAwait(false);
new UnsubscribeRequest(corr, subscriptionId), TimeSpan.FromSeconds(5)).ConfigureAwait(false);
_logger.LogDebug("Unsubscribe request : {SubscriptionId}", subscriptionId);

return result;
}
finally
{
_logger.LogDebug("Unsubscribe: {SubscriptionId}", subscriptionId);
// remove consumer after RPC returns, this should avoid uncorrelated data being sent
consumers.Remove(subscriptionId);
}
Expand Down Expand Up @@ -436,7 +442,6 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
// so there is no need to send the heartbeat when not necessary
_heartBeatHandler.UpdateHeartBeat();

ConsumerEvents consumerEvents;
switch (tag)
{
case PublishConfirm.Key:
Expand All @@ -446,14 +451,29 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
confirmCallback(confirm.PublishingIds);
if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment))
{
ArrayPool<ulong>.Shared.Return(confirmSegment.Array);
if (confirmSegment.Array != null)
ArrayPool<ulong>.Shared.Return(confirmSegment.Array);
}

break;
case Deliver.Key:
Deliver.Read(frame, out var deliver);
consumerEvents = consumers[deliver.SubscriptionId];
await consumerEvents.DeliverHandler(deliver).ConfigureAwait(false);
if (consumers.TryGetValue(deliver.SubscriptionId, out var consumerEvent))
{
await consumerEvent.DeliverHandler(deliver).ConfigureAwait(false);
}
else
{
// the consumer is not found, this can happen when the consumer is closing
// and there are still chunks on the wire to the handler is still processing the chunks
// we can ignore the chunk since the subscription does not exists anymore
_logger?.LogDebug(
"Could not find stream subscription {ID} or subscription closing." +
"A possible cause it that the subscription was closed and the are still chunks on the wire. " +
"Reduce the initial credits can help to avoid this situation",
deliver.SubscriptionId);
}

break;
case PublishError.Key:
PublishError.Read(frame, out var error);
Expand Down Expand Up @@ -588,7 +608,8 @@ private void HandleCorrelatedCommand(ushort tag, ref ReadOnlySequence<byte> fram
default:
if (MemoryMarshal.TryGetArray(frame.First, out var segment))
{
ArrayPool<byte>.Shared.Return(segment.Array);
if (segment.Array != null)
ArrayPool<byte>.Shared.Return(segment.Array);
}

throw new ArgumentException($"Unknown or unexpected tag: {tag}", nameof(tag));
Expand Down Expand Up @@ -637,19 +658,23 @@ public async Task<CloseResponse> Close(string reason)
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
TimeSpan.FromSeconds(10)).ConfigureAwait(false);

InternalClose();
connection.Dispose();
return result;
}

catch (System.TimeoutException)
catch (TimeoutException)
{
_logger.LogError("Timeout while closing the connection. The connection will be closed anyway");
_logger.LogError(
"Timeout while closing the connection. The connection will be closed anyway");
}
catch (Exception e)
{
_logger.LogError(e, "An error occurred while calling {CalledFunction}", nameof(connection.Dispose));
}
finally
{
// even if the close fails we need to close the connection
InternalClose();
connection.Dispose();
}

return new CloseResponse(0, ResponseCode.Ok);
}
Expand Down
28 changes: 22 additions & 6 deletions RabbitMQ.Stream.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public class Connection : IDisposable
private bool _disposedValue;
private readonly ILogger _logger;

// this is used to cancel the socket and the reader/write operations tasks
private readonly CancellationTokenSource _cancelTokenSource = new();
private CancellationToken Token => _cancelTokenSource.Token;

internal int NumFrames => numFrames;

public bool IsClosed => isClosed;
Expand Down Expand Up @@ -98,21 +102,27 @@ public async ValueTask<bool> Write<T>(T command) where T : struct, ICommand

private async Task WriteCommand<T>(T command) where T : struct, ICommand
{
if (Token.IsCancellationRequested)
{
throw new OperationCanceledException("Token Cancellation Requested Connection");
}

if (isClosed)
{
throw new InvalidOperationException("Connection is closed");
}

// Only one thread should be able to write to the output pipeline at a time.
await _writeLock.WaitAsync().ConfigureAwait(false);
await _writeLock.WaitAsync(Token).ConfigureAwait(false);
try
{
var size = command.SizeNeeded;
var mem = new byte[4 + size]; // + 4 to write the size
WireFormatting.WriteUInt32(mem, (uint)size);
var written = command.Write(mem.AsSpan()[4..]);
await writer.WriteAsync(new ReadOnlyMemory<byte>(mem)).ConfigureAwait(false);
await writer.WriteAsync(new ReadOnlyMemory<byte>(mem), Token).ConfigureAwait(false);
Debug.Assert(size == written);
await writer.FlushAsync().ConfigureAwait(false);
await writer.FlushAsync(Token).ConfigureAwait(false);
}
finally
{
Expand All @@ -129,7 +139,7 @@ private async Task ProcessIncomingFrames()
{
if (!reader.TryRead(out var result))
{
result = await reader.ReadAsync().ConfigureAwait(false);
result = await reader.ReadAsync(Token).ConfigureAwait(false);
}

var buffer = result.Buffer;
Expand Down Expand Up @@ -219,13 +229,19 @@ public void Dispose()
{
try
{
if (!_cancelTokenSource.IsCancellationRequested)
{
_cancelTokenSource.Cancel();
}

isClosed = true;
writer.Complete();
reader.Complete();
socket.Dispose();
socket.Close();
if (!_incomingFramesTask.Wait(Consts.MidWait))
{
_logger?.LogWarning("ProcessIncomingFrames reader task did not exit in {MidWait}", Consts.MidWait);
_logger?.LogWarning("ProcessIncomingFrames reader task did not exit in {MidWait}",
Consts.MidWait);
}
}
finally
Expand Down
13 changes: 9 additions & 4 deletions RabbitMQ.Stream.Client/Deliver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ internal readonly struct SubEntryChunk
private SubEntryChunk(byte compress,
ushort numRecordsInBatch,
uint unCompressedDataSize, uint dataLen,
Memory<byte> data)
ReadOnlyMemory<byte> data)
{
compressValue = compress;
NumRecordsInBatch = numRecordsInBatch;
Expand All @@ -67,7 +67,7 @@ private SubEntryChunk(byte compress,
public uint UnCompressedDataSize { get; }

public uint DataLen { get; }
public Memory<byte> Data { get; }
public ReadOnlyMemory<byte> Data { get; }

// This wrapper was added to be used in async methods
// where the SequenceReader is not available
Expand Down Expand Up @@ -122,7 +122,7 @@ private Chunk(byte magicVersion,
ulong epoch,
ulong chunkId,
uint crc,
Memory<byte> data)
ReadOnlyMemory<byte> data)
{
MagicVersion = magicVersion;
NumEntries = numEntries;
Expand All @@ -142,7 +142,7 @@ private Chunk(byte magicVersion,
public ulong Epoch { get; }
public ulong ChunkId { get; }
public uint Crc { get; }
public Memory<byte> Data { get; }
public ReadOnlyMemory<byte> Data { get; }

internal static int Read(ReadOnlySequence<byte> frame, out Chunk chunk)
{
Expand Down Expand Up @@ -175,6 +175,11 @@ internal static int Read(ReadOnlySequence<byte> frame, out Chunk chunk)
$"Chunk: Not enough data, sourceLength: {reader.Length}, memoryLen: {memory.Length}, dataLen: {dataLen}");
}

if (memory.Length != dataLen)
{
throw new Exception($"Chunk: Not enough data, memoryLen: {memory.Length}, dataLen: {dataLen}");
}

chunk = new Chunk(magicVersion, numEntries, numRecords, timestamp, epoch, chunkId, crc, memory);
return offset;
}
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ RabbitMQ.Stream.Client.AuthMechanism.Plain = 0 -> RabbitMQ.Stream.Client.AuthMec
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException.AuthMechanismNotSupportedException(string s) -> void
RabbitMQ.Stream.Client.Chunk.Crc.get -> uint
RabbitMQ.Stream.Client.Chunk.Data.get -> System.Memory<byte>
RabbitMQ.Stream.Client.Chunk.Data.get -> System.ReadOnlyMemory<byte>
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CommandVersionsResponse>
RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>
Expand Down
Loading