Skip to content

Commit ebce43d

Browse files
committed
* Add IUriSelector
1 parent 81c9a3a commit ebce43d

File tree

3 files changed

+74
-18
lines changed

3 files changed

+74
-18
lines changed

RabbitMQ.AMQP.Client/ConnectionSettings.cs

Lines changed: 65 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,19 @@
1212

1313
namespace RabbitMQ.AMQP.Client
1414
{
15+
public interface IUriSelector
16+
{
17+
Uri Select(ICollection<Uri> uris);
18+
}
19+
20+
public class RandomUriSelector : IUriSelector
21+
{
22+
public Uri Select(ICollection<Uri> uris)
23+
{
24+
return uris.Skip(Utils.RandomNext(0, uris.Count)).First();
25+
}
26+
}
27+
1528
public class ConnectionSettingsBuilder
1629
{
1730
private string _host = "localhost";
@@ -27,6 +40,7 @@ public class ConnectionSettingsBuilder
2740
private TlsSettings? _tlsSettings = null;
2841
private Uri? _uri;
2942
private List<Uri>? _uris;
43+
private IUriSelector? _uriSelector;
3044

3145
public static ConnectionSettingsBuilder Create()
3246
{
@@ -125,10 +139,16 @@ public ConnectionSettingsBuilder Uris(IEnumerable<Uri> uris)
125139
return this;
126140
}
127141

142+
public ConnectionSettingsBuilder UriSelector(IUriSelector uriSelector)
143+
{
144+
_uriSelector = uriSelector;
145+
return this;
146+
}
147+
128148
public ConnectionSettings Build()
129149
{
130-
ValidateUris();
131150
// TODO this should do something similar to consolidate in the Java code
151+
ValidateUris();
132152
if (_uri is not null)
133153
{
134154
return new ConnectionSettings(_uri,
@@ -140,6 +160,7 @@ public ConnectionSettings Build()
140160
else if (_uris is not null)
141161
{
142162
return new ConnectionSettings(_uris,
163+
_uriSelector,
143164
_containerId, _saslMechanism,
144165
_recoveryConfiguration,
145166
_maxFrameSize,
@@ -171,7 +192,9 @@ private void ValidateUris()
171192
public class ConnectionSettings : IEquatable<ConnectionSettings>
172193
{
173194
private readonly Address _address = new("amqp://localhost:5672");
174-
private readonly List<Address> _addresses = new();
195+
private readonly List<Uri>? _uris;
196+
private readonly Dictionary<Uri, Address>? _uriToAddress;
197+
private readonly IUriSelector _uriSelector = new RandomUriSelector();
175198
private readonly string _virtualHost = Consts.DefaultVirtualHost;
176199
private readonly string _containerId = string.Empty;
177200
private readonly uint _maxFrameSize = Consts.DefaultMaxFrameSize;
@@ -197,7 +220,6 @@ public ConnectionSettings(Uri uri,
197220
password: password,
198221
path: "/",
199222
scheme: uri.Scheme);
200-
_addresses.Add(_address);
201223

202224
if (_address.UseSsl && _tlsSettings == null)
203225
{
@@ -206,16 +228,27 @@ public ConnectionSettings(Uri uri,
206228
}
207229

208230
public ConnectionSettings(IEnumerable<Uri> uris,
231+
IUriSelector? uriSelector = null,
209232
string? containerId = null,
210233
SaslMechanism? saslMechanism = null,
211234
IRecoveryConfiguration? recoveryConfiguration = null,
212235
uint? maxFrameSize = null,
213236
TlsSettings? tlsSettings = null)
214237
: this(containerId, saslMechanism, recoveryConfiguration, maxFrameSize, tlsSettings)
215238
{
239+
_uris = uris.ToList();
240+
241+
if (uriSelector is not null)
242+
{
243+
_uriSelector = uriSelector;
244+
}
245+
246+
_uriToAddress = new(_uris.Count);
247+
216248
string? tmpVirtualHost = null;
217249

218-
foreach (Uri uri in uris)
250+
bool first = true;
251+
foreach (Uri uri in _uris)
219252
{
220253
(string? user, string? password) = ProcessUserInfo(uri);
221254

@@ -238,10 +271,15 @@ public ConnectionSettings(IEnumerable<Uri> uris,
238271
password: password,
239272
path: "/",
240273
scheme: uri.Scheme);
241-
_addresses.Add(address);
242-
}
243274

244-
_address = _addresses[0];
275+
_uriToAddress[uri] = address;
276+
277+
if (first)
278+
{
279+
_address = address;
280+
first = false;
281+
}
282+
}
245283

246284
if (tmpVirtualHost is not null)
247285
{
@@ -265,7 +303,6 @@ public ConnectionSettings(string scheme,
265303
_address = new Address(host: host, port: port,
266304
user: user, password: password,
267305
path: "/", scheme: scheme);
268-
_addresses.Add(_address);
269306

270307
if (virtualHost is not null)
271308
{
@@ -331,23 +368,34 @@ internal Address Address
331368
{
332369
get
333370
{
334-
if (_addresses.Count == 1)
371+
if (_uris is not null &&
372+
_uriSelector is not null &&
373+
_uriToAddress is not null)
335374
{
336-
if (false == Object.ReferenceEquals(_address, _addresses[0]))
337-
{
338-
InternalBugException.CreateAndThrow("_address must be same object as _addresses[0]");
339-
}
340-
return _address;
375+
Uri uri = _uriSelector.Select(_uris);
376+
return _uriToAddress[uri];
341377
}
342378
else
343379
{
344-
// Note: the max value for RandomNext is not inclusive
345-
return _addresses[Utils.RandomNext(0, _addresses.Count)];
380+
return _address;
346381
}
347382
}
348383
}
349384

350-
internal List<Address> Addresses => _addresses;
385+
internal List<Address>? Addresses
386+
{
387+
get
388+
{
389+
if (_uriToAddress is not null)
390+
{
391+
return _uriToAddress.Values.ToList();
392+
}
393+
else
394+
{
395+
return null;
396+
}
397+
}
398+
}
351399

352400
public override string ToString()
353401
{

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ RabbitMQ.AMQP.Client.ConnectionException.ConnectionException(string! message) ->
5959
RabbitMQ.AMQP.Client.ConnectionException.ConnectionException(string! message, System.Exception! innerException) -> void
6060
RabbitMQ.AMQP.Client.ConnectionSettings
6161
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(string! scheme, string! host, int port, string? user = null, string? password = null, string? virtualHost = null, string! containerId = "", RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
62-
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(System.Collections.Generic.IEnumerable<System.Uri!>! uris, string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
62+
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(System.Collections.Generic.IEnumerable<System.Uri!>! uris, RabbitMQ.AMQP.Client.IUriSelector? uriSelector = null, string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
6363
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(System.Uri! uri, string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
6464
RabbitMQ.AMQP.Client.ConnectionSettings.ContainerId.get -> string!
6565
RabbitMQ.AMQP.Client.ConnectionSettings.Host.get -> string!
@@ -88,6 +88,7 @@ RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Scheme(string! scheme) -> RabbitM
8888
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.TlsSettings(RabbitMQ.AMQP.Client.TlsSettings! tlsSettings) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
8989
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Uri(System.Uri! uri) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
9090
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Uris(System.Collections.Generic.IEnumerable<System.Uri!>! uris) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
91+
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.UriSelector(RabbitMQ.AMQP.Client.IUriSelector! uriSelector) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
9192
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.User(string! user) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
9293
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.VirtualHost(string! virtualHost) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
9394
RabbitMQ.AMQP.Client.Consts
@@ -671,6 +672,8 @@ RabbitMQ.AMQP.Client.IStreamSpecification.InitialClusterSize(int initialClusterS
671672
RabbitMQ.AMQP.Client.IStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification!
672673
RabbitMQ.AMQP.Client.IStreamSpecification.MaxSegmentSizeBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxSegmentSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
673674
RabbitMQ.AMQP.Client.IStreamSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
675+
RabbitMQ.AMQP.Client.IUriSelector
676+
RabbitMQ.AMQP.Client.IUriSelector.Select(System.Collections.Generic.ICollection<System.Uri!>! uris) -> System.Uri!
674677
RabbitMQ.AMQP.Client.LifeCycleCallBack
675678
RabbitMQ.AMQP.Client.MessageHandler
676679
RabbitMQ.AMQP.Client.MetricsReporter
@@ -714,6 +717,9 @@ RabbitMQ.AMQP.Client.QueueType.STREAM = 2 -> RabbitMQ.AMQP.Client.QueueType
714717
RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy
715718
RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy.AtLeastOnce = 1 -> RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy
716719
RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy.AtMostOnce = 0 -> RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy
720+
RabbitMQ.AMQP.Client.RandomUriSelector
721+
RabbitMQ.AMQP.Client.RandomUriSelector.RandomUriSelector() -> void
722+
RabbitMQ.AMQP.Client.RandomUriSelector.Select(System.Collections.Generic.ICollection<System.Uri!>! uris) -> System.Uri!
717723
RabbitMQ.AMQP.Client.RecoveryConfiguration
718724
RabbitMQ.AMQP.Client.RecoveryConfiguration.Activated(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!
719725
RabbitMQ.AMQP.Client.RecoveryConfiguration.BackOffDelayPolicy(RabbitMQ.AMQP.Client.IBackOffDelayPolicy! backOffDelayPolicy) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!

Tests/ConnectionTests/ConnectionSettingsTests.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public void ConnectionSettingsViaUris()
131131
Assert.Equal(vhost, connectionSettings.VirtualHost);
132132
Assert.Equal(scheme, connectionSettings.Scheme);
133133

134+
Assert.NotNull(connectionSettings.Addresses);
134135
Amqp.Address a0 = connectionSettings.Addresses[0];
135136
Assert.Equal(host, a0.Host);
136137
Assert.Equal(5671, a0.Port);
@@ -196,6 +197,7 @@ public void ConnectionSettingsViaBuilderWithUris()
196197
Assert.Equal(vhost, connectionSettings.VirtualHost);
197198
Assert.Equal(scheme, connectionSettings.Scheme);
198199

200+
Assert.NotNull(connectionSettings.Addresses);
199201
Amqp.Address a0 = connectionSettings.Addresses[0];
200202
Assert.Equal(host, a0.Host);
201203
Assert.Equal(5671, a0.Port);

0 commit comments

Comments
 (0)