Skip to content

Commit 56eae0e

Browse files
Gsantomaggiolukebakken
authored andcommitted
Implement the Environment class to manage the connections (#36)
* Implement the Environment Closes #35 --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 2db5a00 commit 56eae0e

File tree

13 files changed

+259
-37
lines changed

13 files changed

+259
-37
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AM
2929
- [x] Recovery queues on connection lost
3030
- [x] Recovery publishers on connection lost
3131
- [x] Recovery consumers on connection lost
32+
- [x] Implement Environment to manage the connections
3233
- [ ] Complete the consumer part with `pause` and `unpause`
3334
- [ ] Complete the binding/unbinding with the special characters
3435
- [ ] Complete the queues/exchanges name with the special characters

RabbitMQ.AMQP.Client/IConnection.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,8 @@ public interface IConnection : ILifeCycle
2222
IConsumerBuilder ConsumerBuilder();
2323

2424
public ReadOnlyCollection<IPublisher> GetPublishers();
25+
26+
public ReadOnlyCollection<IConsumer> GetConsumers();
27+
28+
public long Id { get; set; }
2529
}

RabbitMQ.AMQP.Client/IConnectionSettings.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ public interface IConnectionSettings : IEquatable<IConnectionSettings>
1818
uint MaxFrameSize { get; }
1919
SaslMechanism SaslMechanism { get; }
2020
ITlsSettings? TlsSettings { get; }
21+
22+
IRecoveryConfiguration Recovery { get; }
23+
2124
}
2225

2326
/// <summary>

RabbitMQ.AMQP.Client/IEnvironment.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using System.Collections.ObjectModel;
2+
3+
namespace RabbitMQ.AMQP.Client;
4+
5+
6+
/// <summary>
7+
/// Interface to create IConnections and manage them.
8+
/// </summary>
9+
public interface IEnvironment
10+
{
11+
/// <summary>
12+
/// Create a new connection with the given connection settings.
13+
/// </summary>
14+
/// <param name="connectionSettings"></param>
15+
/// <returns>IConnection</returns>
16+
public Task<IConnection> CreateConnectionAsync(IConnectionSettings connectionSettings);
17+
18+
19+
/// <summary>
20+
/// Create a new connection with the default connection settings.
21+
/// </summary>
22+
/// <returns>IConnection</returns>
23+
24+
public Task<IConnection> CreateConnectionAsync();
25+
26+
27+
public ReadOnlyCollection<IConnection> GetConnections();
28+
29+
/// <summary>
30+
/// Close all connections.
31+
/// </summary>
32+
/// <returns></returns>
33+
34+
Task CloseAsync();
35+
}

RabbitMQ.AMQP.Client/IRecoveryConfiguration.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public interface IRecoveryConfiguration
2424
/// <returns></returns>
2525
IRecoveryConfiguration BackOffDelayPolicy(IBackOffDelayPolicy backOffDelayPolicy);
2626

27+
IBackOffDelayPolicy GetBackOffDelayPolicy();
28+
2729
/// <summary>
2830
/// Define if the recovery of the topology is activated.
2931
/// When Activated the connection will try to recover the topology after a reconnection.

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ private async Task ReconnectConsumers()
9595
}
9696
}
9797

98-
private readonly ConnectionSettings _connectionSettings;
98+
private readonly IConnectionSettings _connectionSettings;
9999
internal readonly AmqpSessionManagement _nativePubSubSessions;
100100

101101
// TODO: Implement the semaphore to avoid multiple connections
@@ -116,6 +116,13 @@ public ReadOnlyCollection<IPublisher> GetPublishers()
116116
return Publishers.Values.ToList().AsReadOnly();
117117
}
118118

119+
public ReadOnlyCollection<IConsumer> GetConsumers()
120+
{
121+
return Consumers.Values.ToList().AsReadOnly();
122+
}
123+
124+
public long Id { get; set; }
125+
119126
/// <summary>
120127
/// Creates a new instance of <see cref="AmqpConnection"/>
121128
/// Through the Connection is possible to create:
@@ -124,7 +131,7 @@ public ReadOnlyCollection<IPublisher> GetPublishers()
124131
/// </summary>
125132
/// <param name="connectionSettings"></param>
126133
/// <returns></returns>
127-
public static async Task<IConnection> CreateAsync(ConnectionSettings connectionSettings)
134+
public static async Task<IConnection> CreateAsync(IConnectionSettings connectionSettings)
128135
{
129136
var connection = new AmqpConnection(connectionSettings);
130137
await connection.OpenAsync()
@@ -158,7 +165,7 @@ await consumer.CloseAsync()
158165
}
159166
}
160167

