Skip to content

Commit 581f30e

Browse files
authored
Merge branch 'main' into rabbitmq-amqp-dotnet-client-17-2
2 parents ba29f70 + c164e6f commit 581f30e

15 files changed

+200
-64
lines changed

README.md

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,17 @@ The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AM
2424
- [x] Simple Publish messages
2525
- [x] Implement backpressure (it is implemented with MaxInflightMessages `MaxInFlight(2000).`)
2626
- [x] Simple Consume messages
27+
- [x] Recovery connection on connection lost
28+
- [x] Recovery management on connection lost
29+
- [x] Recovery queues on connection lost
30+
- [x] Recovery publishers on connection lost
31+
- [x] Recovery consumers on connection lost
2732
- [ ] Complete the consumer part with `pause` and `unpause`
2833
- [ ] Complete the binding/unbinding with the special characters
2934
- [ ] Complete the queues/exchanges name with the special characters
3035
- [ ] Implement metrics ( See `System.Diagnostics.DiagnosticSource` [Link](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/metrics-instrumentation) )
31-
- [x] Recovery connection on connection lost
32-
- [x] Recovery management on connection lost
33-
- [x] Recovery queues on connection lost
34-
- [x] Recovery publisher on connection lost
35-
- [x] Recovery consumer on connection lost
36-
- [ ] Docker image to test in LRE
36+
- [ ] Recovery exchanges on connection lost
37+
- [ ] Recovery bindings on connection lost
38+
- [ ] Docker image to test in LRE [not mandatory]
3739
- [ ] Check the TODO in the code
3840

RabbitMQ.AMQP.Client/IRecoveryConfiguration.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,6 @@ public interface IBackOffDelayPolicy
6060
/// or when the user wants to disable the backoff delay policy.
6161
/// </summary>
6262
bool IsActive();
63+
64+
int CurrentAttempt { get; }
6365
}

RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
using Amqp;
2+
13
namespace RabbitMQ.AMQP.Client.Impl;
24

3-
public class AmqpClosedException(string message) : Exception(message);
5+
public class AmqpNotOpenException(string message) : Exception(message);
46

57
public abstract class AbstractLifeCycle : ILifeCycle
68
{
@@ -16,9 +18,18 @@ protected virtual Task OpenAsync()
1618

1719
protected void ThrowIfClosed()
1820
{
19-
if (State == State.Closed)
21+
switch (State)
2022
{
21-
throw new AmqpClosedException(GetType().Name);
23+
case State.Closed:
24+
throw new AmqpNotOpenException("Resource is closed");
25+
case State.Closing:
26+
throw new AmqpNotOpenException("Resource is closing");
27+
case State.Reconnecting:
28+
throw new AmqpNotOpenException("Resource is Reconnecting");
29+
case State.Open:
30+
break;
31+
default:
32+
throw new ArgumentOutOfRangeException();
2233
}
2334
}
2435

@@ -41,4 +52,40 @@ protected void OnNewStatus(State newState, Error? error)
4152
public event LifeCycleCallBack? ChangeState;
4253
}
4354

