Skip to content

Commit 8810a94

Browse files
committed
Extend CreateConnection api with an overload taking a list of AmqpTcpEndpoints to
enable the use of non-default ports with multiple target hosts.
1 parent 51fcf7b commit 8810a94

File tree

5 files changed

+164
-20
lines changed

5 files changed

+164
-20
lines changed

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,8 @@ public AuthMechanismFactory AuthMechanismFactory(IList<string> mechanismNames)
332332
// Our list is in order of preference, the server one is not.
333333
foreach (AuthMechanismFactory factory in AuthMechanisms)
334334
{
335-
if (mechanismNames.Any<string>(x => string.Equals(x, factory.Name, StringComparison.OrdinalIgnoreCase)))
335+
var factoryName = factory.Name;
336+
if (mechanismNames.Any<string>(x => string.Equals(x, factoryName, StringComparison.OrdinalIgnoreCase)))
336337
{
337338
return factory;
338339
}
@@ -348,7 +349,7 @@ public AuthMechanismFactory AuthMechanismFactory(IList<string> mechanismNames)
348349
/// </exception>
349350
public virtual IConnection CreateConnection()
350351
{
351-
return CreateConnection(new List<string>() { HostName }, null);
352+
return CreateConnection(new List<string> { HostName }, null);
352353
}
353354

354355
/// <summary>
@@ -365,7 +366,7 @@ public virtual IConnection CreateConnection()
365366
/// </exception>
366367
public IConnection CreateConnection(String clientProvidedName)
367368
{
368-
return CreateConnection(new List<string>() { HostName }, clientProvidedName);
369+
return CreateConnection(new List<string> { HostName }, clientProvidedName);
369370
}
370371

371372
/// <summary>
@@ -407,20 +408,66 @@ public IConnection CreateConnection(IList<string> hostnames)
407408
/// </exception>
408409
public IConnection CreateConnection(IList<string> hostnames, String clientProvidedName)
409410
{
411+
return CreateConnection(hostnames.Select(Endpoint.CloneWithHostname).ToList(), clientProvidedName);
412+
}
413+
414+
/// <summary>
415+
/// Create a connection using a list of hostnames. The first reachable
416+
/// hostname will be used initially. Subsequent hostname picks are determined
417+
/// by the <see cref="IHostnameSelector" /> configured.
418+
/// </summary>
419+
/// <param name="endpoints">
420+
/// List of endpoints to use for the initial
421+
/// connection and recovery.
422+
/// </param>
423+
/// <returns>Open connection</returns>
424+
/// <exception cref="BrokerUnreachableException">
425+
/// When no hostname was reachable.
426+
/// </exception>
427+
public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints)
428+
{
429+
return CreateConnection(endpoints, null);
430+
}
431+
432+
/// <summary>
433+
/// Create a connection using a list of hostnames. The first reachable
434+
/// hostname will be used initially. Subsequent hostname picks are determined
435+
/// by the <see cref="IHostnameSelector" /> configured.
436+
/// </summary>
437+
/// <param name="endpoints">
438+
/// List of endpoints to use for the initial
439+
/// connection and recovery.
440+
/// </param>
441+
/// <param name="clientProvidedName">
442+
/// Application-specific connection name, will be displayed in the management UI
443+
/// if RabbitMQ server supports it. This value doesn't have to be unique and cannot
444+
/// be used as a connection identifier, e.g. in HTTP API requests.
445+
/// This value is supposed to be human-readable.
446+
/// </param>
447+
/// <returns>Open connection</returns>
448+
/// <exception cref="BrokerUnreachableException">
449+
/// When no hostname was reachable.
450+
/// </exception>
451+
public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints, String clientProvidedName)
452+
{
453+
var eps = endpoints.ToList();
410454
IConnection conn;
411455
try
412456
{
413457
if (AutomaticRecoveryEnabled)
414458
{
415459
var autorecoveringConnection = new AutorecoveringConnection(this, clientProvidedName);
416-
autorecoveringConnection.Init(hostnames);
460+
autorecoveringConnection.Init(eps);
417461
conn = autorecoveringConnection;
418462
}
419463
else
420464
{
421465
IProtocol protocol = Protocols.DefaultProtocol;
422-
var selectedHost = this.HostnameSelector.NextFrom(hostnames);
423-
conn = protocol.CreateConnection(this, false, CreateFrameHandlerForHostname(selectedHost), clientProvidedName);
466+
//We can't make this more elegant without changing the contract of the IHostnameSelector
467+
//if there are endpoints with the same hostname but different ports the first match is selected
468+
var selectedHost = HostnameSelector.NextFrom(eps.Select(ep => ep.HostName).ToList());
469+
var selectedEndpoint = eps.First(ep => ep.HostName == selectedHost);
470+
conn = protocol.CreateConnection(this, false, CreateFrameHandler(selectedEndpoint), clientProvidedName);
424471
}
425472
}
426473
catch (Exception e)
@@ -524,7 +571,7 @@ that has at least the path segment "/". */
524571
///<summary>
525572
/// Unescape a string, protecting '+'.
526573
/// </summary>
527-
private string UriDecode(string uri)
574+
private static string UriDecode(string uri)
528575
{
529576
return System.Uri.UnescapeDataString(uri.Replace("+", "%2B"));
530577
}

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,12 @@ public class AutorecoveringConnection : IConnection, IRecoverable
5656
protected Connection m_delegate;
5757
protected ConnectionFactory m_factory;
5858

59-
// list of hostnames provided on initial connection.
59+
//retained for compatibility
60+
protected IList<string> hostnames;
61+
// list of endpoints provided on initial connection.
6062
// on re-connection, the next host in the line is chosen using
6163
// IHostnameSelector
62-
protected IList<string> hostnames;
64+
private IList<AmqpTcpEndpoint> endpoints;
6365

6466
public readonly object m_recordedEntitiesLock = new object();
6567
protected readonly TaskFactory recoveryTaskFactory = new TaskFactory();
@@ -554,33 +556,45 @@ public void Init()
554556

555557
public void Init(IList<string> hostnames)
556558
{
557-
this.hostnames = hostnames;
558-
string reachableHostname = null;
559+
this.Init(hostnames.Select(m_factory.Endpoint.CloneWithHostname).ToList());
560+
}
561+
562+
public void Init(IList<AmqpTcpEndpoint> endpoints)
563+
{
564+
this.endpoints = endpoints;
565+
this.hostnames = endpoints.Select(ep => ep.HostName).ToList();
566+
AmqpTcpEndpoint reachableEndpoint = null;
559567
IFrameHandler fh = null;
560568
Exception e = null;
561-
foreach (var h in hostnames)
569+
foreach (var ep in endpoints)
562570
{
563571
try
564572
{
565-
fh = m_factory.CreateFrameHandler(m_factory.Endpoint.CloneWithHostname(h));
566-
reachableHostname = h;
573+
fh = m_factory.CreateFrameHandler(ep);
574+
reachableEndpoint = ep;
567575
}
568576
catch (Exception caught)
569577
{
570578
e = caught;
571579
}
572580
}
573-
if (reachableHostname == null)
581+
if (reachableEndpoint == null)
574582
{
575583
throw e;
576584
}
577-
this.Init(reachableHostname);
585+
586+
this.Init(reachableEndpoint);
578587
}
579588

580589
protected void Init(string hostname)
590+
{
591+
this.Init(m_factory.Endpoint.CloneWithHostname(hostname));
592+
}
593+
594+
private void Init(AmqpTcpEndpoint endpoint)
581595
{
582596
m_delegate = new Connection(m_factory, false,
583-
m_factory.CreateFrameHandlerForHostname(hostname),
597+
m_factory.CreateFrameHandler(endpoint),
584598
this.ClientProvidedName);
585599

586600
AutorecoveringConnection self = this;
@@ -797,7 +811,8 @@ protected void RecoverConnectionDelegate()
797811
try
798812
{
799813
var nextHostname = m_factory.HostnameSelector.NextFrom(this.hostnames);
800-
var fh = m_factory.CreateFrameHandler(m_factory.Endpoint.CloneWithHostname(nextHostname));
814+
var endpoint = this.endpoints.First((e) => e.HostName == nextHostname);
815+
var fh = m_factory.CreateFrameHandler(endpoint);
801816
m_delegate = new Connection(m_factory, false, fh, this.ClientProvidedName);
802817
recovering = false;
803818
}

projects/client/Unit/src/unit/Fixtures.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,17 @@ protected AutorecoveringConnection CreateAutorecoveringConnection(TimeSpan inter
126126
return (AutorecoveringConnection)cf.CreateConnection(hostnames);
127127
}
128128

129+
protected AutorecoveringConnection CreateAutorecoveringConnection(IList<AmqpTcpEndpoint> endpoints)
130+
{
131+
var cf = new ConnectionFactory();
132+
cf.AutomaticRecoveryEnabled = true;
133+
// tests that use this helper will likely list unreachable hosts,
134+
// make sure we time out quickly on those
135+
cf.RequestedConnectionTimeout = 1000;
136+
cf.NetworkRecoveryInterval = RECOVERY_INTERVAL;
137+
return (AutorecoveringConnection)cf.CreateConnection(endpoints);
138+
}
139+
129140
protected AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyRecoveryDisabled()
130141
{
131142
var cf = new ConnectionFactory();

projects/client/Unit/src/unit/TestConnectionFactory.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,5 +136,45 @@ public void TestCreateConnectionWithoutAutoRecoverySelectsAHostFromTheList()
136136
Assert.AreEqual("not_localhost", cf.HostName);
137137
Assert.AreEqual("localhost", conn.Endpoint.HostName);
138138
}
139+
140+
[Test]
141+
public void TestCreateConnectionWithAutoRecoveryUsesAmqpTcpEndpoint()
142+
{
143+
var cf = new ConnectionFactory();
144+
cf.AutomaticRecoveryEnabled = true;
145+
cf.HostName = "not_localhost";
146+
cf.Port = 1234 ;
147+
var ep = new AmqpTcpEndpoint("localhost");
148+
using(var conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep }));
149+
}
150+
151+
[Test]
152+
[ExpectedException(typeof(BrokerUnreachableException))]
153+
public void TestCreateConnectionWithAutoRecoveryUsesInvalidAmqpTcpEndpoint()
154+
{
155+
var cf = new ConnectionFactory();
156+
cf.AutomaticRecoveryEnabled = true;
157+
var ep = new AmqpTcpEndpoint("localhost", 1234);
158+
using(var conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep }));
159+
}
160+
161+
[Test]
162+
public void TestCreateConnectionUsesAmqpTcpEndpoint()
163+
{
164+
var cf = new ConnectionFactory();
165+
cf.HostName = "not_localhost";
166+
cf.Port = 1234 ;
167+
var ep = new AmqpTcpEndpoint("localhost");
168+
using(var conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep }));
169+
}
170+
171+
[Test]
172+
[ExpectedException(typeof(BrokerUnreachableException))]
173+
public void TestCreateConnectionUsesInvalidAmqpTcpEndpoint()
174+
{
175+
var cf = new ConnectionFactory();
176+
var ep = new AmqpTcpEndpoint("localhost", 1234);
177+
using(var conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep }));
178+
}
139179
}
140180
}

