Skip to content

Commit 7fa33dd

Browse files
authored
Improve Reconnection (#343)
* Improve Reconnection * Part of #336 * Improve the disconnection message. It is possible to understand if it is a normal disconnection or an unexpected. * Remove the Active Items from the connection pool and use the Publishers and Consumers client list directly to check the pool size * Refactor the Factory Classes. Remove code duplication in case of metadata update and connection closed. See ReliableBase.OnEntityClosed * Handle streamNotAvailable error. In this case the client will try to reconnect the entity * Fix the events attach to the RawConsumer and RawProducer. The events are attached only if the ResponseCode is OK * Detach the events once the entity receives the disconnection or metadata update. In that case, the entity is closed * Introduce ReliableEntityStatus like a state machine to understand the status of Producer and Consumer classes * Add ResourceAvailableReconnectStrategy interface to Handle the retry in case testing in a stream exists. See ReliableBase CheckIfStreamIsAvailable * Change the MetadataHandler to Func<MetaDataUpdate, Task> to be like the other methods * Producer and Consumer classes fail fast during the first initialization. The user is aware of what is happening. The reconnect part occurs only after the first boot. --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent a7c9fd6 commit 7fa33dd

30 files changed

+702
-673
lines changed

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ namespace RabbitMQ.Stream.Client
1212
public abstract record EntityCommonConfig
1313
{
1414
internal ConnectionsPool Pool { get; set; }
15+
public Func<MetaDataUpdate, Task> MetadataHandler { get; set; }
1516
}
1617

1718
internal enum EntityStatus
@@ -49,10 +50,11 @@ protected void ThrowIfClosed()
4950
// here the _cancelTokenSource is disposed and the token is cancelled
5051
// in producer is used to cancel the send task
5152
// in consumer is used to cancel the receive task
52-
private void MaybeCancelToken()
53+
protected void UpdateStatusToClosed()
5354
{
5455
if (!_cancelTokenSource.IsCancellationRequested)
5556
_cancelTokenSource.Cancel();
57+
_status = EntityStatus.Closed;
5658
}
5759

5860
public abstract Task<ResponseCode> Close();
@@ -82,18 +84,15 @@ protected async Task<ResponseCode> Shutdown(EntityCommonConfig config, bool igno
8284
return ResponseCode.Ok;
8385
}
8486

85-
MaybeCancelToken();
86-
87-
_status = EntityStatus.Closed;
87+
UpdateStatusToClosed();
8888
var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false);
8989

9090
if (_client is { IsClosed: true })
9191
{
9292
return result;
9393
}
9494

95-
var closed = await _client.MaybeClose($"closing: {EntityId}",
96-
GetStream(), config.Pool)
95+
var closed = await _client.MaybeClose($"closing: {EntityId}", config.Pool)
9796
.ConfigureAwait(false);
9897
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-Entity: {EntityId}");
9998
Logger.LogDebug("{EntityInfo} is closed", DumpEntityConfiguration());

RabbitMQ.Stream.Client/Client.cs

Lines changed: 40 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ public record ClientParameters
4040
{"product", "RabbitMQ Stream"},
4141
{"version", Version.VersionString},
4242
{"platform", ".NET"},
43-
{"copyright", "Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries."},
43+
{
44+
"copyright",
45+
"Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries."
46+
},
4447
{
4548
"information",
4649
"Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/"
@@ -53,7 +56,7 @@ public record ClientParameters
5356
public string VirtualHost { get; set; } = "/";
5457
public EndPoint Endpoint { get; set; } = new IPEndPoint(IPAddress.Loopback, 5552);
5558

56-
public delegate void MetadataUpdateHandler(MetaDataUpdate update);
59+
public delegate Task MetadataUpdateHandler(MetaDataUpdate update);
5760

5861
public event MetadataUpdateHandler OnMetadataUpdate;
5962
public Action<Exception> UnhandledExceptionHandler { get; set; } = _ => { };
@@ -121,12 +124,13 @@ public class Client : IClient
121124
private readonly TaskCompletionSource<TuneResponse> tuneReceived =
122125
new TaskCompletionSource<TuneResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
123126

124-
internal readonly IDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>
127+
internal readonly IDictionary<byte, (string, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>))>
125128
publishers =
126-
new ConcurrentDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>();
129+
new ConcurrentDictionary<byte, (string, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)
130+
)>();
127131

