Skip to content

Remember initial routing address #326

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, Config config,
RoutingSettings routingSettings )
{
return new LoadBalancer( routingSettings, connectionPool, createClock(), config.logging(), address );
return new LoadBalancer( address, routingSettings, connectionPool, createClock(), config.logging() );
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,25 @@ public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, Au
private final Rediscovery rediscovery;
private final Logger log;

public LoadBalancer( RoutingSettings settings, ConnectionPool connections, Clock clock, Logging logging,
BoltServerAddress... routingAddresses )
public LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connections,
Clock clock, Logging logging )
{
this( settings, new ClusterRoutingTable( clock, routingAddresses ), connections, clock,
this( initialRouter, settings, connections, new ClusterRoutingTable( clock, initialRouter ), clock,
logging.getLog( LOAD_BALANCER_LOG_NAME ) );
}

private LoadBalancer( RoutingSettings settings, RoutingTable routingTable, ConnectionPool connections,
Clock clock, Logger log )
private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connections,
RoutingTable routingTable, Clock clock, Logger log )
{
this( routingTable, connections, new Rediscovery( settings, clock, log,
new GetServersProcedureClusterCompositionProvider( clock, log ) ), log );
this( connections, routingTable, createRediscovery( initialRouter, settings, clock, log ), log );
}

LoadBalancer( RoutingTable routingTable, ConnectionPool connections, Rediscovery rediscovery, Logger log )
LoadBalancer( ConnectionPool connections, RoutingTable routingTable, Rediscovery rediscovery, Logger log )
{
this.log = log;
this.connections = connections;
this.routingTable = routingTable;
this.rediscovery = rediscovery;
this.log = log;

// initialize the routing table
ensureRouting();
Expand Down Expand Up @@ -127,23 +126,17 @@ synchronized void ensureRouting() throws ServiceUnavailableException, ProtocolEx
if ( routingTable.isStale() )
{
log.info( "Routing information is stale. %s", routingTable );
try
{
// get a new routing table
ClusterComposition cluster = rediscovery.lookupRoutingTable( connections, routingTable );
Set<BoltServerAddress> removed = routingTable.update( cluster );
// purge connections to removed addresses
for ( BoltServerAddress address : removed )
{
connections.purge( address );
}

log.info( "Refreshed routing information. %s", routingTable );
}
catch ( InterruptedException e )
// get a new routing table
ClusterComposition cluster = rediscovery.lookupClusterComposition( connections, routingTable );
Set<BoltServerAddress> removed = routingTable.update( cluster );
// purge connections to removed addresses
for ( BoltServerAddress address : removed )
{
throw new ServiceUnavailableException( "Thread was interrupted while establishing connection.", e );
connections.purge( address );
}

log.info( "Refreshed routing information. %s", routingTable );
}
}

Expand All @@ -159,4 +152,11 @@ private RoundRobinAddressSet addressSetFor( AccessMode mode )
throw new IllegalArgumentException( "Mode '" + mode + "' is not supported" );
}
}

