Skip to content

Improve Session#reset() & cleanup of Connection#release() #427

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public CompletionStage<Connection> acquireConnection( AccessMode mode )
@Override
public CompletionStage<Void> verifyConnectivity()
{
return acquireConnection( READ ).thenCompose( Connection::releaseNow );
return acquireConnection( READ ).thenCompose( Connection::release );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public CompletionStage<ExplicitTransaction> beginAsync( Bookmark initialBookmark
if ( beginError != null )
{
// release connection if begin failed, transaction can't be started
connection.releaseNow();
connection.release();
throw new CompletionException( Futures.completionErrorCause( beginError ) );
}
return tx;
Expand Down Expand Up @@ -397,7 +397,7 @@ private BiConsumer<Object,Throwable> transactionClosed( State newState )
return ( ignore, error ) ->
{
state = newState;
connection.releaseInBackground();
connection.release(); // release in background
session.setBookmark( bookmark );
};
}
Expand Down
21 changes: 10 additions & 11 deletions driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,13 @@ public void reset()

private CompletionStage<Void> resetAsync()
{
return releaseConnectionNow().thenCompose( ignore -> existingTransactionOrNull() )
.thenAccept( tx ->
{
if ( tx != null )
{
tx.markTerminated();
}
} );
return existingTransactionOrNull().thenAccept( tx ->
{
if ( tx != null )
{
tx.markTerminated();
}
} ).thenCompose( ignore -> releaseConnection() );
}

@Override
Expand Down Expand Up @@ -496,7 +495,7 @@ private CompletionStage<Connection> acquireConnection( AccessMode mode )

private CompletionStage<Void> releaseResources()
{
return rollbackTransaction().thenCompose( ignore -> releaseConnectionNow() );
return rollbackTransaction().thenCompose( ignore -> releaseConnection() );
}

private CompletionStage<Void> rollbackTransaction()
Expand All @@ -516,13 +515,13 @@ private CompletionStage<Void> rollbackTransaction()
} );
}