projects/client/Unit/src/unit/TestConnectionRecovery.cs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void TestBasicConnectionRecovery()
9797
[Test]
9898
public void TestBasicConnectionRecoveryWithHostnameList()
9999
{
100-
var c = CreateAutorecoveringConnection(new List<string>() { "127.0.0.1", "localhost" });
100+
var c = CreateAutorecoveringConnection(new List<string> { "127.0.0.1", "localhost" });
101101
Assert.IsTrue(c.IsOpen);
102102
CloseAndWaitForRecovery(c);
103103
Assert.IsTrue(c.IsOpen);
@@ -107,7 +107,38 @@ public void TestBasicConnectionRecoveryWithHostnameList()
107107
[Test]
108108
public void TestBasicConnectionRecoveryWithHostnameListAndUnreachableHosts()
109109
{
110-
var c = CreateAutorecoveringConnection(new List<string>() { "191.72.44.22", "127.0.0.1", "localhost" });
110+
var c = CreateAutorecoveringConnection(new List<string> { "191.72.44.22", "127.0.0.1", "localhost" });
111+
Assert.IsTrue(c.IsOpen);
112+
CloseAndWaitForRecovery(c);
113+
Assert.IsTrue(c.IsOpen);
114+
c.Close();
115+
}
116+
117+
[Test]
118+
public void TestBasicConnectionRecoveryWithEndpointList()
119+
{
120+
var c = CreateAutorecoveringConnection(
121+
new List<AmqpTcpEndpoint>
122+
{
123+
new AmqpTcpEndpoint("127.0.0.1"),
124+
new AmqpTcpEndpoint("localhost")
125+
});
126+
Assert.IsTrue(c.IsOpen);
127+
CloseAndWaitForRecovery(c);
128+
Assert.IsTrue(c.IsOpen);
129+
c.Close();
130+
}
131+
132+
[Test]
133+
public void TestBasicConnectionRecoveryWithEndpointListAndUnreachableHosts()
134+
{
135+
var c = CreateAutorecoveringConnection(
136+
new List<AmqpTcpEndpoint>
137+
{
138+
new AmqpTcpEndpoint("191.72.44.22"),
139+
new AmqpTcpEndpoint("127.0.0.1"),
140+
new AmqpTcpEndpoint("localhost")
141+
});
111142
Assert.IsTrue(c.IsOpen);
112143
CloseAndWaitForRecovery(c);
113144
Assert.IsTrue(c.IsOpen);

0 commit comments

Comments
 (0)