Skip to content

Commit 445bca1

Browse files
committed
Implement a super class for producer and consumer
to deal with the reconnection. AbstractReconnectLifeCycle tries to reconnect internally. This is an edge case. Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 17910e9 commit 445bca1

File tree

10 files changed

+164
-53
lines changed

10 files changed

+164
-53
lines changed

RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using Amqp;
2+
13
namespace RabbitMQ.AMQP.Client.Impl;
24

35
public class AmqpClosedException(string message) : Exception(message);
@@ -41,4 +43,38 @@ protected void OnNewStatus(State newState, Error? error)
4143
public event LifeCycleCallBack? ChangeState;
4244
}
4345

46+
public abstract class AbstractReconnectLifeCycle : AbstractLifeCycle
47+
{
48+
private readonly BackOffDelayPolicy _backOffDelayPolicy = BackOffDelayPolicy.Create(2);
49+
50+
internal void ChangeStatus(State newState, Error? error)
51+
{
52+
OnNewStatus(newState, error);
53+
}
4454

55+
internal async Task Reconnect()
56+
{
57+
try
58+
{
59+
int randomWait = Random.Shared.Next(300, 900);
60+
Trace.WriteLine(TraceLevel.Information, $"{ToString()} is reconnecting in {randomWait} ms");
61+
await Task.Delay(randomWait).ConfigureAwait(false);
62+
await OpenAsync().ConfigureAwait(false);
63+
Trace.WriteLine(TraceLevel.Information, $"{ToString()} is reconnected");
64+
_backOffDelayPolicy.Reset();
65+
}
66+
catch (Exception e)
67+
{
68+
// Here we give another chance to reconnect
69+
// that's an edge case, where the link is not ready for some reason
70+
// the backoff policy will be used to delay the reconnection and give just a few attempts
71+
Trace.WriteLine(TraceLevel.Error, $"{ToString()} Failed to reconnect, {e.Message}");
72+
int delay = _backOffDelayPolicy.Delay();
73+
await Task.Delay(delay).ConfigureAwait(false);
74+
if (_backOffDelayPolicy.IsActive())
75+
{
76+
await Reconnect().ConfigureAwait(false);
77+
}
78+
}
79+
}
80+
}

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ private ClosedCallback MaybeRecoverConnection()
247247
{
248248
// close all the sessions, if the connection is closed the sessions are not valid anymore
249249
_nativePubSubSessions.ClearSessions();
250-
250+
251251
if (error != null)
252252
{
253253
// we assume here that the connection is closed unexpectedly, since the error is not null
@@ -268,8 +268,8 @@ private ClosedCallback MaybeRecoverConnection()
268268
// to reconnecting and all the events are fired
269269
OnNewStatus(State.Reconnecting, Utils.ConvertError(error));
270270
ChangeEntitiesStatus(State.Reconnecting, Utils.ConvertError(error));
271-
272-
271+
272+
273273
await Task.Run(async () =>
274274
{
275275
bool connected = false;
@@ -333,8 +333,16 @@ await _recordingTopologyListener.Accept(visitor)
333333
OnNewStatus(State.Open, null);
334334
// after the connection is recovered we have to reconnect all the publishers and consumers
335335

336-
await ReconnectEntities().ConfigureAwait(false);
336+
try
337+
{
338+
await ReconnectEntities().ConfigureAwait(false);
339+
}
340+
catch (Exception e)
341+
{
342+
Trace.WriteLine(TraceLevel.Error, $"Error trying to reconnect entities {e}. Info: {this}");
343+
}
337344
}).ConfigureAwait(false);
345+
338346
return;
339347
}
340348

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33

44
namespace RabbitMQ.AMQP.Client.Impl;
55

6-
public class AmqpConsumer : AbstractLifeCycle, IConsumer
6+
public class AmqpConsumer : AbstractReconnectLifeCycle, IConsumer
77
{
88
private readonly AmqpConnection _connection;
99
private readonly string _address;
1010
private readonly MessageHandler _messageHandler;
11-
private readonly int _initialCredits = 0;
11+
private readonly int _initialCredits;
1212
private readonly Map _filters;
1313
private ReceiverLink? _receiverLink;
1414

@@ -37,7 +37,8 @@ protected sealed override Task OpenAsync()
3737
attachCompleted.WaitOne(TimeSpan.FromSeconds(5));
3838
if (_receiverLink.LinkState != LinkState.Attached)
3939
{
40-
throw new ConsumerException("Failed to create receiver link. Link state is not attached, error: " +
40+
throw new ConsumerException(
41+
$"{ToString()} Failed to create receiver link. Link state is not attached, error: " +
4142
_receiverLink.Error?.ToString() ?? "Unknown error");
4243
}
4344

@@ -46,7 +47,7 @@ protected sealed override Task OpenAsync()
4647
}
4748
catch (Exception e)
4849
{
49-
throw new ConsumerException($"Failed to create receiver link, {e}");
50+
throw new ConsumerException($"{ToString()} Failed to create receiver link, {e}");
5051
}
5152

5253
return Task.CompletedTask;
@@ -97,23 +98,8 @@ public override async Task CloseAsync()
9798
_connection.Consumers.TryRemove(Id, out _);
9899
}
99100

100-
101-
internal void ChangeStatus(State newState, Error? error)
102-
{
103-
OnNewStatus(newState, error);
104-
}
105-
106-
internal async Task Reconnect()
101+
public override string ToString()
107102
{
108-
int randomWait = Random.Shared.Next(200, 800);
109-
Trace.WriteLine(TraceLevel.Information, $"Consumer: {ToString()} is reconnecting in {randomWait} ms");
110-
await Task.Delay(randomWait).ConfigureAwait(false);
111-
112-
if (_receiverLink != null)
113-
{
114-
await _receiverLink.DetachAsync().ConfigureAwait(false)!;
115-
}
116-
117-
await OpenAsync().ConfigureAwait(false);
103+
return $"Consumer{{Address='{_address}', id={Id} ConnectionName='{_connection}', State='{State}'}}";
118104
}
119105
}

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ namespace RabbitMQ.AMQP.Client.Impl;
55
public class AmqpConsumerBuilder(AmqpConnection connection) : IConsumerBuilder
66
{
77
private string _queue = "";
8-
private int _initialCredits = 1;
9-
private Map _filters = new Map();
8+
private int _initialCredits = 10;
9+
private readonly Map _filters = new Map();
1010

1111

1212
public IConsumerBuilder Queue(string queue)

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace RabbitMQ.AMQP.Client.Impl;
88

9-
public class AmqpPublisher : AbstractLifeCycle, IPublisher
9+
public class AmqpPublisher : AbstractReconnectLifeCycle, IPublisher
1010
{
1111
private SenderLink? _senderLink = null;
1212

@@ -38,15 +38,15 @@ protected sealed override Task OpenAsync()
3838
attachCompleted.WaitOne(TimeSpan.FromSeconds(5));
3939
if (_senderLink.LinkState != LinkState.Attached)
4040
{
41-
throw new PublisherException("Failed to create sender link. Link state is not attached, error: " +
41+
throw new PublisherException($"{ToString()} Failed to create sender link. Link state is not attached, error: " +
4242
_senderLink.Error?.ToString() ?? "Unknown error");
4343
}
4444

4545
return base.OpenAsync();
4646
}
4747
catch (Exception e)
4848
{
49-
throw new PublisherException($"Failed to create sender link, {e}");
49+
throw new PublisherException($"{ToString()} Failed to create sender link, {e}");
5050
}
5151
}
5252

@@ -127,7 +127,7 @@ public Task Publish(IMessage message, OutcomeDescriptorCallback outcomeCallback)
127127
}
128128
else
129129
{
130-
Trace.WriteLine(TraceLevel.Error, "Message not sent. Killing the process.");
130+
Trace.WriteLine(TraceLevel.Error, $"{ToString()} Message not sent. Killing the process.");
131131
Process.GetCurrentProcess().Kill();
132132
}
133133

