Skip to content

Commit c164e6f

Browse files
authored
Implement a super class for producer and consumer (#28)
* Implement a super-class for producer and consumer * to deal with the reconnection. AbstractReconnectLifeCycle tries to reconnect internally. This is an edge case. --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 8beee38 commit c164e6f

15 files changed

+204
-71
lines changed

README.md

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,17 @@ The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AM
2424
- [x] Simple Publish messages
2525
- [x] Implement backpressure (it is implemented with MaxInflightMessages `MaxInFlight(2000).`)
2626
- [x] Simple Consume messages
27+
- [x] Recovery connection on connection lost
28+
- [x] Recovery management on connection lost
29+
- [x] Recovery queues on connection lost
30+
- [x] Recovery publishers on connection lost
31+
- [x] Recovery consumers on connection lost
2732
- [ ] Complete the consumer part with `pause` and `unpause`
2833
- [ ] Complete the binding/unbinding with the special characters
2934
- [ ] Complete the queues/exchanges name with the special characters
3035
- [ ] Implement metrics ( See `System.Diagnostics.DiagnosticSource` [Link](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/metrics-instrumentation) )
31-
- [x] Recovery connection on connection lost
32-
- [x] Recovery management on connection lost
33-
- [x] Recovery queues on connection lost
34-
- [x] Recovery publisher on connection lost
35-
- [x] Recovery consumer on connection lost
36-
- [ ] Docker image to test in LRE
36+
- [ ] Recovery exchanges on connection lost
37+
- [ ] Recovery bindings on connection lost
38+
- [ ] Docker image to test in LRE [not mandatory]
3739
- [ ] Check the TODO in the code
3840

RabbitMQ.AMQP.Client/IRecoveryConfiguration.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,6 @@ public interface IBackOffDelayPolicy
6060
/// or when the user wants to disable the backoff delay policy.
6161
/// </summary>
6262
bool IsActive();
63+
64+
int CurrentAttempt { get; }
6365
}

RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
using Amqp;
2+
13
namespace RabbitMQ.AMQP.Client.Impl;
24

3-
public class AmqpClosedException(string message) : Exception(message);
5+
public class AmqpNotOpenException(string message) : Exception(message);
46

57
public abstract class AbstractLifeCycle : ILifeCycle
68
{
@@ -16,9 +18,18 @@ protected virtual Task OpenAsync()
1618

1719
protected void ThrowIfClosed()
1820
{
19-
if (State == State.Closed)
21+
switch (State)
2022
{
21-
throw new AmqpClosedException(GetType().Name);
23+
case State.Closed:
24+
throw new AmqpNotOpenException("Resource is closed");
25+
case State.Closing:
26+
throw new AmqpNotOpenException("Resource is closing");
27+
case State.Reconnecting:
28+
throw new AmqpNotOpenException("Resource is Reconnecting");
29+
case State.Open:
30+
break;
31+
default:
32+
throw new ArgumentOutOfRangeException();
2233
}
2334
}
2435

@@ -41,4 +52,40 @@ protected void OnNewStatus(State newState, Error? error)
4152
public event LifeCycleCallBack? ChangeState;
4253
}
4354