55+
public abstract class AbstractReconnectLifeCycle : AbstractLifeCycle
56+
{
57+
private readonly BackOffDelayPolicy _backOffDelayPolicy = BackOffDelayPolicy.Create(2);
58+
59+
internal void ChangeStatus(State newState, Error? error)
60+
{
61+
OnNewStatus(newState, error);
62+
}
4463

64+
internal async Task Reconnect()
65+
{
66+
try
67+
{
68+
int randomWait = Random.Shared.Next(300, 900);
69+
Trace.WriteLine(TraceLevel.Information, $"{ToString()} is reconnecting in {randomWait} ms, " +
70+
$"attempt: {_backOffDelayPolicy.CurrentAttempt}");
71+
await Task.Delay(randomWait).ConfigureAwait(false);
72+
await OpenAsync().ConfigureAwait(false);
73+
Trace.WriteLine(TraceLevel.Information,
74+
$"{ToString()} is reconnected, attempt: {_backOffDelayPolicy.CurrentAttempt}");
75+
_backOffDelayPolicy.Reset();
76+
}
77+
catch (Exception e)
78+
{
79+
// Here we give another chance to reconnect
80+
// that's an edge case, where the link is not ready for some reason
81+
// the backoff policy will be used to delay the reconnection and give just a few attempts
82+
Trace.WriteLine(TraceLevel.Error, $"{ToString()} Failed to reconnect, {e.Message}");
83+
int delay = _backOffDelayPolicy.Delay();
84+
await Task.Delay(delay).ConfigureAwait(false);
85+
if (_backOffDelayPolicy.IsActive())
86+
{
87+
await Reconnect().ConfigureAwait(false);
88+
}
89+
}
90+
}
91+
}

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,12 @@ await Task.Run(async () =>
326326
try
327327
{
328328
int next = _connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().Delay();
329+
329330
Trace.WriteLine(TraceLevel.Information,
330-
$"Trying Recovering connection in {next} milliseconds. Info: {ToString()})");
331+
$"Trying Recovering connection in {next} milliseconds, " +
332+
$"attempt: {_connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().CurrentAttempt}. " +
333+
$"Info: {ToString()})");
334+
331335
await Task.Delay(TimeSpan.FromMilliseconds(next))
332336
.ConfigureAwait(false);
333337

@@ -371,8 +375,16 @@ await _recordingTopologyListener.Accept(visitor)
371375
OnNewStatus(State.Open, null);
372376
// after the connection is recovered we have to reconnect all the publishers and consumers
373377

374-
await ReconnectEntities().ConfigureAwait(false);
378+
try
379+
{
380+
await ReconnectEntities().ConfigureAwait(false);
381+
}
382+
catch (Exception e)
383+
{
384+
Trace.WriteLine(TraceLevel.Error, $"Error trying to reconnect entities {e}. Info: {this}");
385+
}
375386
}).ConfigureAwait(false);
387+
376388
return;
377389
}
378390

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33

44
namespace RabbitMQ.AMQP.Client.Impl;
55

6-
public class AmqpConsumer : AbstractLifeCycle, IConsumer
6+
public class AmqpConsumer : AbstractReconnectLifeCycle, IConsumer
77
{
88
private readonly AmqpConnection _connection;
99
private readonly string _address;
1010
private readonly MessageHandler _messageHandler;
11-
private readonly int _initialCredits = 0;
11+
private readonly int _initialCredits;
1212
private readonly Map _filters;
1313
private ReceiverLink? _receiverLink;
1414

@@ -37,7 +37,8 @@ protected sealed override Task OpenAsync()
3737
attachCompleted.WaitOne(TimeSpan.FromSeconds(5));
3838
if (_receiverLink.LinkState != LinkState.Attached)
3939
{
40-
throw new ConsumerException("Failed to create receiver link. Link state is not attached, error: " +
40+
throw new ConsumerException(
41+
$"{ToString()} Failed to create receiver link. Link state is not attached, error: " +
4142
_receiverLink.Error?.ToString() ?? "Unknown error");
4243
}
4344

@@ -46,7 +47,7 @@ protected sealed override Task OpenAsync()
4647
}
4748
catch (Exception e)
4849
{
49-
throw new ConsumerException($"Failed to create receiver link, {e}");
50+
throw new ConsumerException($"{ToString()} Failed to create receiver link, {e}");
5051
}
5152

5253
return Task.CompletedTask;
@@ -97,23 +98,8 @@ public override async Task CloseAsync()
9798
_connection.Consumers.TryRemove(Id, out _);
9899
}
99100