161-
private AmqpConnection(ConnectionSettings connectionSettings)
168+
private AmqpConnection(IConnectionSettings connectionSettings)
162169
{
163170
_connectionSettings = connectionSettings;
164171
_nativePubSubSessions = new AmqpSessionManagement(this, 1);
@@ -198,10 +205,7 @@ private async Task EnsureConnection()
198205
var open = new Open
199206
{
200207
HostName = $"vhost:{_connectionSettings.VirtualHost}",
201-
Properties = new Fields()
202-
{
203-
[new Symbol("connection_name")] = _connectionSettings.ConnectionName,
204-
}
208+
Properties = new Fields() { [new Symbol("connection_name")] = _connectionSettings.ConnectionName, }
205209
};
206210

207211
if (_connectionSettings.MaxFrameSize > uint.MinValue)
@@ -230,12 +234,14 @@ void onOpened(Amqp.IConnection connection, Open open1)
230234

231235
if (_connectionSettings.TlsSettings.LocalCertificateSelectionCallback is not null)
232236
{
233-
cf.SSL.LocalCertificateSelectionCallback = _connectionSettings.TlsSettings.LocalCertificateSelectionCallback;
237+
cf.SSL.LocalCertificateSelectionCallback =
238+
_connectionSettings.TlsSettings.LocalCertificateSelectionCallback;
234239
}
235240

236241
if (_connectionSettings.TlsSettings.RemoteCertificateValidationCallback is not null)
237242
{
238-
cf.SSL.RemoteCertificateValidationCallback = _connectionSettings.TlsSettings.RemoteCertificateValidationCallback;
243+
cf.SSL.RemoteCertificateValidationCallback =
244+
_connectionSettings.TlsSettings.RemoteCertificateValidationCallback;
239245
}
240246
}
241247

@@ -246,7 +252,7 @@ void onOpened(Amqp.IConnection connection, Open open1)
246252

247253
try
248254
{
249-
_nativeConnection = await cf.CreateAsync(_connectionSettings.Address, open: open, onOpened: onOpened)
255+
_nativeConnection = await cf.CreateAsync((_connectionSettings as ConnectionSettings)?.Address, open: open, onOpened: onOpened)
250256
.ConfigureAwait(false);
251257
}
252258
catch (Exception ex)
@@ -300,7 +306,7 @@ private ClosedCallback MaybeRecoverConnection()
300306
// we have to check if the recovery is active.
301307
// The user may want to disable the recovery mechanism
302308
// the user can use the lifecycle callback to handle the error
303-
if (!_connectionSettings.RecoveryConfiguration.IsActivate())
309+
if (!_connectionSettings.Recovery.IsActivate())
304310
{
305311
OnNewStatus(State.Closed, Utils.ConvertError(error));
306312
ChangeEntitiesStatus(State.Closed, Utils.ConvertError(error));
@@ -323,19 +329,19 @@ await Task.Run(async () =>
323329
// the user may want to disable the backoff policy or
324330
// the backoff policy is not active due of some condition
325331
// for example: Reaching the maximum number of retries and avoid the forever loop
326-
_connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().IsActive() &&
332+
_connectionSettings.Recovery.GetBackOffDelayPolicy().IsActive() &&
327333

328334
// even we set the status to reconnecting up, we need to check if the connection is still in the
329335
// reconnecting status. The user may close the connection in the meanwhile
330336
State == State.Reconnecting)
331337
{
332338
try
333339
{
334-
int next = _connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().Delay();
340+
int next = _connectionSettings.Recovery.GetBackOffDelayPolicy().Delay();
335341

336342
Trace.WriteLine(TraceLevel.Information,
337343
$"Trying Recovering connection in {next} milliseconds, " +
338-
$"attempt: {_connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().CurrentAttempt}. " +
344+
$"attempt: {_connectionSettings.Recovery.GetBackOffDelayPolicy().CurrentAttempt}. " +
339345
$"Info: {ToString()})");
340346

341347
await Task.Delay(TimeSpan.FromMilliseconds(next))
@@ -352,7 +358,7 @@ await EnsureConnection()
352358
}
353359
}
354360

355-
_connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().Reset();
361+
_connectionSettings.Recovery.GetBackOffDelayPolicy().Reset();
356362
string connectionDescription = connected ? "recovered" : "not recovered";
357363
Trace.WriteLine(TraceLevel.Information,
358364
$"Connection {connectionDescription}. Info: {ToString()}");
@@ -362,15 +368,15 @@ await EnsureConnection()
362368
Trace.WriteLine(TraceLevel.Verbose, $"connection is closed. Info: {ToString()}");
363369
OnNewStatus(State.Closed,
364370
new Error(ConnectionNotRecoveredCode,
365-
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.RecoveryConfiguration}"));
371+
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.Recovery}"));
366372

367373
ChangeEntitiesStatus(State.Closed, new Error(ConnectionNotRecoveredCode,
368-
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.RecoveryConfiguration}"));
374+
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.Recovery}"));
369375

370376
return;
371377
}
372378