55+
public abstract class AbstractReconnectLifeCycle : AbstractLifeCycle
56+
{
57+
private readonly BackOffDelayPolicy _backOffDelayPolicy = BackOffDelayPolicy.Create(2);
58+
59+
internal void ChangeStatus(State newState, Error? error)
60+
{
61+
OnNewStatus(newState, error);
62+
}
4463

64+
internal async Task Reconnect()
65+
{
66+
try
67+
{
68+
int randomWait = Random.Shared.Next(300, 900);
69+
Trace.WriteLine(TraceLevel.Information, $"{ToString()} is reconnecting in {randomWait} ms, " +
70+
$"attempt: {_backOffDelayPolicy.CurrentAttempt}");
71+
await Task.Delay(randomWait).ConfigureAwait(false);
72+
await OpenAsync().ConfigureAwait(false);
73+
Trace.WriteLine(TraceLevel.Information,
74+
$"{ToString()} is reconnected, attempt: {_backOffDelayPolicy.CurrentAttempt}");
75+
_backOffDelayPolicy.Reset();
76+
}
77+
catch (Exception e)
78+
{
79+
// Here we give another chance to reconnect
80+
// that's an edge case, where the link is not ready for some reason
81+
// the backoff policy will be used to delay the reconnection and give just a few attempts
82+
Trace.WriteLine(TraceLevel.Error, $"{ToString()} Failed to reconnect, {e.Message}");
83+
int delay = _backOffDelayPolicy.Delay();
84+
await Task.Delay(delay).ConfigureAwait(false);
85+
if (_backOffDelayPolicy.IsActive())
86+
{
87+
await Reconnect().ConfigureAwait(false);
88+
}
89+
}
90+
}
91+
}

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ private ClosedCallback MaybeRecoverConnection()
278278
{
279279
// close all the sessions, if the connection is closed the sessions are not valid anymore
280280
_nativePubSubSessions.ClearSessions();
281-
281+
282282
if (error != null)
283283
{
284284
// we assume here that the connection is closed unexpectedly, since the error is not null
@@ -299,8 +299,8 @@ private ClosedCallback MaybeRecoverConnection()
299299
// to reconnecting and all the events are fired
300300
OnNewStatus(State.Reconnecting, Utils.ConvertError(error));
301301
ChangeEntitiesStatus(State.Reconnecting, Utils.ConvertError(error));
302-
303-
302+
303+
304304
await Task.Run(async () =>
305305
{
306306
bool connected = false;
@@ -320,8 +320,12 @@ await Task.Run(async () =>
320320
try
321321
{
322322
int next = _connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().Delay();
323+
323324
Trace.WriteLine(TraceLevel.Information,
324-
$"Trying Recovering connection in {next} milliseconds. Info: {ToString()})");
325+
$"Trying Recovering connection in {next} milliseconds, " +
326+
$"attempt: {_connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().CurrentAttempt}. " +
327+
$"Info: {ToString()})");
328+
325329
await Task.Delay(TimeSpan.FromMilliseconds(next))
326330
.ConfigureAwait(false);
327331

@@ -365,8 +369,16 @@ await _recordingTopologyListener.Accept(visitor)
365369
OnNewStatus(State.Open, null);
366370
// after the connection is recovered we have to reconnect all the publishers and consumers
367371

368-
await ReconnectEntities().ConfigureAwait(false);
372+
try
373+
{
374+
await ReconnectEntities().ConfigureAwait(false);
375+
}
376+
catch (Exception e)
377+
{
378+
Trace.WriteLine(TraceLevel.Error, $"Error trying to reconnect entities {e}. Info: {this}");
379+
}
369380
}).ConfigureAwait(false);
381+
370382
return;
371383
}
372384

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33

44
namespace RabbitMQ.AMQP.Client.Impl;
55

6-
public class AmqpConsumer : AbstractLifeCycle, IConsumer
6+
public class AmqpConsumer : AbstractReconnectLifeCycle, IConsumer
77
{
88
private readonly AmqpConnection _connection;
99
private readonly string _address;
1010
private readonly MessageHandler _messageHandler;
11-
private readonly int _initialCredits = 0;
11+
private readonly int _initialCredits;
1212
private readonly Map _filters;
1313
private ReceiverLink? _receiverLink;
1414

@@ -37,7 +37,8 @@ protected sealed override Task OpenAsync()
3737
attachCompleted.WaitOne(TimeSpan.FromSeconds(5));
3838
if (_receiverLink.LinkState != LinkState.Attached)
3939
{
40-
throw new ConsumerException("Failed to create receiver link. Link state is not attached, error: " +
40+
throw new ConsumerException(
41+
$"{ToString()} Failed to create receiver link. Link state is not attached, error: " +
4142
_receiverLink.Error?.ToString() ?? "Unknown error");
4243
}
4344

@@ -46,7 +47,7 @@ protected sealed override Task OpenAsync()
4647
}
4748
catch (Exception e)
4849
{
49-
throw new ConsumerException($"Failed to create receiver link, {e}");
50+
throw new ConsumerException($"{ToString()} Failed to create receiver link, {e}");
5051
}
5152

5253
return Task.CompletedTask;
@@ -97,23 +98,8 @@ public override async Task CloseAsync()
9798
_connection.Consumers.TryRemove(Id, out _);
9899
}
99100

100-
101-
internal void ChangeStatus(State newState, Error? error)
102-
{
103-
OnNewStatus(newState, error);
104-
}
105-
106-
internal async Task Reconnect()
101+
public override string ToString()
107102
{
108-
int randomWait = Random.Shared.Next(200, 800);
109-
Trace.WriteLine(TraceLevel.Information, $"Consumer: {ToString()} is reconnecting in {randomWait} ms");
110-
await Task.Delay(randomWait).ConfigureAwait(false);
111-
112-
if (_receiverLink != null)
113-
{
114-
await _receiverLink.DetachAsync().ConfigureAwait(false)!;
115-
}
116-
117-
await OpenAsync().ConfigureAwait(false);
103+
return $"Consumer{{Address='{_address}', id={Id} ConnectionName='{_connection}', State='{State}'}}";
118104
}
119105
}

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ namespace RabbitMQ.AMQP.Client.Impl;
55
public class AmqpConsumerBuilder(AmqpConnection connection) : IConsumerBuilder
66
{
77
private string _queue = "";
8-
private int _initialCredits = 1;
9-
private Map _filters = new Map();
8+
private int _initialCredits = 10;
9+
private readonly Map _filters = new Map();
1010

1111

1212
public IConsumerBuilder Queue(string queue)

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace RabbitMQ.AMQP.Client.Impl;
88

9-
public class AmqpPublisher : AbstractLifeCycle, IPublisher
9+
public class AmqpPublisher : AbstractReconnectLifeCycle, IPublisher
1010
{
1111
private SenderLink? _senderLink = null;
1212

@@ -38,15 +38,15 @@ protected sealed override Task OpenAsync()
3838
attachCompleted.WaitOne(TimeSpan.FromSeconds(5));
3939
if (_senderLink.LinkState != LinkState.Attached)
4040
{
41-
throw new PublisherException("Failed to create sender link. Link state is not attached, error: " +
41+
throw new PublisherException($"{ToString()} Failed to create sender link. Link state is not attached, error: " +
4242
_senderLink.Error?.ToString() ?? "Unknown error");
4343
}
4444

4545
return base.OpenAsync();
4646
}
4747
catch (Exception e)
4848
{
49-
throw new PublisherException($"Failed to create sender link, {e}");
49+
throw new PublisherException($"{ToString()} Failed to create sender link, {e}");
5050
}
5151
}
5252

@@ -127,7 +127,7 @@ public Task Publish(IMessage message, OutcomeDescriptorCallback outcomeCallback)
127127
}
128128
else
129129
{
130-
Trace.WriteLine(TraceLevel.Error, "Message not sent. Killing the process.");
130+
Trace.WriteLine(TraceLevel.Error, $"{ToString()} Message not sent. Killing the process.");
131131
Process.GetCurrentProcess().Kill();
132132
}
133133

@@ -138,7 +138,7 @@ public Task Publish(IMessage message, OutcomeDescriptorCallback outcomeCallback)
138138
}
139139
catch (Exception e)
140140
{
141-
throw new PublisherException($"Failed to publish message, {e}");
141+
throw new PublisherException($"{ToString()} Failed to publish message, {e}");
142142
}
143143

