Skip to content

Improve Reconnection #343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions RabbitMQ.Stream.Client/AbstractEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace RabbitMQ.Stream.Client
public abstract record EntityCommonConfig
{
internal ConnectionsPool Pool { get; set; }
public Func<MetaDataUpdate, Task> MetadataHandler { get; set; }
}

internal enum EntityStatus
Expand Down Expand Up @@ -49,10 +50,11 @@ protected void ThrowIfClosed()
// here the _cancelTokenSource is disposed and the token is cancelled
// in producer is used to cancel the send task
// in consumer is used to cancel the receive task
private void MaybeCancelToken()
protected void UpdateStatusToClosed()
{
if (!_cancelTokenSource.IsCancellationRequested)
_cancelTokenSource.Cancel();
_status = EntityStatus.Closed;
}

public abstract Task<ResponseCode> Close();
Expand Down Expand Up @@ -82,18 +84,15 @@ protected async Task<ResponseCode> Shutdown(EntityCommonConfig config, bool igno
return ResponseCode.Ok;
}

MaybeCancelToken();

_status = EntityStatus.Closed;
UpdateStatusToClosed();
var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false);

if (_client is { IsClosed: true })
{
return result;
}

var closed = await _client.MaybeClose($"closing: {EntityId}",
GetStream(), config.Pool)
var closed = await _client.MaybeClose($"closing: {EntityId}", config.Pool)
.ConfigureAwait(false);
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-Entity: {EntityId}");
Logger.LogDebug("{EntityInfo} is closed", DumpEntityConfiguration());
Expand Down
89 changes: 40 additions & 49 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ public record ClientParameters
{"product", "RabbitMQ Stream"},
{"version", Version.VersionString},
{"platform", ".NET"},
{"copyright", "Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries."},
{
"copyright",
"Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries."
},
{
"information",
"Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/"
Expand All @@ -53,7 +56,7 @@ public record ClientParameters
public string VirtualHost { get; set; } = "/";
public EndPoint Endpoint { get; set; } = new IPEndPoint(IPAddress.Loopback, 5552);

public delegate void MetadataUpdateHandler(MetaDataUpdate update);
public delegate Task MetadataUpdateHandler(MetaDataUpdate update);

public event MetadataUpdateHandler OnMetadataUpdate;
public Action<Exception> UnhandledExceptionHandler { get; set; } = _ => { };
Expand Down Expand Up @@ -121,12 +124,13 @@ public class Client : IClient
private readonly TaskCompletionSource<TuneResponse> tuneReceived =
new TaskCompletionSource<TuneResponse>(TaskCreationOptions.RunContinuationsAsynchronously);

internal readonly IDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>
internal readonly IDictionary<byte, (string, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>))>
publishers =
new ConcurrentDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>();
new ConcurrentDictionary<byte, (string, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)
)>();

internal readonly IDictionary<byte, ConsumerEvents> consumers =
new ConcurrentDictionary<byte, ConsumerEvents>();
internal readonly IDictionary<byte, (string, ConsumerEvents)> consumers =
new ConcurrentDictionary<byte, (string, ConsumerEvents)>();

private int publishCommandsSent;

Expand Down Expand Up @@ -312,7 +316,8 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand

try
{
publishers.Add(publisherId, (confirmCallback, errorCallback));
publishers.Add(publisherId, (stream,
(confirmCallback, errorCallback)));
response = await Request<DeclarePublisherRequest, DeclarePublisherResponse>(corr =>
new DeclarePublisherRequest(corr, publisherId, publisherRef, stream)).ConfigureAwait(false);
}
Expand All @@ -324,10 +329,9 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
if (response.ResponseCode == ResponseCode.Ok)
return (publisherId, response);

// if the response code is not ok we need to remove the subscription
// and close the connection if necessary.
publishers.Remove(publisherId);
await MaybeClose("Create Publisher Exception", stream, pool).ConfigureAwait(false);
pool?.MaybeClose(ClientId, "Publisher creation failed");
return (publisherId, response);
}

Expand Down Expand Up @@ -396,9 +400,10 @@ private byte IncrementEntityId()
try
{
consumers.Add(subscriptionId,
new ConsumerEvents(
deliverHandler,
consumerUpdateHandler));
(config.Stream,
new ConsumerEvents(
deliverHandler,
consumerUpdateHandler)));