private CompletionStage<Void> releaseConnectionNow()
private CompletionStage<Void> releaseConnection()
{
return existingConnectionOrNull().thenCompose( connection ->
{
if ( connection != null )
{
return connection.releaseNow();
return connection.release();
}
return completedFuture( null );
} );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@
import org.neo4j.driver.v1.Value;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;

// todo: keep state flags to prohibit interaction with released connections
public class NettyConnection implements Connection
{
private final Channel channel;
private final InboundMessageDispatcher messageDispatcher;
private final BoltServerAddress serverAddress;
private final ServerVersion serverVersion;
private final ChannelPool channelPool;
private final Clock clock;

Expand All @@ -56,7 +56,9 @@ public class NettyConnection implements Connection
public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
{
this.channel = channel;
this.messageDispatcher = messageDispatcher( channel );
this.messageDispatcher = ChannelAttributes.messageDispatcher( channel );
this.serverAddress = ChannelAttributes.serverAddress( channel );
this.serverVersion = ChannelAttributes.serverVersion( channel );
this.channelPool = channelPool;
this.clock = clock;
}
Expand All @@ -70,6 +72,7 @@ public boolean isOpen()
@Override
public void enableAutoRead()
{
assertOpen();
if ( autoReadEnabled.compareAndSet( false, true ) )
{
setAutoRead( true );
Expand All @@ -79,6 +82,7 @@ public void enableAutoRead()
@Override
public void disableAutoRead()
{
assertOpen();
if ( autoReadEnabled.compareAndSet( true, false ) )
{
setAutoRead( false );
Expand All @@ -89,27 +93,20 @@ public void disableAutoRead()
public void run( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
ResponseHandler pullAllHandler )
{
assertOpen();
run( statement, parameters, runHandler, pullAllHandler, false );
}

@Override
public void runAndFlush( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
ResponseHandler pullAllHandler )
{
assertOpen();
run( statement, parameters, runHandler, pullAllHandler, true );
}

@Override
public void releaseInBackground()
{
if ( open.compareAndSet( true, false ) )
{
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock ) );
}
}

@Override
public CompletionStage<Void> releaseNow()
public CompletionStage<Void> release()
{
if ( open.compareAndSet( true, false ) )
{
Expand All @@ -126,13 +123,13 @@ public CompletionStage<Void> releaseNow()
@Override
public BoltServerAddress serverAddress()
{
return ChannelAttributes.serverAddress( channel );
return serverAddress;
}

@Override
public ServerVersion serverVersion()
{
return ChannelAttributes.serverVersion( channel );
return serverVersion;
}

private void run( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
Expand Down Expand Up @@ -185,4 +182,12 @@ private void setAutoRead( boolean value )
{
channel.config().setAutoRead( value );
}

private void assertOpen()
{
if ( !open.get() )
{
throw new IllegalStateException( "Connection has been released to the pool and can't be reused" );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,16 @@ public void runAndFlush( String statement, Map<String,Value> parameters, Respons
newRoutingResponseHandler( pullAllHandler ) );
}

@Override
public void releaseInBackground()
{
delegate.releaseInBackground();
}

@Override
public boolean isOpen()
{
return delegate.isOpen();
}

@Override
public CompletionStage<Void> releaseNow()
public CompletionStage<Void> release()
{
return delegate.releaseNow();
return delegate.release();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ public class ResetResponseHandler implements ResponseHandler
private final Clock clock;
private final Promise<Void> releasePromise;

public ResetResponseHandler( Channel channel, ChannelPool pool, InboundMessageDispatcher messageDispatcher,
Clock clock )
{
this( channel, pool, messageDispatcher, clock, null );
}

public ResetResponseHandler( Channel channel, ChannelPool pool, InboundMessageDispatcher messageDispatcher,
Clock clock, Promise<Void> releasePromise )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,17 @@ public SessionPullAllResponseHandler( Statement statement, RunResponseHandler ru
@Override
protected void afterSuccess()
{
connection.releaseInBackground();
releaseConnection();
}

@Override
protected void afterFailure( Throwable error )
{
connection.releaseInBackground();
releaseConnection();
}

private void releaseConnection()
{
connection.release(); // release in background
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ void run( String statement, Map<String,Value> parameters, ResponseHandler runHan
void runAndFlush( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
ResponseHandler pullAllHandler );

void releaseInBackground();

CompletionStage<Void> releaseNow();
CompletionStage<Void> release();

BoltServerAddress serverAddress();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void shouldRollbackOnImplicitFailure()
InOrder order = inOrder( connection );
order.verify( connection ).run( eq( "BEGIN" ), any(), any(), any() );
order.verify( connection ).runAndFlush( eq( "ROLLBACK" ), any(), any(), any() );
order.verify( connection ).releaseInBackground();
order.verify( connection ).release();
}

@Test
Expand All @@ -77,7 +77,7 @@ public void shouldRollbackOnExplicitFailure()
InOrder order = inOrder( connection );
order.verify( connection ).run( eq( "BEGIN" ), any(), any(), any() );
order.verify( connection ).runAndFlush( eq( "ROLLBACK" ), any(), any(), any() );
order.verify( connection ).releaseInBackground();
order.verify( connection ).release();
}

@Test
Expand All @@ -95,7 +95,7 @@ public void shouldCommitOnSuccess()
InOrder order = inOrder( connection );
order.verify( connection ).run( eq( "BEGIN" ), any(), any(), any() );
order.verify( connection ).runAndFlush( eq( "COMMIT" ), any(), any(), any() );
order.verify( connection ).releaseInBackground();
order.verify( connection ).release();
}

@Test
Expand Down Expand Up @@ -243,7 +243,7 @@ public void shouldReleaseConnectionWhenBeginFails()
assertEquals( error, e );
}

verify( connection ).releaseNow();
verify( connection ).release();
}

@Test
Expand All @@ -253,7 +253,7 @@ public void shouldNotReleaseConnectionWhenBeginSucceeds()
ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) );
getBlocking( tx.beginAsync( Bookmark.from( "SomeBookmark" ) ) );

verify( connection, never() ).releaseNow();
verify( connection, never() ).release();
}

private static ExplicitTransaction beginTx( Connection connection )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -88,7 +89,7 @@ public class NetworkSessionTest
public void setUp()
{
connection = connectionMock();
when( connection.releaseNow() ).thenReturn( completedFuture( null ) );
when( connection.release() ).thenReturn( completedFuture( null ) );
when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 );
connectionProvider = mock( ConnectionProvider.class );
when( connectionProvider.acquireConnection( any( AccessMode.class ) ) )
Expand Down Expand Up @@ -214,7 +215,7 @@ public void releasesOpenConnectionUsedForRunWhenSessionIsClosed()

InOrder inOrder = inOrder( connection );
inOrder.verify( connection ).runAndFlush( eq( "RETURN 1" ), any(), any(), any() );
inOrder.verify( connection ).releaseNow();
inOrder.verify( connection, atLeastOnce() ).release();
}

@SuppressWarnings( "deprecation" )
Expand Down Expand Up @@ -274,7 +275,7 @@ public void releasesConnectionWhenTxIsClosed()
verify( connection ).runAndFlush( eq( query ), any(), any(), any() );

tx.close();
verify( connection ).releaseInBackground();
verify( connection ).release();
}

@Test
Expand Down Expand Up @@ -484,22 +485,18 @@ public void writeTxRetriedUntilFailureWhenTxCloseThrows()
}

@Test
@SuppressWarnings( "deprecation" )
public void connectionShouldBeReleasedAfterSessionReset()
{
NetworkSession session = newSession( connectionProvider, READ );
session.run( "RETURN 1" );

verify( connection, never() ).releaseInBackground();
verify( connection, never() ).releaseNow();
verify( connection, never() ).release();

session.reset();
verify( connection, never() ).releaseInBackground();
verify( connection ).releaseNow();
verify( connection ).release();
}

@Test
@SuppressWarnings( "deprecation" )
public void transactionShouldBeRolledBackAfterSessionReset()
{
NetworkSession session = newSession( connectionProvider, READ );
Expand Down Expand Up @@ -663,6 +660,25 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection()
verifyBeginTx( connection, times( 1 ) );
}

@Test
public void shouldMarkTransactionAsTerminatedAndThenReleaseConnectionOnReset()
{
NetworkSession session = newSession( connectionProvider, READ );
Transaction tx = session.beginTransaction();

assertTrue( tx.isOpen() );
when( connection.release() ).then( invocation ->
{
// verify that tx is not open when connection is released
assertFalse( tx.isOpen() );
return completedFuture( null );
} );

session.reset();

verify( connection ).release();
}

private void testConnectionAcquisition( AccessMode sessionMode, AccessMode transactionMode )
{
NetworkSession session = newSession( connectionProvider, sessionMode );
Expand Down
Loading