@@ -138,7 +138,7 @@ public Task Publish(IMessage message, OutcomeDescriptorCallback outcomeCallback)
138138
}
139139
catch (Exception e)
140140
{
141-
throw new PublisherException($"Failed to publish message, {e}");
141+
throw new PublisherException($"{ToString()} Failed to publish message, {e}");
142142
}
143143

144144
return Task.CompletedTask;
@@ -151,6 +151,7 @@ public override async Task CloseAsync()
151151
{
152152
return;
153153
}
154+
154155
OnNewStatus(State.Closing, null);
155156
try
156157
{
@@ -173,22 +174,8 @@ await _senderLink.CloseAsync()
173174
}
174175

175176

176-
internal void ChangeStatus(State newState, Error? error)
177-
{
178-
OnNewStatus(newState, error);
179-
}
180-
181-
internal async Task Reconnect()
177+
public override string ToString()
182178
{
183-
int randomWait = Random.Shared.Next(200, 800);
184-
Trace.WriteLine(TraceLevel.Information, $"Publisher: {ToString()} is reconnecting in {randomWait} ms");
185-
await Task.Delay(randomWait).ConfigureAwait(false);
186-
187-
if (_senderLink != null)
188-
{
189-
await _senderLink.DetachAsync().ConfigureAwait(false)!;
190-
}
191-
192-
await OpenAsync().ConfigureAwait(false);
179+
return $"Publisher{{Address='{_address}', id={Id} ConnectionName='{_connection}', State='{State}'}}";
193180
}
194181
}