response = await Request<SubscribeRequest, SubscribeResponse>(corr =>
new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit,
Expand All @@ -412,10 +417,8 @@ private byte IncrementEntityId()
if (response.ResponseCode == ResponseCode.Ok)
return (subscriptionId, response);

// if the response code is not ok we need to remove the subscription
// and close the connection if necessary.
consumers.Remove(subscriptionId);
await MaybeClose("Create Consumer Exception", config.Stream, config.Pool).ConfigureAwait(false);
config.Pool.MaybeClose(ClientId, "Subscription failed");
return (subscriptionId, response);
}

Expand Down Expand Up @@ -518,7 +521,8 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
confirmFrames += 1;
if (publishers.TryGetValue(confirm.PublisherId, out var publisherConf))
{
var (confirmCallback, _) = publisherConf;
var (_, (confirmCallback, _)) = (publisherConf);

confirmCallback(confirm.PublishingIds);
if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment))
{
Expand All @@ -542,7 +546,8 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
Deliver.Read(frame, out var deliver);
if (consumers.TryGetValue(deliver.SubscriptionId, out var consumerEvent))
{
await consumerEvent.DeliverHandler(deliver).ConfigureAwait(false);
var (_, deliverHandler) = consumerEvent;
await deliverHandler.DeliverHandler(deliver).ConfigureAwait(false);
}
else
{
Expand All @@ -561,7 +566,7 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
PublishError.Read(frame, out var error);
if (publishers.TryGetValue(error.PublisherId, out var publisher))
{
var (_, errorCallback) = publisher;
var (_, (_, errorCallback)) = publisher;
errorCallback(error.PublishingErrors);
}
else
Expand All @@ -588,7 +593,8 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
ConsumerUpdateQueryResponse.Read(frame, out var consumerUpdateQueryResponse);
HandleCorrelatedResponse(consumerUpdateQueryResponse);
var consumerEventsUpd = consumers[consumerUpdateQueryResponse.SubscriptionId];
var off = await consumerEventsUpd.ConsumerUpdateHandler(consumerUpdateQueryResponse.IsActive)
var consumer = consumerEventsUpd.Item2;
var off = await consumer.ConsumerUpdateHandler(consumerUpdateQueryResponse.IsActive)
.ConfigureAwait(false);
if (off == null)
{
Expand Down Expand Up @@ -736,14 +742,6 @@ private void InternalClose()
IsClosed = true;
}

private bool HasEntities()
{
lock (Obj)
{
return publishers.Count > 0 || consumers.Count > 0;
}
}

private async ValueTask<bool> ConsumerUpdateResponse(uint rCorrelationId, IOffsetType offsetSpecification)
{
return await Publish(new ConsumerUpdateRequest(rCorrelationId, offsetSpecification)).ConfigureAwait(false);
Expand All @@ -759,6 +757,7 @@ public async Task<CloseResponse> Close(string reason)
InternalClose();
try
{
connection.UpdateCloseStatus(ConnectionClosedReason.Normal);
var result =
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
TimeSpan.FromSeconds(10)).ConfigureAwait(false);
Expand Down Expand Up @@ -794,32 +793,14 @@ public async Task<CloseResponse> Close(string reason)
// Release will decrement the active ids for the connection
// if the active ids are 0 the connection will be closed

internal async Task<CloseResponse> MaybeClose(string reason, string stream, ConnectionsPool pool)
internal async Task<CloseResponse> MaybeClose(string reason, ConnectionsPool pool)
{
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
try
{
if (!HasEntities())
{
if (!string.IsNullOrEmpty(ClientId))
{
_logger.LogInformation("Close connection for the {ClientId}", ClientId);
// the client can be closed in an unexpected way so we need to remove it from the pool
// so you will find pool.remove(ClientId) also to the disconnect event
pool.Remove(ClientId);
await Close(reason).ConfigureAwait(false);
}
}
else
{
// we remove an id reference from the client
// in case there are still active ids from the client and the stream
if (!string.IsNullOrEmpty(ClientId))
{
pool.Release(ClientId, stream);
}
}

// the client can be closed in an unexpected way so we need to remove it from the pool
// so you will find pool.remove(ClientId) also to the disconnect event
pool.MaybeClose(ClientId, reason);
var result = new CloseResponse(0, ResponseCode.Ok);
return result;
}
Expand All @@ -831,6 +812,16 @@ internal async Task<CloseResponse> MaybeClose(string reason, string stream, Conn

public string ClientId { get; init; }

public IDictionary<byte, (string, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>))> Publishers
{
get => publishers;
}

public IDictionary<byte, (string, ConsumerEvents)> Consumers
{
get => consumers;
}

public async ValueTask<QueryPublisherResponse> QueryPublisherSequence(string publisherRef, string stream)
{
return await Request<QueryPublisherRequest, QueryPublisherResponse>(corr =>
Expand Down
13 changes: 10 additions & 3 deletions RabbitMQ.Stream.Client/ClientExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace RabbitMQ.Stream.Client
{
internal static class ClientExceptions
{

// <summary>
/// IsAKnownException returns true if the exception is a known exception
/// We need it to reconnect when the producer/consumer.
Expand All @@ -32,11 +31,19 @@ internal static bool IsAKnownException(Exception exception)
{
var x = aggregateException.InnerExceptions.Select(x =>
x.GetType() == typeof(SocketException) || x.GetType() == typeof(TimeoutException) ||
x.GetType() == typeof(LeaderNotFoundException));
x.GetType() == typeof(LeaderNotFoundException) || x.GetType() == typeof(InvalidOperationException));
return x.Any();
}

return exception is (SocketException or TimeoutException or LeaderNotFoundException);
return exception is (SocketException or TimeoutException or LeaderNotFoundException or InvalidOperationException) ||
IsStreamNotAvailable(exception);
}

internal static bool IsStreamNotAvailable(Exception exception)
{
// StreamNotAvailable is a temporary exception it can happen when the stream is just created and
// it is not ready yet to all the nodes. In this case we can try to reconnect.
return exception is CreateException { ResponseCode: ResponseCode.StreamNotAvailable };
}

public static void MaybeThrowException(ResponseCode responseCode, string message)
Expand Down
22 changes: 16 additions & 6 deletions RabbitMQ.Stream.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

namespace RabbitMQ.Stream.Client
{
internal static class ConnectionClosedReason
{
public const string Normal = "TCP connection closed normal";
public const string Unexpected = "TCP connection closed unexpected";
}

public class Connection : IDisposable
{
private readonly Socket socket;
Expand All @@ -25,6 +31,7 @@ public class Connection : IDisposable
private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
private int numFrames;
private bool isClosed = false;
private string _closedReason = ConnectionClosedReason.Unexpected;
private bool _disposedValue;
private readonly ILogger _logger;

Expand All @@ -35,6 +42,10 @@ public class Connection : IDisposable
internal int NumFrames => numFrames;
internal string ClientId { get; set; }
public bool IsClosed => isClosed;
public void UpdateCloseStatus(string reason)
{
_closedReason = reason;
}

private static System.IO.Stream MaybeTcpUpgrade(NetworkStream networkStream, SslOption sslOption)
{
Expand Down Expand Up @@ -191,14 +202,12 @@ private async Task ProcessIncomingFrames()
finally
{
isClosed = true;
_logger?.LogDebug("TCP Connection Closed ClientId: {ClientId} is IsCancellationRequested {Token} ",
ClientId, Token.IsCancellationRequested);
_logger?.LogDebug(
"TCP Connection Closed ClientId: {ClientId}, Reason {Reason}. IsCancellationRequested {Token} ",
ClientId, _closedReason, Token.IsCancellationRequested);
// Mark the PipeReader as complete
await reader.CompleteAsync(caught).ConfigureAwait(false);
var t = closedCallback?.Invoke("TCP Connection Closed")!;
if (t != null)
await t.ConfigureAwait(false);
_logger?.LogDebug("TCP Connection Closed");
closedCallback?.Invoke(_closedReason)!.ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -231,6 +240,7 @@ public void Dispose()
{
try
{
UpdateCloseStatus(ConnectionClosedReason.Normal);
if (!_cancelTokenSource.IsCancellationRequested)
{
_cancelTokenSource.Cancel();
Expand Down
Loading