Skip to content

Extend CreateConnection API #190

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ public AuthMechanismFactory AuthMechanismFactory(IList<string> mechanismNames)
// Our list is in order of preference, the server one is not.
foreach (AuthMechanismFactory factory in AuthMechanisms)
{
if (mechanismNames.Any<string>(x => string.Equals(x, factory.Name, StringComparison.OrdinalIgnoreCase)))
var factoryName = factory.Name;
if (mechanismNames.Any<string>(x => string.Equals(x, factoryName, StringComparison.OrdinalIgnoreCase)))
{
return factory;
}
Expand All @@ -348,7 +349,7 @@ public AuthMechanismFactory AuthMechanismFactory(IList<string> mechanismNames)
/// </exception>
public virtual IConnection CreateConnection()
{
return CreateConnection(new List<string>() { HostName }, null);
return CreateConnection(new List<string> { HostName }, null);
}

/// <summary>
Expand All @@ -365,7 +366,7 @@ public virtual IConnection CreateConnection()
/// </exception>
public IConnection CreateConnection(String clientProvidedName)
{
return CreateConnection(new List<string>() { HostName }, clientProvidedName);
return CreateConnection(new List<string> { HostName }, clientProvidedName);
}

/// <summary>
Expand Down Expand Up @@ -407,20 +408,66 @@ public IConnection CreateConnection(IList<string> hostnames)
/// </exception>
public IConnection CreateConnection(IList<string> hostnames, String clientProvidedName)
{
return CreateConnection(hostnames.Select(Endpoint.CloneWithHostname).ToList(), clientProvidedName);
}

/// <summary>
/// Create a connection using a list of hostnames. The first reachable
/// hostname will be used initially. Subsequent hostname picks are determined
/// by the <see cref="IHostnameSelector" /> configured.
/// </summary>
/// <param name="endpoints">
/// List of endpoints to use for the initial
/// connection and recovery.
/// </param>
/// <returns>Open connection</returns>
/// <exception cref="BrokerUnreachableException">
/// When no hostname was reachable.
/// </exception>
public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints)
{
return CreateConnection(endpoints, null);
}