RabbitMQ.AMQP.Client/Impl/ConnectionSettings.cs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,7 @@ public ConnectionSettings Build()
7878
{
7979
var c = new ConnectionSettings(_host, _port, _user,
8080
_password, _virtualHost,
81-
_scheme, _connection)
82-
{
83-
RecoveryConfiguration = (RecoveryConfiguration)_recoveryConfiguration
84-
};
81+
_scheme, _connection) { RecoveryConfiguration = (RecoveryConfiguration)_recoveryConfiguration };
8582

8683
return c;
8784
}
@@ -265,16 +262,28 @@ public static BackOffDelayPolicy Create()
265262
{
266263
return new BackOffDelayPolicy();
267264
}
265+
266+
public static BackOffDelayPolicy Create(int maxAttempt)
267+
{
268+
return new BackOffDelayPolicy(maxAttempt);
269+
}
268270

269271
private BackOffDelayPolicy()
270272
{
271273
}
272274

275+
private BackOffDelayPolicy(int maxAttempt)
276+
{
277+
_maxAttempt = maxAttempt;
278+
}
279+
273280
private const int StartRandomMilliseconds = 500;
274281
private const int EndRandomMilliseconds = 1500;
275282

276283
private int _attempt = 1;
277284
private int _totalAttempt = 0;
285+
private readonly int _maxAttempt = 12;
286+
278287