100-
101-
internal void ChangeStatus(State newState, Error? error)
102-
{
103-
OnNewStatus(newState, error);
104-
}
105-
106-
internal async Task Reconnect()
101+
public override string ToString()
107102
{
108-
int randomWait = Random.Shared.Next(200, 800);
109-
Trace.WriteLine(TraceLevel.Information, $"Consumer: {ToString()} is reconnecting in {randomWait} ms");
110-
await Task.Delay(randomWait).ConfigureAwait(false);
111-
112-
if (_receiverLink != null)
113-
{
114-
await _receiverLink.DetachAsync().ConfigureAwait(false)!;
115-
}
116-
117-
await OpenAsync().ConfigureAwait(false);
103+
return $"Consumer{{Address='{_address}', id={Id} ConnectionName='{_connection}', State='{State}'}}";
118104
}
119105
}

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ namespace RabbitMQ.AMQP.Client.Impl;
55
public class AmqpConsumerBuilder(AmqpConnection connection) : IConsumerBuilder
66
{
77
private string _queue = "";
8-
private int _initialCredits = 1;
9-
private Map _filters = new Map();
8+
private int _initialCredits = 10;
9+
private readonly Map _filters = new Map();
1010

1111

1212
public IConsumerBuilder Queue(string queue)

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace RabbitMQ.AMQP.Client.Impl;
88

9-
public class AmqpPublisher : AbstractLifeCycle, IPublisher
9+
public class AmqpPublisher : AbstractReconnectLifeCycle, IPublisher
1010
{
1111
private SenderLink? _senderLink = null;
1212

@@ -38,15 +38,15 @@ protected sealed override Task OpenAsync()
3838
attachCompleted.WaitOne(TimeSpan.FromSeconds(5));
3939
if (_senderLink.LinkState != LinkState.Attached)
4040
{
41-
throw new PublisherException("Failed to create sender link. Link state is not attached, error: " +
41+
throw new PublisherException($"{ToString()} Failed to create sender link. Link state is not attached, error: " +
4242
_senderLink.Error?.ToString() ?? "Unknown error");
4343
}
4444

4545
return base.OpenAsync();
4646
}
4747
catch (Exception e)
4848
{
49-
throw new PublisherException($"Failed to create sender link, {e}");
49+
throw new PublisherException($"{ToString()} Failed to create sender link, {e}");
5050
}
5151
}
5252

@@ -127,7 +127,7 @@ public Task Publish(IMessage message, OutcomeDescriptorCallback outcomeCallback)
127127
}
128128
else
129129
{
130-
Trace.WriteLine(TraceLevel.Error, "Message not sent. Killing the process.");
130+
Trace.WriteLine(TraceLevel.Error, $"{ToString()} Message not sent. Killing the process.");
131131
Process.GetCurrentProcess().Kill();
132132
}
133133

@@ -138,7 +138,7 @@ public Task Publish(IMessage message, OutcomeDescriptorCallback outcomeCallback)
138138
}
139139
catch (Exception e)
140140
{
141-
throw new PublisherException($"Failed to publish message, {e}");
141+
throw new PublisherException($"{ToString()} Failed to publish message, {e}");
142142
}
143143

144144
return Task.CompletedTask;
@@ -151,6 +151,7 @@ public override async Task CloseAsync()
151151
{
152152
return;
153153
}
154+
154155
OnNewStatus(State.Closing, null);
155156
try
156157
{
@@ -173,22 +174,8 @@ await _senderLink.CloseAsync()
173174
}
174175

175176

176-
internal void ChangeStatus(State newState, Error? error)
177-
{
178-
OnNewStatus(newState, error);
179-
}
180-
181-
internal async Task Reconnect()
177+
public override string ToString()
182178
{
183-
int randomWait = Random.Shared.Next(200, 800);
184-
Trace.WriteLine(TraceLevel.Information, $"Publisher: {ToString()} is reconnecting in {randomWait} ms");
185-
await Task.Delay(randomWait).ConfigureAwait(false);
186-
187-
if (_senderLink != null)
188-
{
189-
await _senderLink.DetachAsync().ConfigureAwait(false)!;
190-
}
191-
192-
await OpenAsync().ConfigureAwait(false);
179+
return $"Publisher{{Address='{_address}', id={Id} ConnectionName='{_connection}', State='{State}'}}";
193180
}
194181
}

RabbitMQ.AMQP.Client/Impl/ConnectionSettings.cs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -295,16 +295,27 @@ public static BackOffDelayPolicy Create()
295295
{
296296
return new BackOffDelayPolicy();
297297
}
298+
299+
public static BackOffDelayPolicy Create(int maxAttempt)
300+
{
301+
return new BackOffDelayPolicy(maxAttempt);
302+
}
298303

299304
private BackOffDelayPolicy()
300305
{
301306
}
302307