144144
return Task.CompletedTask;
@@ -151,6 +151,7 @@ public override async Task CloseAsync()
151151
{
152152
return;
153153
}
154+
154155
OnNewStatus(State.Closing, null);
155156
try
156157
{
@@ -173,22 +174,8 @@ await _senderLink.CloseAsync()
173174
}
174175

175176

176-
internal void ChangeStatus(State newState, Error? error)
177-
{
178-
OnNewStatus(newState, error);
179-
}
180-
181-
internal async Task Reconnect()
177+
public override string ToString()
182178
{
183-
int randomWait = Random.Shared.Next(200, 800);
184-
Trace.WriteLine(TraceLevel.Information, $"Publisher: {ToString()} is reconnecting in {randomWait} ms");
185-
await Task.Delay(randomWait).ConfigureAwait(false);
186-
187-
if (_senderLink != null)
188-
{
189-
await _senderLink.DetachAsync().ConfigureAwait(false)!;
190-
}
191-
192-
await OpenAsync().ConfigureAwait(false);
179+
return $"Publisher{{Address='{_address}', id={Id} ConnectionName='{_connection}', State='{State}'}}";
193180
}
194181
}

RabbitMQ.AMQP.Client/Impl/ConnectionSettings.cs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,7 @@ public ConnectionSettings Build()
8181
{
8282
var c = new ConnectionSettings(_host, _port, _user,
8383
_password, _virtualHost,
84-
_scheme, _connection)
85-
{
86-
RecoveryConfiguration = (RecoveryConfiguration)_recoveryConfiguration
87-
};
84+
_scheme, _connection) { RecoveryConfiguration = (RecoveryConfiguration)_recoveryConfiguration };
8885