279288
private void ResetAfterMaxAttempt()
280289
{
@@ -300,7 +309,7 @@ public void Reset()
300309

301310
public bool IsActive()
302311
{
303-
return _totalAttempt < 12;
312+
return _totalAttempt < _maxAttempt;
304313
}
305314

306315

kubernetes/amqp_cluster.yaml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
apiVersion: v1
2+
kind: Namespace
3+
metadata:
4+
name: amqp-clients-test
5+
---
6+
apiVersion: rabbitmq.com/v1beta1
7+
kind: RabbitmqCluster
8+
metadata:
9+
name: tls
10+
namespace: amqp-clients-test
11+
spec:
12+
replicas: 3
13+
image: pivotalrabbitmq/rabbitmq:main
14+
service:
15+
type: LoadBalancer
16+
# tls:
17+
# secretName: tls-secret
18+
resources:
19+
requests:
20+
cpu: 1
21+
memory: 1Gi
22+
limits:
23+
cpu: 1
24+
memory: 1Gi
25+
rabbitmq:
26+
additionalPlugins:
27+
- rabbitmq_stream
28+
- rabbitmq_stream_management
29+
additionalConfig: |
30+
log.console = true
31+
log.console.level = debug

kubernetes/deploy

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
kubectl apply -f amqp_cluster.yaml && kubectl create secret tls tls-secret --cert=test-server.pem --key=test-server-key.pem -n amqp-clients-test
2+

kubernetes/test-server-key.pem

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
-----BEGIN PRIVATE KEY-----
2+
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCzfGD7b37pIVtW
3+
O0lO27ZVJRdgFrEjI4F/9GUQvzb9zsYZBD4YbHIWISqrh1Y/4osQLrojKSkGZrX5
4+
6x/RegCNNsfIeqAM6gNQ1sVH5pxVp2essOitBoy8Udgf6kqH46+yrTX0Dk23cgSp
5+
7j0gCXL7KM15DBxDygjA96E1SHp44DgkwCvWLeYXFie5uVL1SUDx4drCGfnu2Fcg
6+
rvGfPBZpUIR9okmJWZIJcESkL2u4GaGQPGGbQOkCZOcqwRs80mwyYHsKSthKc58n
7+
lqJlZd76s/LNMbGkMdppyInYnqacx3yOPWVsbH6hWKPb1wQsNWnzCVFL3krvrqDI
8+
rCXOOKN1AgMBAAECggEAEXHelDmZPmv3TmmLPb6lmV92RujVtSpbiRX2J7tKCz+o
9+
aeCLb4DE0ulM1iicha+NYBiGj2nN+rkLaVvEty2yNYd0QgRHH5I0GcySFqOvoLSZ
10+
Y0O7jaukDJ6w0KNLNKt41Xc31f146MJMeT62UrGQayBjXidC7QTLNoQq9zyQ6MQK
11+
crbj8f/TqT68V5f0nQeYFRGPYZPiaLcDB6mOCL09B4bfMxcOl0/6SVouV8BrTHxs
12+
kqtO6yxrpc0XL1vfCrBqljlVLnXyGNmkaegSQMOTDfGqM6Mkc6771DYt5MJsmVCH
13+
VgDZeMs/BJA/srV2rXW4cwfO0OOSTkE9cCNzZM6Q6QKBgQDmLk+bLb6cafFceuGl
14+
gktNYi1TbTD62/shKulmFm8C7jNMTdbGi4pEwlnxzU57A+spr0nzody5c8qMIfUF
15+
Cpf9aRWs7xG0WOCuYbUSI+gbICxaJOKe00TyWwf/xn13p6J4wecg4n5C8Wo1QIRi
16+
nFwgaqYATkyzhE9+YOQKpxgeTQKBgQDHnlmMxiTuD3j9BxEkD3aY4E97qTfPNqgM
17+
umrwXoU0paYtgfuxMe7yjkE/qVKn3QP+wzN+XMR6YcBdphQPlSCcEyJrAZZCGrgJ
18+
CSO9anY6CA5bgZ9Mk3pBCldHEQqxWrg27bejkn4KV3PXmDtQwMYrpsgEu3DPCgy/
19+
TUVTnpK9yQKBgQC6PGQaWOOtKCapvZ6OTCJjJPkpU+JaRdwlVNPszl/ZTiLhLOWG
20+
VOZ1hY5Cjutdqqj9XB8IaUDuJ5qM0PiusIiS9xAbkH6RnYuEa/eWCslEET7xXICj
21+
IqrZMAAD2XQweMiCzdgUikzAGxXkqiOyqXH8pG1VOATlBjtPNFOtrs5bzQKBgD08
22+
1cn6609gzcQJy/ddCwwBHEEae3WFFe65rZ7J0GGDQ8SIMLd+Uwh0HY4zGplGkzgv
23+
l/d27AuDO2k/Tr4tCJD4ycE7/mWPHtAezqkIJPbOi+EEleL/By02x+mUT8xywTqQ
24+
mJqEkUgI5g/IssGmMeUoSAozmnrZYWm6gb8SUYAJAoGATjkZuRTAx+85ninZCbMU
25+
fKpxM5KhCOHOBzxEKjmHp/fRUga+XoZMeWtmcqABWa3pD3kgTa+9z1hrQaUCh1cw
26+
01QjqGxY9Z7VdBMPexXiht1JDZULaJ3Cif4YKUrwrDWc737HRqLP6c7EMwXnAod2
27+
RPe55pKjmk0X/uuple2WCAE=
28+
-----END PRIVATE KEY-----

kubernetes/test-server.pem

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
-----BEGIN CERTIFICATE-----
2+
MIID7zCCAtegAwIBAgIBATANBgkqhkiG9w0BAQsFADBMMTswOQYDVQQDDDJUTFNH
3+
ZW5TZWxmU2lnbmVkdFJvb3RDQSAyMDIzLTA5LTEzVDE2OjU0OjQ0LjU2NzY0MjEN
4+
MAsGA1UEBwwEJCQkJDAeFw0yMzA5MTMxNDU0NDRaFw0zMzA5MTAxNDU0NDRaMDYx
5+
IzAhBgNVBAMMGmdzYW50b21hZ2c2TFZETS52bXdhcmUuY29tMQ8wDQYDVQQKDAZz
6+
ZXJ2ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCzfGD7b37pIVtW
7+
O0lO27ZVJRdgFrEjI4F/9GUQvzb9zsYZBD4YbHIWISqrh1Y/4osQLrojKSkGZrX5
8+
6x/RegCNNsfIeqAM6gNQ1sVH5pxVp2essOitBoy8Udgf6kqH46+yrTX0Dk23cgSp
9+
7j0gCXL7KM15DBxDygjA96E1SHp44DgkwCvWLeYXFie5uVL1SUDx4drCGfnu2Fcg
10+
rvGfPBZpUIR9okmJWZIJcESkL2u4GaGQPGGbQOkCZOcqwRs80mwyYHsKSthKc58n
11+
lqJlZd76s/LNMbGkMdppyInYnqacx3yOPWVsbH6hWKPb1wQsNWnzCVFL3krvrqDI
12+
rCXOOKN1AgMBAAGjgfEwge4wCQYDVR0TBAIwADALBgNVHQ8EBAMCBaAwEwYDVR0l
13+
BAwwCgYIKwYBBQUHAwEwTAYDVR0RBEUwQ4IaZ3NhbnRvbWFnZzZMVkRNLnZtd2Fy
14+
ZS5jb22CGmdzYW50b21hZ2c2TFZETS52bXdhcmUuY29tgglsb2NhbGhvc3QwHQYD
15+
VR0OBBYEFDHW2nif3ILxi7DWd4TNyn63Q7apMB8GA1UdIwQYMBaAFKccKJsr3YYn
16+
MVNXfujEGRCONpu9MDEGA1UdHwQqMCgwJqAkoCKGIGh0dHA6Ly9jcmwtc2VydmVy
17+
OjgwMDAvYmFzaWMuY3JsMA0GCSqGSIb3DQEBCwUAA4IBAQCwa+ksiRPR06JZzKFd
18+
pcD4K5oZ6F5mVpTqn3Kf5jS1cz6Ippi/T8nU8k/xVKmDMqqCWCYGal1U8DmHGPzQ
19+
WOWMk/Ibb72feCS4txIH4GuV/ZO868/5qOy1rmP/UjOY6Kpyju/Eg13AdzcuSnZ3
20+
rZcSncm/gY5BHMmUJdMutTe+Scz32VW7yV8Mi+2ZwsMiqLksZMpBJqPyxroGTksI
21+
p7bklWf1pOgQqh9XJqu3x4rceH0o3xHZ/wana4RnSWL7Q4N6TNinAjLzlLvDByW7
22+
JX9ivpCVpM0n6tIT+E7UbWVX6WoICjCJeDLNwq/jYVEDP80O3yjDYKpYIOqp/e7Q
23+
/BJy
24+
-----END CERTIFICATE-----

0 commit comments

Comments
 (0)