Skip to content

Commit 1652cf5

Browse files
committed
Add support for multiple Uris
Part of #95 * Start by adding the bare minimum to support a list of Uris, and randomly selecting from that list when an Address is needed * Ensure all vhosts passed in a list of Uris match * Rework `ConnectionSettings` ctors to allow other settings when Uri or Uris are used. * Add `IUriSelector` * Use `ConnectionSettingsViaUris` when specifying a set of `Uri` instances to which to connect. * Wrap up equality comparison. * Rename to `ClusterConnectionSettings`
1 parent 181f5cb commit 1652cf5

File tree

9 files changed

+602
-109
lines changed

9 files changed

+602
-109
lines changed

RabbitMQ.AMQP.Client/ConnectionSettings.cs

Lines changed: 296 additions & 59 deletions
Large diffs are not rendered by default.

RabbitMQ.AMQP.Client/Consts.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,10 @@ public static class Consts
1616
/// <code>uint.MinValue</code> means "no limit"
1717
/// </summary>
1818
public const uint DefaultMaxFrameSize = uint.MinValue; // NOTE: Azure/amqpnetlite uses 256 * 1024
19+
20+
/// <summary>
21+
/// The default virtual host, <c>/</c>
22+
/// </summary>
23+
public const string DefaultVirtualHost = "/";
1924
}
2025
}

RabbitMQ.AMQP.Client/IEnvironment.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public interface IEnvironment
3434
/// Close all connections.
3535
/// </summary>
3636
/// <returns></returns>
37+
// TODO cancellation token
3738
Task CloseAsync();
3839
}
3940
}

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
3434
private readonly AmqpManagement _management;
3535
private readonly RecordingTopologyListener _recordingTopologyListener = new();
3636

37-
internal readonly ConnectionSettings _connectionSettings;
37+
private readonly ConnectionSettings _connectionSettings;
3838
private readonly IMetricsReporter? _metricsReporter;
39+
40+
// TODO this is coupled with publishers and consumers
3941
internal readonly AmqpSessionManagement _nativePubSubSessions;
4042

4143
private readonly Dictionary<string, object> _connectionProperties = new();
@@ -350,23 +352,9 @@ void OnOpened(Amqp.IConnection connection, Open openOnOpened)
350352