private static Rediscovery createRediscovery( BoltServerAddress initialRouter, RoutingSettings settings,
Clock clock, Logger log )
{
ClusterCompositionProvider clusterComposition = new GetServersProcedureClusterCompositionProvider( clock, log );
return new Rediscovery( initialRouter, settings, clock, log, clusterComposition );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,25 @@
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.exceptions.SecurityException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;

import static java.lang.String.format;

public class Rediscovery
{
private static final String NO_ROUTERS_AVAILABLE = "Could not perform discovery. No routing servers available.";

private final BoltServerAddress initialRouter;
private final RoutingSettings settings;
private final Clock clock;
private final Logger logger;
private final ClusterCompositionProvider provider;

public Rediscovery( RoutingSettings settings, Clock clock, Logger logger, ClusterCompositionProvider provider )
public Rediscovery( BoltServerAddress initialRouter, RoutingSettings settings, Clock clock, Logger logger,
ClusterCompositionProvider provider )
{
this.initialRouter = initialRouter;
this.settings = settings;
this.clock = clock;
this.logger = logger;
Expand All @@ -47,68 +50,105 @@ public Rediscovery( RoutingSettings settings, Clock clock, Logger logger, Cluste

// Given the current routing table and connection pool, use the connection composition provider to fetch a new
// cluster composition, which would be used to update the routing table and connection pool
public ClusterComposition lookupRoutingTable( ConnectionPool connections, RoutingTable routingTable )
throws InterruptedException
public ClusterComposition lookupClusterComposition( ConnectionPool connections, RoutingTable routingTable )
{
assertHasRouters( routingTable );
int failures = 0;

for ( long start = clock.millis(), delay = 0; ; delay = Math.max( settings.retryTimeoutDelay, delay * 2 ) )
{
long waitTime = start + delay - clock.millis();
if ( waitTime > 0 )
{
clock.sleep( waitTime );
}
sleep( waitTime );
start = clock.millis();

int size = routingTable.routerSize();
for ( int i = 0; i < size; i++ )
ClusterComposition composition = lookupClusterCompositionOnKnownRouters( connections, routingTable );
if ( composition != null )
{
BoltServerAddress address = routingTable.nextRouter();
if ( address == null )
{
throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE );
}

ClusterCompositionResponse response = null;
try ( Connection connection = connections.acquire( address ) )
{
response = provider.getClusterComposition( connection );
}
catch( SecurityException e )
{
throw e; // terminate the discovery immediately
}
catch ( Exception e )
{
// the connection breaks
logger.error( format( "Failed to connect to routing server '%s'.", address ), e );
routingTable.removeRouter( address );

assertHasRouters( routingTable );
continue;
}

ClusterComposition cluster = response.clusterComposition();
logger.info( "Got cluster composition %s", cluster );
if ( cluster.hasWriters() )
{
return cluster;
}
return composition;
}

if ( ++failures >= settings.maxRoutingFailures )
{
throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE );
}
}
}

private void assertHasRouters( RoutingTable table )
private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPool connections,
RoutingTable routingTable )
{
if ( table.routerSize() == 0 )
boolean triedInitialRouter = false;
int size = routingTable.routerSize();
for ( int i = 0; i < size; i++ )
{
throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE );
BoltServerAddress address = routingTable.nextRouter();
if ( address == null )
{
break;
}

if ( address.equals( initialRouter ) )
{
triedInitialRouter = true;
}

ClusterComposition composition = lookupClusterCompositionOnRouter( address, connections, routingTable );
if ( composition != null )
{
return composition;
}
}

if ( triedInitialRouter )
{
return null;
}
return lookupClusterCompositionOnRouter( initialRouter, connections, routingTable );
}

private ClusterComposition lookupClusterCompositionOnRouter( BoltServerAddress routerAddress,
ConnectionPool connections, RoutingTable routingTable )
{
ClusterCompositionResponse response;
try ( Connection connection = connections.acquire( routerAddress ) )
{
response = provider.getClusterComposition( connection );
}
catch ( SecurityException e )
{
// auth error happened, terminate the discovery procedure immediately
throw e;
}
catch ( Throwable t )
{
// connection turned out to be broken
logger.error( format( "Failed to connect to routing server '%s'.", routerAddress ), t );
routingTable.removeRouter( routerAddress );
return null;
}

ClusterComposition cluster = response.clusterComposition();
logger.info( "Got cluster composition %s", cluster );
if ( cluster.hasWriters() )
{
return cluster;
}
return null;
}

