Skip to content

Improve handling of RESET and ACK_FAILURE errors #504

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,18 @@ public void unMuteAckFailure()
ackFailureMuted = false;
}

/**
* Check if ACK_FAILURE is muted.
* <p>
* <b>This method is not thread-safe</b> and should only be executed by the event loop thread.
*
* @return {@code true} if ACK_FAILURE has been muted via {@link #muteAckFailure()}, {@code false} otherwise.
*/
public boolean isAckFailureMuted()
{
return ackFailureMuted;
}

private void ackFailureIfNeeded()
{
if ( !ackFailureMuted )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ClientException;

public class AckFailureResponseHandler implements ResponseHandler
{
Expand All @@ -42,10 +43,21 @@ public void onSuccess( Map<String,Value> metadata )
@Override
public void onFailure( Throwable error )
{
if ( messageDispatcher.isAckFailureMuted() )
{
// RESET cancelled this ACK_FAILURE and made the database send an IGNORED message
// this is not a protocol violation and database has all the connection stated cleared now
messageDispatcher.clearCurrentError();
}
else
{
throw new ClientException( "Unable to acknowledge the previous error. Connection will be closed", error );
}
}

@Override
public void onRecord( Value[] fields )
{
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,18 @@ public ChannelReleasingResetResponseHandler( Channel channel, ChannelPool pool,
}

@Override
protected void resetCompleted( CompletableFuture<Void> completionFuture )
protected void resetCompleted( CompletableFuture<Void> completionFuture, boolean success )
{
setLastUsedTimestamp( channel, clock.millis() );
if ( success )
{
// update the last-used timestamp before returning the channel back to the pool
setLastUsedTimestamp( channel, clock.millis() );
}
else
{
// close the channel before returning it back to the pool if RESET failed
channel.close();
}

Future<Void> released = pool.release( channel );
released.addListener( ignore -> completionFuture.complete( null ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ public ResetResponseHandler( InboundMessageDispatcher messageDispatcher, Complet
@Override
public final void onSuccess( Map<String,Value> metadata )
{
resetCompleted();
resetCompleted( true );
}

@Override
public final void onFailure( Throwable error )
{
resetCompleted();
resetCompleted( false );
}

@Override
Expand All @@ -54,13 +54,13 @@ public final void onRecord( Value[] fields )
throw new UnsupportedOperationException();
}

private void resetCompleted()
private void resetCompleted( boolean success )
{
messageDispatcher.unMuteAckFailure();
resetCompleted( completionFuture );
resetCompleted( completionFuture, success );
}

protected void resetCompleted( CompletableFuture<Void> completionFuture )
protected void resetCompleted( CompletableFuture<Void> completionFuture, boolean success )
{
completionFuture.complete( null );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@
import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
Expand Down Expand Up @@ -428,6 +430,19 @@ public void shouldNotSupportAckFailureMessage()
}
}

@Test
public void shouldMuteAndUnMuteAckFailure()
{
InboundMessageDispatcher dispatcher = newDispatcher();
assertFalse( dispatcher.isAckFailureMuted() );

dispatcher.muteAckFailure();
assertTrue( dispatcher.isAckFailureMuted() );

dispatcher.unMuteAckFailure();
assertFalse( dispatcher.isAckFailureMuted() );
}

private static void verifyFailure( ResponseHandler handler )
{
ArgumentCaptor<Neo4jException> captor = ArgumentCaptor.forClass( Neo4jException.class );
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.handlers;

import org.junit.Test;

import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ClientException;

import static java.util.Collections.emptyMap;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class AckFailureResponseHandlerTest
{
private final InboundMessageDispatcher dispatcher = mock( InboundMessageDispatcher.class );
private final AckFailureResponseHandler handler = new AckFailureResponseHandler( dispatcher );

@Test
public void shouldClearCurrentErrorOnSuccess()
{
verify( dispatcher, never() ).clearCurrentError();
handler.onSuccess( emptyMap() );
verify( dispatcher ).clearCurrentError();
}

@Test
public void shouldThrowOnFailure()
{
RuntimeException error = new RuntimeException( "Unable to process ACK_FAILURE" );

try
{
handler.onFailure( error );
fail( "Exception expected" );
}
catch ( ClientException e )
{
assertSame( error, e.getCause() );
}
}

@Test
public void shouldClearCurrentErrorWhenAckFailureMutedAndFailureReceived()
{
RuntimeException error = new RuntimeException( "Some error" );
when( dispatcher.isAckFailureMuted() ).thenReturn( true );

handler.onFailure( error );

verify( dispatcher ).clearCurrentError();
}

@Test
public void shouldThrowOnRecord()
{
try
{
handler.onRecord( new Value[0] );
fail( "Exception expected" );
}
catch ( UnsupportedOperationException ignore )
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void shouldReleaseChannelOnSuccess()
}

@Test
public void shouldReleaseChannelOnFailure()
public void shouldCloseAndReleaseChannelOnFailure()
{
ChannelPool pool = newChannelPoolMock();
FakeClock clock = new FakeClock();
Expand All @@ -81,7 +81,7 @@ public void shouldReleaseChannelOnFailure()

handler.onFailure( new RuntimeException() );

verifyLastUsedTimestamp( 100 );
assertTrue( channel.closeFuture().isDone() );
verify( pool ).release( eq( channel ) );
assertTrue( releaseFuture.isDone() );
assertFalse( releaseFuture.isCompletedExceptionally() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.neo4j.driver.v1.integration;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.mockito.Mockito;

import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand All @@ -41,8 +43,10 @@
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.AuthToken;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Config;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.Logging;
Expand All @@ -53,18 +57,22 @@
import org.neo4j.driver.v1.Transaction;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.summary.ResultSummary;
import org.neo4j.driver.v1.util.StubServer;
import org.neo4j.driver.v1.util.TestNeo4j;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
import static org.neo4j.driver.v1.Config.defaultConfig;
import static org.neo4j.driver.v1.Values.parameters;
Expand Down Expand Up @@ -280,6 +288,36 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionFailsToC
verify( connection2 ).release();
}

@Test
public void shouldCloseChannelWhenResetFails() throws Exception
{
StubServer server = StubServer.start( "reset_error.script", 9001 );
try
{
URI uri = URI.create( "bolt://localhost:9001" );
Config config = Config.build().withLogging( DEV_NULL_LOGGING ).withoutEncryption().toConfig();
ChannelTrackingDriverFactory driverFactory = new ChannelTrackingDriverFactory( 1, Clock.SYSTEM );

try ( Driver driver = driverFactory.newInstance( uri, AuthTokens.none(), RoutingSettings.DEFAULT, RetrySettings.DEFAULT, config ) )
{
try ( Session session = driver.session() )
{
assertEquals( 42, session.run( "RETURN 42 AS answer" ).single().get( 0 ).asInt() );
}

List<Channel> channels = driverFactory.pollChannels();
// there should be a single channel
assertEquals( 1, channels.size() );
// and it should be closed because it failed to RESET
assertNull( channels.get( 0 ).closeFuture().get( 30, SECONDS ) );
}
}
finally
{
assertEquals( 0, server.exitStatus() );
}
}

private StatementResult createNodesInNewSession( int nodesToCreate )
{
return createNodes( nodesToCreate, driver.session() );
Expand Down
11 changes: 11 additions & 0 deletions driver/src/test/resources/reset_error.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
!: AUTO INIT

C: RESET
S: SUCCESS {}
C: RUN "RETURN 42 AS answer" {}
PULL_ALL
S: SUCCESS {"fields": ["answer"]}
RECORD [42]
SUCCESS {}
C: RESET
S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Unable to reset"}