373-
if (_connectionSettings.RecoveryConfiguration.IsTopologyActive())
379+
if (_connectionSettings.Recovery.IsTopologyActive())
374380
{
375381
Trace.WriteLine(TraceLevel.Information, $"Recovering topology. Info: {ToString()}");
376382
var visitor = new Visitor(_management);
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
using System.Collections.Concurrent;
2+
using System.Collections.ObjectModel;
3+
4+
namespace RabbitMQ.AMQP.Client.Impl;
5+
6+
public class AmqpEnvironment : IEnvironment
7+
{
8+
private IConnectionSettings? ConnectionSettings { get; }
9+
private long _sequentialId = 0;
10+
private readonly ConcurrentDictionary<long, IConnection> _connections = [];
11+
12+
private AmqpEnvironment(IConnectionSettings connectionSettings)
13+
{
14+
ConnectionSettings = connectionSettings;
15+
}
16+
17+
public static Task<IEnvironment> CreateAsync(IConnectionSettings connectionSettings)
18+
{
19+
return Task.FromResult<IEnvironment>(new AmqpEnvironment(connectionSettings));
20+
}
21+
22+
public async Task<IConnection> CreateConnectionAsync(IConnectionSettings connectionSettings)
23+
{
24+
IConnection c = await AmqpConnection.CreateAsync(connectionSettings).ConfigureAwait(false);
25+
c.Id = Interlocked.Increment(ref _sequentialId);
26+
_connections.TryAdd(c.Id, c);
27+
c.ChangeState += (sender, previousState, currentState, failureCause) =>
28+
{
29+
if (currentState != State.Closed)
30+
{
31+
return;
32+
}
33+
34+
if (sender is IConnection connection)
35+
{
36+
_connections.TryRemove(connection.Id, out _);
37+
}
38+
};
39+
return c;
40+
}
41+
42+
public async Task<IConnection> CreateConnectionAsync()
43+
{
44+
if (ConnectionSettings != null)
45+
{
46+
return await CreateConnectionAsync(ConnectionSettings).ConfigureAwait(false);
47+
}
48+
49+
throw new ConnectionException("Connection settings are not set");
50+
}
51+
52+
public ReadOnlyCollection<IConnection> GetConnections() =>
53+
new(_connections.Values.ToList());
54+
55+
public Task CloseAsync() => Task.WhenAll(_connections.Values.Select(c => c.CloseAsync()));
56+
}

RabbitMQ.AMQP.Client/Impl/ConnectionSettings.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public ConnectionSettings Build()
101101
_password, _virtualHost,
102102
_connectionName, _saslMechanism, _maxFrameSize)
103103
{
104-
RecoveryConfiguration = (RecoveryConfiguration)_recoveryConfiguration
104+
Recovery = (RecoveryConfiguration)_recoveryConfiguration
105105
};
106106

107107
return c;
@@ -172,6 +172,8 @@ public ConnectionSettings(string scheme, string host, int port,
172172
public uint MaxFrameSize => _maxFrameSize;
173173
public SaslMechanism SaslMechanism => _saslMechanism;
174174
public ITlsSettings? TlsSettings => _tlsSettings;
175+
public IRecoveryConfiguration Recovery { get; init; } = RecoveryConfiguration.Create();
176+
175177

176178
public override string ToString()
177179
{
@@ -239,7 +241,7 @@ public bool Equals(IConnectionSettings? other)
239241

240242
internal Address Address => _address;
241243

242-
public RecoveryConfiguration RecoveryConfiguration { get; set; } = RecoveryConfiguration.Create();
244+
// public RecoveryConfiguration RecoveryConfiguration { get; set; } = RecoveryConfiguration.Create();
243245
}
244246

245247
/// <summary>

0 commit comments

Comments
 (0)