/// <summary>
/// Create a connection using a list of hostnames. The first reachable
/// hostname will be used initially. Subsequent hostname picks are determined
/// by the <see cref="IHostnameSelector" /> configured.
/// </summary>
/// <param name="endpoints">
/// List of endpoints to use for the initial
/// connection and recovery.
/// </param>
/// <param name="clientProvidedName">
/// Application-specific connection name, will be displayed in the management UI
/// if RabbitMQ server supports it. This value doesn't have to be unique and cannot
/// be used as a connection identifier, e.g. in HTTP API requests.
/// This value is supposed to be human-readable.
/// </param>
/// <returns>Open connection</returns>
/// <exception cref="BrokerUnreachableException">
/// When no hostname was reachable.
/// </exception>
public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints, String clientProvidedName)
{
var eps = endpoints.ToList();
IConnection conn;
try
{
if (AutomaticRecoveryEnabled)
{
var autorecoveringConnection = new AutorecoveringConnection(this, clientProvidedName);
autorecoveringConnection.Init(hostnames);
autorecoveringConnection.Init(eps);
conn = autorecoveringConnection;
}
else
{
IProtocol protocol = Protocols.DefaultProtocol;
var selectedHost = this.HostnameSelector.NextFrom(hostnames);
conn = protocol.CreateConnection(this, false, CreateFrameHandlerForHostname(selectedHost), clientProvidedName);
//We can't make this more elegant without changing the contract of the IHostnameSelector
//if there are endpoints with the same hostname but different ports the first match is selected
var selectedHost = HostnameSelector.NextFrom(eps.Select(ep => ep.HostName).ToList());
var selectedEndpoint = eps.First(ep => ep.HostName == selectedHost);
conn = protocol.CreateConnection(this, false, CreateFrameHandler(selectedEndpoint), clientProvidedName);
}
}
catch (Exception e)
Expand Down Expand Up @@ -524,7 +571,7 @@ that has at least the path segment "/". */
///<summary>
/// Unescape a string, protecting '+'.
/// </summary>
private string UriDecode(string uri)
private static string UriDecode(string uri)
{
return System.Uri.UnescapeDataString(uri.Replace("+", "%2B"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,12 @@ public class AutorecoveringConnection : IConnection, IRecoverable
protected Connection m_delegate;
protected ConnectionFactory m_factory;

// list of hostnames provided on initial connection.
//retained for compatibility
protected IList<string> hostnames;
// list of endpoints provided on initial connection.
// on re-connection, the next host in the line is chosen using
// IHostnameSelector
protected IList<string> hostnames;
private IList<AmqpTcpEndpoint> endpoints;

public readonly object m_recordedEntitiesLock = new object();
protected readonly TaskFactory recoveryTaskFactory = new TaskFactory();
Expand Down Expand Up @@ -554,33 +556,45 @@ public void Init()

public void Init(IList<string> hostnames)
{
this.hostnames = hostnames;
string reachableHostname = null;
this.Init(hostnames.Select(m_factory.Endpoint.CloneWithHostname).ToList());
}

public void Init(IList<AmqpTcpEndpoint> endpoints)
{
this.endpoints = endpoints;
this.hostnames = endpoints.Select(ep => ep.HostName).ToList();
AmqpTcpEndpoint reachableEndpoint = null;
IFrameHandler fh = null;
Exception e = null;
foreach (var h in hostnames)
foreach (var ep in endpoints)
{
try
{
fh = m_factory.CreateFrameHandler(m_factory.Endpoint.CloneWithHostname(h));
reachableHostname = h;
fh = m_factory.CreateFrameHandler(ep);
reachableEndpoint = ep;
}
catch (Exception caught)
{
e = caught;
}
}
if (reachableHostname == null)
if (reachableEndpoint == null)
{
throw e;
}
this.Init(reachableHostname);

this.Init(reachableEndpoint);
}

protected void Init(string hostname)
{
this.Init(m_factory.Endpoint.CloneWithHostname(hostname));
}

private void Init(AmqpTcpEndpoint endpoint)
{
m_delegate = new Connection(m_factory, false,
m_factory.CreateFrameHandlerForHostname(hostname),
m_factory.CreateFrameHandler(endpoint),
this.ClientProvidedName);

AutorecoveringConnection self = this;
Expand Down Expand Up @@ -797,7 +811,8 @@ protected void RecoverConnectionDelegate()
try
{
var nextHostname = m_factory.HostnameSelector.NextFrom(this.hostnames);
var fh = m_factory.CreateFrameHandler(m_factory.Endpoint.CloneWithHostname(nextHostname));
var endpoint = this.endpoints.First((e) => e.HostName == nextHostname);
var fh = m_factory.CreateFrameHandler(endpoint);
m_delegate = new Connection(m_factory, false, fh, this.ClientProvidedName);
recovering = false;
}
Expand Down
11 changes: 11 additions & 0 deletions projects/client/Unit/src/unit/Fixtures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ protected AutorecoveringConnection CreateAutorecoveringConnection(TimeSpan inter
return (AutorecoveringConnection)cf.CreateConnection(hostnames);
}

protected AutorecoveringConnection CreateAutorecoveringConnection(IList<AmqpTcpEndpoint> endpoints)
{
var cf = new ConnectionFactory();
cf.AutomaticRecoveryEnabled = true;
// tests that use this helper will likely list unreachable hosts,
// make sure we time out quickly on those
cf.RequestedConnectionTimeout = 1000;
cf.NetworkRecoveryInterval = RECOVERY_INTERVAL;
return (AutorecoveringConnection)cf.CreateConnection(endpoints);
}

protected AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyRecoveryDisabled()
{
var cf = new ConnectionFactory();
Expand Down
40 changes: 40 additions & 0 deletions projects/client/Unit/src/unit/TestConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,5 +136,45 @@ public void TestCreateConnectionWithoutAutoRecoverySelectsAHostFromTheList()
Assert.AreEqual("not_localhost", cf.HostName);
Assert.AreEqual("localhost", conn.Endpoint.HostName);
}

[Test]
public void TestCreateConnectionWithAutoRecoveryUsesAmqpTcpEndpoint()
{
var cf = new ConnectionFactory();
cf.AutomaticRecoveryEnabled = true;
cf.HostName = "not_localhost";
cf.Port = 1234 ;
var ep = new AmqpTcpEndpoint("localhost");
using(var conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep }));
}

[Test]
[ExpectedException(typeof(BrokerUnreachableException))]
public void TestCreateConnectionWithAutoRecoveryUsesInvalidAmqpTcpEndpoint()
{
var cf = new ConnectionFactory();
cf.AutomaticRecoveryEnabled = true;
var ep = new AmqpTcpEndpoint("localhost", 1234);
using(var conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep }));
}

[Test]
public void TestCreateConnectionUsesAmqpTcpEndpoint()
{
var cf = new ConnectionFactory();
cf.HostName = "not_localhost";
cf.Port = 1234 ;
var ep = new AmqpTcpEndpoint("localhost");
using(var conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep }));
}