128-
internal readonly IDictionary<byte, ConsumerEvents> consumers =
129-
new ConcurrentDictionary<byte, ConsumerEvents>();
132+
internal readonly IDictionary<byte, (string, ConsumerEvents)> consumers =
133+
new ConcurrentDictionary<byte, (string, ConsumerEvents)>();
130134

131135
private int publishCommandsSent;
132136

@@ -312,7 +316,8 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
312316

313317
try
314318
{
315-
publishers.Add(publisherId, (confirmCallback, errorCallback));
319+
publishers.Add(publisherId, (stream,
320+
(confirmCallback, errorCallback)));
316321
response = await Request<DeclarePublisherRequest, DeclarePublisherResponse>(corr =>
317322
new DeclarePublisherRequest(corr, publisherId, publisherRef, stream)).ConfigureAwait(false);
318323
}
@@ -324,10 +329,9 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
324329
if (response.ResponseCode == ResponseCode.Ok)
325330
return (publisherId, response);
326331

327-
// if the response code is not ok we need to remove the subscription
328332
// and close the connection if necessary.
329333
publishers.Remove(publisherId);
330-
await MaybeClose("Create Publisher Exception", stream, pool).ConfigureAwait(false);
334+
pool?.MaybeClose(ClientId, "Publisher creation failed");
331335
return (publisherId, response);
332336
}
333337

@@ -396,9 +400,10 @@ private byte IncrementEntityId()
396400
try
397401
{
398402
consumers.Add(subscriptionId,
399-
new ConsumerEvents(
400-
deliverHandler,
401-
consumerUpdateHandler));
403+
(config.Stream,
404+
new ConsumerEvents(
405+
deliverHandler,
406+
consumerUpdateHandler)));
402407

403408
response = await Request<SubscribeRequest, SubscribeResponse>(corr =>
404409
new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit,
@@ -412,10 +417,8 @@ private byte IncrementEntityId()
412417
if (response.ResponseCode == ResponseCode.Ok)
413418
return (subscriptionId, response);
414419

415-
// if the response code is not ok we need to remove the subscription
416-
// and close the connection if necessary.
417420
consumers.Remove(subscriptionId);
418-
await MaybeClose("Create Consumer Exception", config.Stream, config.Pool).ConfigureAwait(false);
421+
config.Pool.MaybeClose(ClientId, "Subscription failed");
419422
return (subscriptionId, response);
420423
}
421424