private void sleep( long millis )
{
if ( millis > 0 )
{
try
{
clock.sleep( millis );
}
catch ( InterruptedException e )
{
// restore the interrupted status
Thread.currentThread().interrupt();
throw new ServiceUnavailableException( "Thread was interrupted while performing discovery", e );
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,30 @@ public void shouldRetryWriteTransactionAndPerformRediscoveryUntilSuccess() throw
}
}

@Test
public void shouldUseInitialRouterForRediscoveryWhenAllOtherRoutersAreDead() throws Exception
{
// initial router does not have itself in the returned set of routers
StubServer router = StubServer.start( "acquire_endpoints.script", 9010 );

try ( Driver driver = GraphDatabase.driver( "bolt+routing://127.0.0.1:9010", config ) )
{
try ( Session session = driver.session( AccessMode.READ ) )
{
// restart router on the same port with different script that contains itself as reader
assertEquals( 0, router.exitStatus() );
router = StubServer.start( "rediscover_using_initial_router.script", 9010 );

List<Record> records = session.run( "MATCH (n) RETURN n.name AS name" ).list();
assertEquals( 2, records.size() );
assertEquals( "Bob", records.get( 0 ).get( "name" ).asString() );
assertEquals( "Alice", records.get( 1 ).get( "name" ).asString() );
}
}

assertEquals( 0, router.exitStatus() );
}

private static Driver newDriverWithSleeplessClock( String uriString )
{
DriverFactory driverFactory = new DriverFactoryWithClock( new SleeplessClock() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ private final Driver driverWithServers( long ttl, Map<String,Object>... serverIn
private Driver driverWithPool( ConnectionPool pool )
{
RoutingSettings settings = new RoutingSettings( 10, 5_000 );
ConnectionProvider connectionProvider = new LoadBalancer( settings, pool, clock, logging, SEED );
ConnectionProvider connectionProvider = new LoadBalancer( SEED, settings, pool, clock, logging );
Config config = Config.build().withLogging( logging ).toConfig();
SessionFactory sessionFactory = new NetworkSessionWithAddressFactory( connectionProvider, config );
return new InternalDriver( insecure(), sessionFactory, logging );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ public void ensureRoutingShouldUpdateRoutingTableAndPurgeConnectionPoolWhenStale
when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( set );

// when
LoadBalancer balancer = new LoadBalancer( routingTable, conns, rediscovery, DEV_NULL_LOGGER );
LoadBalancer balancer = new LoadBalancer( conns, routingTable, rediscovery, DEV_NULL_LOGGER );

// then
assertNotNull( balancer );
InOrder inOrder = inOrder( rediscovery, routingTable, conns );
inOrder.verify( rediscovery ).lookupRoutingTable( conns, routingTable );
inOrder.verify( rediscovery ).lookupClusterComposition( conns, routingTable );
inOrder.verify( routingTable ).update( any( ClusterComposition.class ) );
inOrder.verify( conns ).purge( new BoltServerAddress( "abc", 12 ) );
}
Expand All @@ -88,7 +88,7 @@ public void shouldEnsureRoutingOnInitialization() throws Exception
{
// given & when
final AtomicInteger ensureRoutingCounter = new AtomicInteger( 0 );
LoadBalancer balancer = new LoadBalancer( mock( RoutingTable.class ), mock( ConnectionPool.class ),
LoadBalancer balancer = new LoadBalancer( mock( ConnectionPool.class ), mock( RoutingTable.class ),
mock( Rediscovery.class ), DEV_NULL_LOGGER )
{
@Override
Expand Down Expand Up @@ -143,7 +143,7 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosing
RoutingTable routingTable = mock( RoutingTable.class );
ConnectionPool connectionPool = mock( ConnectionPool.class );
Rediscovery rediscovery = mock( Rediscovery.class );
LoadBalancer loadBalancer = new LoadBalancer( routingTable, connectionPool, rediscovery, DEV_NULL_LOGGER );
LoadBalancer loadBalancer = new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER );
BoltServerAddress address = new BoltServerAddress( "host", 42 );

PooledConnection connection = newConnectionWithFailingSync( address );
Expand Down Expand Up @@ -174,7 +174,7 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosing
PooledConnection connectionWithFailingSync = newConnectionWithFailingSync( address );
when( connectionPool.acquire( any( BoltServerAddress.class ) ) ).thenReturn( connectionWithFailingSync );
Rediscovery rediscovery = mock( Rediscovery.class );
LoadBalancer loadBalancer = new LoadBalancer( routingTable, connectionPool, rediscovery, DEV_NULL_LOGGER );
LoadBalancer loadBalancer = new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER );

Session session = newSession( loadBalancer );
// begin transaction to make session obtain a connection
Expand Down Expand Up @@ -205,7 +205,7 @@ private LoadBalancer setupLoadBalancer( PooledConnection writerConn, PooledConne
when( routingTable.readers() ).thenReturn( readerAddrs );
when( routingTable.writers() ).thenReturn( writerAddrs );

return new LoadBalancer( routingTable, connPool, mock( Rediscovery.class ), DEV_NULL_LOGGER );
return new LoadBalancer( connPool, routingTable, mock( Rediscovery.class ), DEV_NULL_LOGGER );
}

private static Session newSession( LoadBalancer loadBalancer )
Expand Down
Loading