Skip to content

Suppress pool was closed errors #597

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;

import static java.lang.String.format;

public class ConnectionPoolImpl implements ConnectionPool
{
Expand All @@ -57,12 +60,13 @@ public class ConnectionPoolImpl implements ConnectionPool
private final Logger log;
private final MetricsListener metricsListener;

private final ConcurrentMap<BoltServerAddress,ChannelPool> pools = new ConcurrentHashMap<>();
private final ConcurrentMap<BoltServerAddress,ExtendedChannelPool> pools = new ConcurrentHashMap<>();
private final AtomicBoolean closed = new AtomicBoolean();

public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, MetricsListener metricsListener, Logging logging, Clock clock )
{
this( connector, bootstrap, new NettyChannelTracker( metricsListener, bootstrap.config().group().next(), logging ), settings, metricsListener, logging, clock );
this( connector, bootstrap, new NettyChannelTracker( metricsListener, bootstrap.config().group().next(), logging ), settings, metricsListener, logging,
clock );
}

ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker nettyChannelTracker,
Expand All @@ -84,7 +88,7 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
log.trace( "Acquiring a connection from pool towards %s", address );

assertNotClosed();
ChannelPool pool = getOrCreatePool( address );
ExtendedChannelPool pool = getOrCreatePool( address );