@@ -518,7 +521,8 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
518521
confirmFrames += 1;
519522
if (publishers.TryGetValue(confirm.PublisherId, out var publisherConf))
520523
{
521-
var (confirmCallback, _) = publisherConf;
524+
var (_, (confirmCallback, _)) = (publisherConf);
525+
522526
confirmCallback(confirm.PublishingIds);
523527
if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment))
524528
{
@@ -542,7 +546,8 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
542546
Deliver.Read(frame, out var deliver);
543547
if (consumers.TryGetValue(deliver.SubscriptionId, out var consumerEvent))
544548
{
545-
await consumerEvent.DeliverHandler(deliver).ConfigureAwait(false);
549+
var (_, deliverHandler) = consumerEvent;
550+
await deliverHandler.DeliverHandler(deliver).ConfigureAwait(false);
546551
}
547552
else
548553
{
@@ -561,7 +566,7 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
561566
PublishError.Read(frame, out var error);
562567
if (publishers.TryGetValue(error.PublisherId, out var publisher))
563568
{
564-
var (_, errorCallback) = publisher;
569+
var (_, (_, errorCallback)) = publisher;
565570
errorCallback(error.PublishingErrors);
566571
}
567572
else
@@ -588,7 +593,8 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
588593
ConsumerUpdateQueryResponse.Read(frame, out var consumerUpdateQueryResponse);
589594
HandleCorrelatedResponse(consumerUpdateQueryResponse);
590595
var consumerEventsUpd = consumers[consumerUpdateQueryResponse.SubscriptionId];
591-
var off = await consumerEventsUpd.ConsumerUpdateHandler(consumerUpdateQueryResponse.IsActive)
596+
var consumer = consumerEventsUpd.Item2;
597+
var off = await consumer.ConsumerUpdateHandler(consumerUpdateQueryResponse.IsActive)
592598
.ConfigureAwait(false);
593599
if (off == null)
594600
{
@@ -736,14 +742,6 @@ private void InternalClose()
736742
IsClosed = true;
737743
}
738744

739-
private bool HasEntities()
740-
{
741-
lock (Obj)
742-
{
743-
return publishers.Count > 0 || consumers.Count > 0;
744-
}
745-
}
746-
747745
private async ValueTask<bool> ConsumerUpdateResponse(uint rCorrelationId, IOffsetType offsetSpecification)
748746
{
749747
return await Publish(new ConsumerUpdateRequest(rCorrelationId, offsetSpecification)).ConfigureAwait(false);
@@ -759,6 +757,7 @@ public async Task<CloseResponse> Close(string reason)
759757
InternalClose();
760758
try
761759
{
760+
connection.UpdateCloseStatus(ConnectionClosedReason.Normal);
762761
var result =
763762
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
764763
TimeSpan.FromSeconds(10)).ConfigureAwait(false);
@@ -794,32 +793,14 @@ public async Task<CloseResponse> Close(string reason)
794793
// Release will decrement the active ids for the connection
795794
// if the active ids are 0 the connection will be closed
796795

797-
internal async Task<CloseResponse> MaybeClose(string reason, string stream, ConnectionsPool pool)
796+
internal async Task<CloseResponse> MaybeClose(string reason, ConnectionsPool pool)
798797
{
799798
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
800799
try
801800
{
802-
if (!HasEntities())
803-
{
804-
if (!string.IsNullOrEmpty(ClientId))
805-
{
806-
_logger.LogInformation("Close connection for the {ClientId}", ClientId);
807-
// the client can be closed in an unexpected way so we need to remove it from the pool
808-
// so you will find pool.remove(ClientId) also to the disconnect event
809-
pool.Remove(ClientId);
810-
await Close(reason).ConfigureAwait(false);
811-
}
812-
}
813-
else
814-
{
815-
// we remove an id reference from the client
816-
// in case there are still active ids from the client and the stream
817-
if (!string.IsNullOrEmpty(ClientId))
818-
{
819-
pool.Release(ClientId, stream);
820-
}
821-
}
822-
801+
// the client can be closed in an unexpected way so we need to remove it from the pool
802+
// so you will find pool.remove(ClientId) also to the disconnect event
803+
pool.MaybeClose(ClientId, reason);
823804
var result = new CloseResponse(0, ResponseCode.Ok);
824805
return result;
825806
}
@@ -831,6 +812,16 @@ internal async Task<CloseResponse> MaybeClose(string reason, string stream, Conn
831812

832813
public string ClientId { get; init; }
833814

815+
public IDictionary<byte, (string, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>))> Publishers
816+
{
817+
get => publishers;
818+
}
819+
820+
public IDictionary<byte, (string, ConsumerEvents)> Consumers
821+
{
822+
get => consumers;
823+
}
824+
834825
public async ValueTask<QueryPublisherResponse> QueryPublisherSequence(string publisherRef, string stream)
835826
{
836827
return await Request<QueryPublisherRequest, QueryPublisherResponse>(corr =>

RabbitMQ.Stream.Client/ClientExceptions.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ namespace RabbitMQ.Stream.Client
1010
{
1111
internal static class ClientExceptions
1212
{
13-
1413
// <summary>
1514
/// IsAKnownException returns true if the exception is a known exception
1615
/// We need it to reconnect when the producer/consumer.
@@ -32,11 +31,19 @@ internal static bool IsAKnownException(Exception exception)
3231
{
3332
var x = aggregateException.InnerExceptions.Select(x =>
3433
x.GetType() == typeof(SocketException) || x.GetType() == typeof(TimeoutException) ||
35-
x.GetType() == typeof(LeaderNotFoundException));
34+
x.GetType() == typeof(LeaderNotFoundException) || x.GetType() == typeof(InvalidOperationException));
3635
return x.Any();
3736
}
3837

39-
return exception is (SocketException or TimeoutException or LeaderNotFoundException);
38+
return exception is (SocketException or TimeoutException or LeaderNotFoundException or InvalidOperationException) ||
39+
IsStreamNotAvailable(exception);
40+
}
41+
42+
internal static bool IsStreamNotAvailable(Exception exception)
43+
{
44+
// StreamNotAvailable is a temporary exception it can happen when the stream is just created and
45+
// it is not ready yet to all the nodes. In this case we can try to reconnect.
46+
return exception is CreateException { ResponseCode: ResponseCode.StreamNotAvailable };
4047
}
4148

4249
public static void MaybeThrowException(ResponseCode responseCode, string message)

RabbitMQ.Stream.Client/Connection.cs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414

1515
namespace RabbitMQ.Stream.Client
1616
{
17+
internal static class ConnectionClosedReason
18+
{
19+
public const string Normal = "TCP connection closed normal";
20+
public const string Unexpected = "TCP connection closed unexpected";
21+
}
22+
1723
public class Connection : IDisposable
1824
{
1925
private readonly Socket socket;
@@ -25,6 +31,7 @@ public class Connection : IDisposable
2531
private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
2632
private int numFrames;
2733
private bool isClosed = false;
34+
private string _closedReason = ConnectionClosedReason.Unexpected;
2835
private bool _disposedValue;
2936
private readonly ILogger _logger;
3037

@@ -35,6 +42,10 @@ public class Connection : IDisposable
3542
internal int NumFrames => numFrames;
3643
internal string ClientId { get; set; }
3744
public bool IsClosed => isClosed;
45+
public void UpdateCloseStatus(string reason)
46+
{
47+
_closedReason = reason;
48+
}
3849

3950
private static System.IO.Stream MaybeTcpUpgrade(NetworkStream networkStream, SslOption sslOption)
4051
{
@@ -191,14 +202,12 @@ private async Task ProcessIncomingFrames()
191202
finally
192203
{
193204
isClosed = true;
194-
_logger?.LogDebug("TCP Connection Closed ClientId: {ClientId} is IsCancellationRequested {Token} ",
195-
ClientId, Token.IsCancellationRequested);
205+
_logger?.LogDebug(
206+
"TCP Connection Closed ClientId: {ClientId}, Reason {Reason}. IsCancellationRequested {Token} ",
207+
ClientId, _closedReason, Token.IsCancellationRequested);
196208
// Mark the PipeReader as complete
197209
await reader.CompleteAsync(caught).ConfigureAwait(false);
198-
var t = closedCallback?.Invoke("TCP Connection Closed")!;
199-
if (t != null)
200-
await t.ConfigureAwait(false);
201-
_logger?.LogDebug("TCP Connection Closed");
210+
closedCallback?.Invoke(_closedReason)!.ConfigureAwait(false);
202211
}
203212
}
204213

@@ -231,6 +240,7 @@ public void Dispose()
231240
{
232241
try
233242
{
243+
UpdateCloseStatus(ConnectionClosedReason.Normal);
234244
if (!_cancelTokenSource.IsCancellationRequested)
235245
{
236246
_cancelTokenSource.Cancel();

0 commit comments

Comments
 (0)