351353
try
352354
{
353-
ConnectionSettings connectionSettings;
354-
if (_connectionSettings is null)
355-
{
356-
// TODO create "internal bug" exception type?
357-
throw new InvalidOperationException(
358-
"_connectionSettings is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
359-
}
360-
else
361-
{
362-
// TODO
363-
// There is absolutely NO POINT in having an interface if this
364-
// is what will be done!
365-
connectionSettings = (ConnectionSettings)_connectionSettings;
366-
Address address = connectionSettings.Address;
367-
_nativeConnection = await cf.CreateAsync(address: address, open: open, onOpened: OnOpened)
368-
.ConfigureAwait(false);
369-
}
355+
Address address = _connectionSettings.Address;
356+
_nativeConnection = await cf.CreateAsync(address: address, open: open, onOpened: OnOpened)
357+
.ConfigureAwait(false);
370358
}
371359
catch (Exception ex)
372360
{

RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ public Task<IConnection> CreateConnectionAsync()
6262
public ReadOnlyCollection<IConnection> GetConnections() =>
6363
new(_connections.Values.ToList());
6464

65-
public Task CloseAsync() => Task.WhenAll(_connections.Values.Select(c => c.CloseAsync()));
65+
// TODO cancellation token
66+
public Task CloseAsync()
67+
{
68+
return Task.WhenAll(_connections.Values.Select(c => c.CloseAsync()));
69+
}
6670
}
6771
}

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@ abstract RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.CloseAsync() -> System.Thre
33
abstract RabbitMQ.AMQP.Client.Impl.StreamOptions.Builder() -> RabbitMQ.AMQP.Client.IConsumerBuilder!
44
const RabbitMQ.AMQP.Client.Consts.Bindings = "bindings" -> string!
55
const RabbitMQ.AMQP.Client.Consts.DefaultMaxFrameSize = 0 -> uint
6+
const RabbitMQ.AMQP.Client.Consts.DefaultVirtualHost = "/" -> string!
67
const RabbitMQ.AMQP.Client.Consts.Exchanges = "exchanges" -> string!
78
const RabbitMQ.AMQP.Client.Consts.Key = "key" -> string!
89
const RabbitMQ.AMQP.Client.Consts.Messages = "messages" -> string!
910
const RabbitMQ.AMQP.Client.Consts.Queues = "queues" -> string!
1011
const RabbitMQ.AMQP.Client.MetricsReporter.MeterName = "RabbitMQ.Amqp" -> string!
1112
const RabbitMQ.AMQP.Client.MetricsReporter.MetricPrefix = "rabbitmq.amqp" -> string!
1213
override RabbitMQ.AMQP.Client.BackOffDelayPolicy.ToString() -> string!
14+
override RabbitMQ.AMQP.Client.ClusterConnectionSettings.Equals(object? obj) -> bool
15+
override RabbitMQ.AMQP.Client.ClusterConnectionSettings.GetHashCode() -> int
1316
override RabbitMQ.AMQP.Client.ConnectionSettings.Equals(object? obj) -> bool
1417
override RabbitMQ.AMQP.Client.ConnectionSettings.GetHashCode() -> int
1518
override RabbitMQ.AMQP.Client.ConnectionSettings.ToString() -> string!
@@ -53,12 +56,15 @@ RabbitMQ.AMQP.Client.ClassicQueueMode.Lazy = 1 -> RabbitMQ.AMQP.Client.ClassicQu
5356
RabbitMQ.AMQP.Client.ClassicQueueVersion
5457
RabbitMQ.AMQP.Client.ClassicQueueVersion.V1 = 0 -> RabbitMQ.AMQP.Client.ClassicQueueVersion
5558
RabbitMQ.AMQP.Client.ClassicQueueVersion.V2 = 1 -> RabbitMQ.AMQP.Client.ClassicQueueVersion
59+
RabbitMQ.AMQP.Client.ClusterConnectionSettings
60+
RabbitMQ.AMQP.Client.ClusterConnectionSettings.ClusterConnectionSettings(System.Collections.Generic.IEnumerable<System.Uri!>! uris, RabbitMQ.AMQP.Client.IUriSelector? uriSelector = null, string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
5661
RabbitMQ.AMQP.Client.ConnectionException
5762
RabbitMQ.AMQP.Client.ConnectionException.ConnectionException(string! message) -> void
5863
RabbitMQ.AMQP.Client.ConnectionException.ConnectionException(string! message, System.Exception! innerException) -> void
5964
RabbitMQ.AMQP.Client.ConnectionSettings
60-
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(string! scheme, string! host, int port, string? user, string? password, string! virtualHost, string! containerId, RabbitMQ.AMQP.Client.SaslMechanism! saslMechanism, RabbitMQ.AMQP.Client.IRecoveryConfiguration! recoveryConfiguration, uint maxFrameSize = 0, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
61-
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(System.Uri! uri) -> void
65+
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(string! scheme, string! host, int port, string? user = null, string? password = null, string? virtualHost = null, string! containerId = "", RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
66+
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
67+
RabbitMQ.AMQP.Client.ConnectionSettings.ConnectionSettings(System.Uri! uri, string? containerId = null, RabbitMQ.AMQP.Client.SaslMechanism? saslMechanism = null, RabbitMQ.AMQP.Client.IRecoveryConfiguration? recoveryConfiguration = null, uint? maxFrameSize = null, RabbitMQ.AMQP.Client.TlsSettings? tlsSettings = null) -> void
6268
RabbitMQ.AMQP.Client.ConnectionSettings.ContainerId.get -> string!
6369
RabbitMQ.AMQP.Client.ConnectionSettings.Host.get -> string!
6470
RabbitMQ.AMQP.Client.ConnectionSettings.MaxFrameSize.get -> uint
@@ -69,10 +75,11 @@ RabbitMQ.AMQP.Client.ConnectionSettings.Recovery.get -> RabbitMQ.AMQP.Client.IRe
6975
RabbitMQ.AMQP.Client.ConnectionSettings.SaslMechanism.get -> RabbitMQ.AMQP.Client.SaslMechanism!
7076
RabbitMQ.AMQP.Client.ConnectionSettings.Scheme.get -> string!
7177
RabbitMQ.AMQP.Client.ConnectionSettings.TlsSettings.get -> RabbitMQ.AMQP.Client.TlsSettings?
72-
RabbitMQ.AMQP.Client.ConnectionSettings.Uris.get -> System.Collections.Generic.IEnumerable<System.Uri!>?
7378
RabbitMQ.AMQP.Client.ConnectionSettings.User.get -> string?
7479
RabbitMQ.AMQP.Client.ConnectionSettings.UseSsl.get -> bool
7580
RabbitMQ.AMQP.Client.ConnectionSettings.VirtualHost.get -> string!
81+
RabbitMQ.AMQP.Client.ConnectionSettings._address -> Amqp.Address!
82+
RabbitMQ.AMQP.Client.ConnectionSettings._virtualHost -> string!
7683
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder
7784
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Build() -> RabbitMQ.AMQP.Client.ConnectionSettings!
7885
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.ConnectionSettingsBuilder() -> void
@@ -84,7 +91,10 @@ RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Port(int port) -> RabbitMQ.AMQP.C
8491
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.RecoveryConfiguration(RabbitMQ.AMQP.Client.IRecoveryConfiguration! recoveryConfiguration) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
8592
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.SaslMechanism(RabbitMQ.AMQP.Client.SaslMechanism! saslMechanism) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
8693
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Scheme(string! scheme) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
94+
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.TlsSettings(RabbitMQ.AMQP.Client.TlsSettings! tlsSettings) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
95+
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Uri(System.Uri! uri) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
8796
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Uris(System.Collections.Generic.IEnumerable<System.Uri!>! uris) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
97+
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.UriSelector(RabbitMQ.AMQP.Client.IUriSelector! uriSelector) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
8898
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.User(string! user) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
8999
RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.VirtualHost(string! virtualHost) -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
90100
RabbitMQ.AMQP.Client.Consts
@@ -668,6 +678,8 @@ RabbitMQ.AMQP.Client.IStreamSpecification.InitialClusterSize(int initialClusterS
668678
RabbitMQ.AMQP.Client.IStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification!
669679
RabbitMQ.AMQP.Client.IStreamSpecification.MaxSegmentSizeBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxSegmentSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
670680
RabbitMQ.AMQP.Client.IStreamSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
681+
RabbitMQ.AMQP.Client.IUriSelector
682+
RabbitMQ.AMQP.Client.IUriSelector.Select(System.Collections.Generic.ICollection<System.Uri!>! uris) -> System.Uri!
671683
RabbitMQ.AMQP.Client.LifeCycleCallBack
672684
RabbitMQ.AMQP.Client.MessageHandler
673685
RabbitMQ.AMQP.Client.MetricsReporter
@@ -711,6 +723,9 @@ RabbitMQ.AMQP.Client.QueueType.STREAM = 2 -> RabbitMQ.AMQP.Client.QueueType
711723
RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy
712724
RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy.AtLeastOnce = 1 -> RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy
713725
RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy.AtMostOnce = 0 -> RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy
726+
RabbitMQ.AMQP.Client.RandomUriSelector
727+
RabbitMQ.AMQP.Client.RandomUriSelector.RandomUriSelector() -> void
728+
RabbitMQ.AMQP.Client.RandomUriSelector.Select(System.Collections.Generic.ICollection<System.Uri!>! uris) -> System.Uri!
714729
RabbitMQ.AMQP.Client.RecoveryConfiguration
715730
RabbitMQ.AMQP.Client.RecoveryConfiguration.Activated(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!
716731
RabbitMQ.AMQP.Client.RecoveryConfiguration.BackOffDelayPolicy(RabbitMQ.AMQP.Client.IBackOffDelayPolicy! backOffDelayPolicy) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!
@@ -752,6 +767,8 @@ static RabbitMQ.AMQP.Client.ByteCapacity.Gb(long gigabytes) -> RabbitMQ.AMQP.Cli
752767
static RabbitMQ.AMQP.Client.ByteCapacity.Kb(long megabytes) -> RabbitMQ.AMQP.Client.ByteCapacity!
753768
static RabbitMQ.AMQP.Client.ByteCapacity.Mb(long megabytes) -> RabbitMQ.AMQP.Client.ByteCapacity!
754769
static RabbitMQ.AMQP.Client.ByteCapacity.Tb(long terabytes) -> RabbitMQ.AMQP.Client.ByteCapacity!
770+
static RabbitMQ.AMQP.Client.ConnectionSettings.ProcessUriSegmentsForVirtualHost(System.Uri! uri) -> string!
771+
static RabbitMQ.AMQP.Client.ConnectionSettings.ProcessUserInfo(System.Uri! uri) -> (string? user, string? password)
755772
static RabbitMQ.AMQP.Client.ConnectionSettingsBuilder.Create() -> RabbitMQ.AMQP.Client.ConnectionSettingsBuilder!
756773
static RabbitMQ.AMQP.Client.Impl.AddressBuilderHelper.AddressBuilder() -> RabbitMQ.AMQP.Client.Impl.AddressBuilder!
757774
static RabbitMQ.AMQP.Client.Impl.AmqpConnection.CreateAsync(RabbitMQ.AMQP.Client.ConnectionSettings! connectionSettings, RabbitMQ.AMQP.Client.IMetricsReporter? metricsReporter = null) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IConnection!>!

Tests/ClusterTests.cs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Collections.Generic;
77
using System.Threading.Tasks;
88
using RabbitMQ.AMQP.Client;
9+
using RabbitMQ.AMQP.Client.Impl;
910
using Xunit;
1011
using Xunit.Abstractions;
1112

@@ -15,7 +16,7 @@ public class ClusterTests(ITestOutputHelper testOutputHelper)
1516
: IntegrationTest(testOutputHelper, setupConnectionAndManagement: false)
1617
{
1718
[SkippableFact]
18-
public Task CreateConnectionWithEnvironmentAndMultipleUris()
19+
public async Task CreateConnectionWithEnvironmentAndMultipleUris()
1920
{
2021
Skip.IfNot(IsCluster);
2122

@@ -31,16 +32,16 @@ public Task CreateConnectionWithEnvironmentAndMultipleUris()
3132
connectionSettingBuilder.Uris(uris);
3233
ConnectionSettings connectionSettings = connectionSettingBuilder.Build();
3334

34-
/*
35-
IEnvironment env = AmqpEnvironment.Create(ConnectionSettingBuilder.Create().Build());
36-
IConnection connection = await env.CreateConnectionAsync();
37-
Assert.NotNull(connection);
35+
IEnvironment env = AmqpEnvironment.Create(connectionSettings);
36+
37+
// Note: by using _connection, the test will dispose the object on teardown
38+
_connection = await env.CreateConnectionAsync();
39+
Assert.NotNull(_connection);
3840
Assert.NotEmpty(env.GetConnections());
41+
3942
await env.CloseAsync();
40-
Assert.Equal(State.Closed, connection.State);
41-
Assert.Empty(env.GetConnections());
42-
*/
4343

44-
return Task.CompletedTask;
44+
Assert.Equal(State.Closed, _connection.State);
45+
Assert.Empty(env.GetConnections());
4546
}
4647
}

0 commit comments

Comments
 (0)