ListenerEvent acquireEvent = metricsListener.createListenerEvent();
metricsListener.beforeAcquiringOrCreating( address, acquireEvent );
Expand All @@ -94,7 +98,7 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
{
try
{
processAcquisitionError( address, error );
processAcquisitionError( pool, address, error );
assertNotClosed( address, channel, pool );
Connection connection = new DirectConnection( channel, pool, clock, metricsListener );

Expand Down Expand Up @@ -153,7 +157,7 @@ public CompletionStage<Void> close()
try
{
nettyChannelTracker.prepareToCloseChannels();
for ( Map.Entry<BoltServerAddress,ChannelPool> entry : pools.entrySet() )
for ( Map.Entry<BoltServerAddress,ExtendedChannelPool> entry : pools.entrySet() )
{
BoltServerAddress address = entry.getKey();
ChannelPool pool = entry.getValue();
Expand All @@ -178,9 +182,9 @@ public boolean isOpen( BoltServerAddress address )
return pools.containsKey( address );
}

private ChannelPool getOrCreatePool( BoltServerAddress address )
private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
{
ChannelPool pool = pools.get( address );
ExtendedChannelPool pool = pools.get( address );
if ( pool != null )
{
return pool;
Expand All @@ -201,7 +205,7 @@ private ChannelPool getOrCreatePool( BoltServerAddress address )
return pool;
}

ChannelPool newPool( BoltServerAddress address )
ExtendedChannelPool newPool( BoltServerAddress address )
{
return new NettyChannelPool( address, connector, bootstrap, nettyChannelTracker, channelHealthChecker,
settings.connectionAcquisitionTimeout(), settings.maxConnectionPoolSize() );
Expand All @@ -212,7 +216,7 @@ private EventLoopGroup eventLoopGroup()
return bootstrap.config().group();
}

private void processAcquisitionError( BoltServerAddress serverAddress, Throwable error )
private void processAcquisitionError( ExtendedChannelPool pool, BoltServerAddress serverAddress, Throwable error )
{
Throwable cause = Futures.completionExceptionCause( error );
if ( cause != null )
Expand All @@ -226,6 +230,13 @@ private void processAcquisitionError( BoltServerAddress serverAddress, Throwable
"Unable to acquire connection from the pool within configured maximum time of " +
settings.connectionAcquisitionTimeout() + "ms" );
}
else if ( pool.isClosed() )
{
// There is a race condition where a thread tries to acquire a connection while the pool is closed by another concurrent thread.
// Treat as failed to obtain connection for a direct driver. For a routing driver, this error should be retried.
throw new ServiceUnavailableException( format( "Connection pool for server %s is closed while acquiring a connection.", serverAddress ),
cause );
}
else
{
// some unknown error happened during connection acquisition, propagate it
Expand Down Expand Up @@ -258,4 +269,10 @@ public String toString()
{
return "ConnectionPoolImpl{" + "pools=" + pools + '}';
}

// for testing only
ExtendedChannelPool getPool( BoltServerAddress address )
{
return pools.get( address );
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.async.pool;

import io.netty.channel.pool.ChannelPool;

public interface ExtendedChannelPool extends ChannelPool
{
boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.FixedChannelPool;

import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.ChannelConnector;
import org.neo4j.driver.internal.metrics.ListenerEvent;

import static java.util.Objects.requireNonNull;

public class NettyChannelPool extends FixedChannelPool
public class NettyChannelPool extends FixedChannelPool implements ExtendedChannelPool
{
/**
* Unlimited amount of parties are allowed to request channels from the pool.
Expand All @@ -44,6 +46,7 @@ public class NettyChannelPool extends FixedChannelPool
private final BoltServerAddress address;
private final ChannelConnector connector;
private final NettyChannelTracker handler;
private final AtomicBoolean closed = new AtomicBoolean( false );

public NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker handler,
ChannelHealthChecker healthCheck, long acquireTimeoutMillis, int maxConnections )
Expand Down Expand Up @@ -76,4 +79,18 @@ protected ChannelFuture connectChannel( Bootstrap bootstrap )
} );
return channelFuture;
}

@Override
public void close()
{
if ( closed.compareAndSet( false, true ) )
{
super.close();
}
}

public boolean isClosed()
{
return closed.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.netty.util.concurrent.EventExecutorGroup;

import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand All @@ -38,12 +37,10 @@
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.exceptions.SecurityException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.net.ServerAddress;
import org.neo4j.driver.v1.net.ServerAddressResolver;

import static java.lang.String.format;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
Expand Down Expand Up @@ -265,7 +262,7 @@ private ClusterComposition handleRoutingProcedureError( Throwable error, Routing
else
{
// connection turned out to be broken
logger.error( format( "Failed to connect to routing server '%s'.", routerAddress ), error );
logger.info( format( "Failed to connect to routing server '%s'.", routerAddress ), error );
routingTable.forget( routerAddress );
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ private synchronized void freshClusterCompositionFetched( ClusterComposition com

private synchronized void clusterCompositionLookupFailed( Throwable error )
{
log.error( "Failed to update routing table. Current routing table: " + routingTable, error );
CompletableFuture<RoutingTable> routingTableFuture = refreshRoutingTableFuture;
refreshRoutingTableFuture = null;
routingTableFuture.completeExceptionally( error );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,11 @@
package org.neo4j.driver.internal.async.pool;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPool;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.ConnectionSettings;
import org.neo4j.driver.internal.async.BootstrapFactory;
Expand All @@ -43,32 +36,21 @@
import org.neo4j.driver.v1.util.DatabaseExtension;
import org.neo4j.driver.v1.util.ParallelizableIT;

import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.junit.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.neo4j.driver.internal.BoltServerAddress.LOCAL_DEFAULT;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
import static org.neo4j.driver.v1.util.TestUtil.await;

@ParallelizableIT
class ConnectionPoolImplIT
{
private static final BoltServerAddress ADDRESS_1 = new BoltServerAddress( "server:1" );
private static final BoltServerAddress ADDRESS_2 = new BoltServerAddress( "server:2" );
private static final BoltServerAddress ADDRESS_3 = new BoltServerAddress( "server:3" );

@RegisterExtension
static final DatabaseExtension neo4j = new DatabaseExtension();

Expand Down Expand Up @@ -132,71 +114,16 @@ void shouldNotCloseWhenClosed()
}

@Test
void shouldDoNothingWhenRetainOnEmptyPool()
{
NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class );
TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker );

pool.retainAll( singleton( LOCAL_DEFAULT ) );

verifyZeroInteractions( nettyChannelTracker );
}

@Test
void shouldRetainSpecifiedAddresses()
{
NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class );
TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker );

pool.acquire( ADDRESS_1 );
pool.acquire( ADDRESS_2 );
pool.acquire( ADDRESS_3 );

pool.retainAll( new HashSet<>( asList( ADDRESS_1, ADDRESS_2, ADDRESS_3 ) ) );
for ( ChannelPool channelPool : pool.channelPoolsByAddress.values() )
{
verify( channelPool, never() ).close();
}
}

@Test
void shouldClosePoolsWhenRetaining()
{
NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class );
TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker );

pool.acquire( ADDRESS_1 );
pool.acquire( ADDRESS_2 );
pool.acquire( ADDRESS_3 );

when( nettyChannelTracker.inUseChannelCount( ADDRESS_1 ) ).thenReturn( 2 );
when( nettyChannelTracker.inUseChannelCount( ADDRESS_2 ) ).thenReturn( 0 );
when( nettyChannelTracker.inUseChannelCount( ADDRESS_3 ) ).thenReturn( 3 );

pool.retainAll( new HashSet<>( asList( ADDRESS_1, ADDRESS_3 ) ) );
verify( pool.getPool( ADDRESS_1 ), never() ).close();
verify( pool.getPool( ADDRESS_2 ) ).close();
verify( pool.getPool( ADDRESS_3 ), never() ).close();
}

@Test
void shouldNotClosePoolsWithActiveConnectionsWhenRetaining()
void shouldFailToAcquireConnectionWhenPoolIsClosed()
{
NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class );
TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker );

pool.acquire( ADDRESS_1 );
pool.acquire( ADDRESS_2 );
pool.acquire( ADDRESS_3 );

when( nettyChannelTracker.inUseChannelCount( ADDRESS_1 ) ).thenReturn( 1 );
when( nettyChannelTracker.inUseChannelCount( ADDRESS_2 ) ).thenReturn( 42 );
when( nettyChannelTracker.inUseChannelCount( ADDRESS_3 ) ).thenReturn( 0 );

pool.retainAll( singleton( ADDRESS_2 ) );
verify( pool.getPool( ADDRESS_1 ), never() ).close();
verify( pool.getPool( ADDRESS_2 ), never() ).close();
verify( pool.getPool( ADDRESS_3 ) ).close();
await( pool.acquire( neo4j.address() ) );
ExtendedChannelPool channelPool = this.pool.getPool( neo4j.address() );
channelPool.close();
ServiceUnavailableException error =
assertThrows( ServiceUnavailableException.class, () -> await( pool.acquire( neo4j.address() ) ) );
assertThat( error.getMessage(), containsString( "closed while acquiring a connection" ) );
assertThat( error.getCause(), instanceOf( IllegalStateException.class ) );
assertThat( error.getCause().getMessage(), containsString( "FixedChannelPooled was closed" ) );
}

private ConnectionPoolImpl newPool() throws Exception
Expand All @@ -209,37 +136,8 @@ private ConnectionPoolImpl newPool() throws Exception
Bootstrap bootstrap = BootstrapFactory.newBootstrap( 1 );
return new ConnectionPoolImpl( connector, bootstrap, poolSettings, DEV_NULL_METRICS, DEV_NULL_LOGGING, clock );
}

private static PoolSettings newSettings()
{
return new PoolSettings( 10, 5000, -1, -1 );
}

private static class TestConnectionPool extends ConnectionPoolImpl
{
final Map<BoltServerAddress,ChannelPool> channelPoolsByAddress = new HashMap<>();

TestConnectionPool( NettyChannelTracker nettyChannelTracker )
{
super( mock( ChannelConnector.class ), mock( Bootstrap.class ), nettyChannelTracker, newSettings(),
DEV_NULL_METRICS, DEV_NULL_LOGGING, new FakeClock() );
}

ChannelPool getPool( BoltServerAddress address )
{
ChannelPool pool = channelPoolsByAddress.get( address );
assertNotNull( pool );
return pool;
}

@Override
ChannelPool newPool( BoltServerAddress address )
{
ChannelPool channelPool = mock( ChannelPool.class );
Channel channel = mock( Channel.class );
doReturn( ImmediateEventExecutor.INSTANCE.newSucceededFuture( channel ) ).when( channelPool ).acquire();
channelPoolsByAddress.put( address, channelPool );
return channelPool;
}
}
}
Loading