[Test]
[ExpectedException(typeof(BrokerUnreachableException))]
public void TestCreateConnectionUsesInvalidAmqpTcpEndpoint()
{
var cf = new ConnectionFactory();
var ep = new AmqpTcpEndpoint("localhost", 1234);
using(var conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep }));
}
}
}
35 changes: 33 additions & 2 deletions projects/client/Unit/src/unit/TestConnectionRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void TestBasicConnectionRecovery()
[Test]
public void TestBasicConnectionRecoveryWithHostnameList()
{
var c = CreateAutorecoveringConnection(new List<string>() { "127.0.0.1", "localhost" });
var c = CreateAutorecoveringConnection(new List<string> { "127.0.0.1", "localhost" });
Assert.IsTrue(c.IsOpen);
CloseAndWaitForRecovery(c);
Assert.IsTrue(c.IsOpen);
Expand All @@ -107,7 +107,38 @@ public void TestBasicConnectionRecoveryWithHostnameList()
[Test]
public void TestBasicConnectionRecoveryWithHostnameListAndUnreachableHosts()
{
var c = CreateAutorecoveringConnection(new List<string>() { "191.72.44.22", "127.0.0.1", "localhost" });
var c = CreateAutorecoveringConnection(new List<string> { "191.72.44.22", "127.0.0.1", "localhost" });
Assert.IsTrue(c.IsOpen);
CloseAndWaitForRecovery(c);
Assert.IsTrue(c.IsOpen);
c.Close();
}

[Test]
public void TestBasicConnectionRecoveryWithEndpointList()
{
var c = CreateAutorecoveringConnection(
new List<AmqpTcpEndpoint>
{
new AmqpTcpEndpoint("127.0.0.1"),
new AmqpTcpEndpoint("localhost")
});
Assert.IsTrue(c.IsOpen);
CloseAndWaitForRecovery(c);
Assert.IsTrue(c.IsOpen);
c.Close();
}

[Test]
public void TestBasicConnectionRecoveryWithEndpointListAndUnreachableHosts()
{
var c = CreateAutorecoveringConnection(
new List<AmqpTcpEndpoint>
{
new AmqpTcpEndpoint("191.72.44.22"),
new AmqpTcpEndpoint("127.0.0.1"),
new AmqpTcpEndpoint("localhost")
});
Assert.IsTrue(c.IsOpen);
CloseAndWaitForRecovery(c);
Assert.IsTrue(c.IsOpen);
Expand Down