308+
private BackOffDelayPolicy(int maxAttempt)
309+
{
310+
_maxAttempt = maxAttempt;
311+
}
312+
303313
private const int StartRandomMilliseconds = 500;
304314
private const int EndRandomMilliseconds = 1500;
305315

306316
private int _attempt = 1;
307-
private int _totalAttempt = 0;
317+
private readonly int _maxAttempt = 12;
318+
308319

309320
private void ResetAfterMaxAttempt()
310321
{
@@ -317,26 +328,28 @@ private void ResetAfterMaxAttempt()
317328
public int Delay()
318329
{
319330
_attempt++;
320-
_totalAttempt++;
331+
CurrentAttempt++;
321332
ResetAfterMaxAttempt();
322333
return Random.Shared.Next(StartRandomMilliseconds, EndRandomMilliseconds) * _attempt;
323334
}
324335

325336
public void Reset()
326337
{
327338
_attempt = 1;
328-
_totalAttempt = 0;
339+
CurrentAttempt = 0;
329340
}
330341

331342
public bool IsActive()
332343
{
333-
return _totalAttempt < 12;
344+
return CurrentAttempt < _maxAttempt;
334345
}
335346

347+
public int CurrentAttempt { get; private set; } = 0;
348+
336349

337350
public override string ToString()
338351
{
339-
return $"BackOffDelayPolicy{{ Attempt={_attempt}, TotalAttempt={_totalAttempt}, IsActive={IsActive} }}";
352+
return $"BackOffDelayPolicy{{ Attempt={_attempt}, TotalAttempt={CurrentAttempt}, IsActive={IsActive} }}";
340353
}
341354
}
342355

Tests/ConnectionRecoveryTests.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public void Reset()
2222
}
2323

2424
public bool IsActive() => false;
25+
public int CurrentAttempt => 1;
2526
}
2627

2728
internal class FakeFastBackOffDelay : IBackOffDelayPolicy
@@ -36,6 +37,7 @@ public void Reset()
3637
}
3738

3839
public bool IsActive() => true;
40+
public int CurrentAttempt => 1;
3941
}
4042

4143
public class ConnectionRecoveryTests(ITestOutputHelper testOutputHelper)

Tests/ConnectionTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public async Task ThrowAmqpClosedExceptionWhenItemIsClosed()
181181
await management.Queue().Name("ThrowAmqpClosedExceptionWhenItemIsClosed").Declare();
182182
IPublisher publisher = connection.PublisherBuilder().Queue("ThrowAmqpClosedExceptionWhenItemIsClosed").Build();
183183
await publisher.CloseAsync();
184-
await Assert.ThrowsAsync<AmqpClosedException>(async () =>
184+
await Assert.ThrowsAsync<AmqpNotOpenException>(async () =>
185185
await publisher.Publish(new AmqpMessage("Hello wold!"), (message, descriptor) =>
186186
{
187187
// it doest matter
@@ -190,10 +190,10 @@ await publisher.Publish(new AmqpMessage("Hello wold!"), (message, descriptor) =>
190190
await connection.CloseAsync();
191191
Assert.Empty(connection.GetPublishers());
192192

193-
Assert.Throws<AmqpClosedException>(() =>
193+
Assert.Throws<AmqpNotOpenException>(() =>
194194
connection.PublisherBuilder().Queue("ThrowAmqpClosedExceptionWhenItemIsClosed").Build());
195195

196-
await Assert.ThrowsAsync<AmqpClosedException>(async () =>
196+
await Assert.ThrowsAsync<AmqpNotOpenException>(async () =>
197197
await management.Queue().Name("ThrowAmqpClosedExceptionWhenItemIsClosed").Declare());
198198
}
199199
}

Tests/ManagementTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public async Task RaiseManagementClosedException()
112112
{
113113
var management = new TestAmqpManagement();
114114
await Assert.ThrowsAsync<AmqpClosedException>(async () =>
115-
await management.Request(new Message(), [200]));
115+
await management.Request(new Message(), [200]));
116116
Assert.Equal(State.Closed, management.State);
117117
}
118118

0 commit comments

Comments
 (0)