8986
return c;
9087
}
@@ -282,16 +279,27 @@ public static BackOffDelayPolicy Create()
282279
{
283280
return new BackOffDelayPolicy();
284281
}
282+
283+
public static BackOffDelayPolicy Create(int maxAttempt)
284+
{
285+
return new BackOffDelayPolicy(maxAttempt);
286+
}
285287

286288
private BackOffDelayPolicy()
287289
{
288290
}
289291

292+
private BackOffDelayPolicy(int maxAttempt)
293+
{
294+
_maxAttempt = maxAttempt;
295+
}
296+
290297
private const int StartRandomMilliseconds = 500;
291298
private const int EndRandomMilliseconds = 1500;
292299

293300
private int _attempt = 1;
294-
private int _totalAttempt = 0;
301+
private readonly int _maxAttempt = 12;
302+
295303

296304
private void ResetAfterMaxAttempt()
297305
{
@@ -304,26 +312,28 @@ private void ResetAfterMaxAttempt()
304312
public int Delay()
305313
{
306314
_attempt++;
307-
_totalAttempt++;
315+
CurrentAttempt++;
308316
ResetAfterMaxAttempt();
309317
return Random.Shared.Next(StartRandomMilliseconds, EndRandomMilliseconds) * _attempt;
310318
}
311319

312320
public void Reset()
313321
{
314322
_attempt = 1;
315-
_totalAttempt = 0;
323+
CurrentAttempt = 0;
316324
}
317325

318326
public bool IsActive()
319327
{
320-
return _totalAttempt < 12;
328+
return CurrentAttempt < _maxAttempt;
321329
}
322330

331+
public int CurrentAttempt { get; private set; } = 0;
332+
323333

324334
public override string ToString()
325335
{
326-
return $"BackOffDelayPolicy{{ Attempt={_attempt}, TotalAttempt={_totalAttempt}, IsActive={IsActive} }}";
336+
return $"BackOffDelayPolicy{{ Attempt={_attempt}, TotalAttempt={CurrentAttempt}, IsActive={IsActive} }}";
327337
}
328338
}
329339

Tests/ConnectionRecoveryTests.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public void Reset()
2222
}
2323

2424
public bool IsActive() => false;
25+
public int CurrentAttempt => 1;
2526
}
2627

2728
internal class FakeFastBackOffDelay : IBackOffDelayPolicy
@@ -36,6 +37,7 @@ public void Reset()
3637
}
3738

3839
public bool IsActive() => true;
40+
public int CurrentAttempt => 1;
3941
}
4042

4143
public class ConnectionRecoveryTests(ITestOutputHelper testOutputHelper)

0 commit comments

Comments
 (0)