Skip to content

Commit a86ea1d

Browse files
authored
Merge pull request #594 from zhenlineo/1.7-reserve-error
Suppress subsequent connection errors into existing error
2 parents 71ff948 + 08802b8 commit a86ea1d

File tree

12 files changed

+158
-29
lines changed

12 files changed

+158
-29
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.neo4j.driver.internal.async.ChannelConnectorImpl;
3232
import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
3333
import org.neo4j.driver.internal.async.pool.PoolSettings;
34-
import org.neo4j.driver.internal.cluster.IdentityResolver;
3534
import org.neo4j.driver.internal.cluster.RoutingContext;
3635
import org.neo4j.driver.internal.cluster.RoutingSettings;
3736
import org.neo4j.driver.internal.cluster.loadbalancing.LeastConnectedLoadBalancingStrategy;
@@ -66,6 +65,7 @@
6665
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
6766
import static org.neo4j.driver.internal.metrics.spi.Metrics.isMetricsEnabled;
6867
import static org.neo4j.driver.internal.security.SecurityPlan.insecure;
68+
import static org.neo4j.driver.internal.util.ErrorUtil.addSuppressed;
6969

7070
public class DriverFactory
7171
{
@@ -371,10 +371,7 @@ private static void closeConnectionPoolAndSuppressError( ConnectionPool connecti
371371
}
372372
catch ( Throwable closeError )
373373
{
374-
if ( mainError != closeError )
375-
{
376-
mainError.addSuppressed( closeError );
377-
}
374+
addSuppressed( mainError, closeError );
378375
}
379376
}
380377

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343

4444
import static java.util.Collections.emptyMap;
4545
import static java.util.concurrent.CompletableFuture.completedFuture;
46+
import static org.neo4j.driver.internal.util.ErrorUtil.addSuppressed;
4647
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4748
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4849

@@ -403,7 +404,7 @@ private <T> void rollbackTxAfterFailedTransactionWork( ExplicitTransaction tx, C
403404
{
404405
if ( rollbackError != null )
405406
{
406-
error.addSuppressed( rollbackError );
407+
addSuppressed( error, rollbackError );
407408
}
408409
resultFuture.completeExceptionally( error );
409410
} );

driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,15 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
8888
else
8989
{
9090
failed = true;
91-
log.error( "Fatal error occurred in the pipeline", error );
91+
log.warn( "Fatal error occurred in the pipeline", error );
9292
fail( ctx, error );
9393
}
9494
}
9595

9696
private void fail( ChannelHandlerContext ctx, Throwable error )
9797
{
9898
Throwable cause = transformError( error );
99-
messageDispatcher.handleFatalError( cause );
99+
messageDispatcher.handleChannelError( cause );
100100
log.debug( "Closing channel because of a failure '%s'", error );
101101
ctx.close();
102102
}

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
import static java.util.Objects.requireNonNull;
3939
import static org.neo4j.driver.internal.messaging.request.ResetMessage.RESET;
40+
import static org.neo4j.driver.internal.util.ErrorUtil.addSuppressed;
4041

4142
public class InboundMessageDispatcher implements ResponseMessageHandler
4243
{
@@ -141,9 +142,17 @@ public void handleIgnoredMessage()
141142
handler.onFailure( error );
142143
}
143144

144-
public void handleFatalError( Throwable error )
145+
public void handleChannelError( Throwable error )
145146
{
146-
currentError = error;
147+
if ( currentError != null )
148+
{
149+
// we already have an error, this new error probably is caused by the existing one, thus we chain the new error on this current error
150+
addSuppressed( currentError, error );
151+
}
152+
else
153+
{
154+
currentError = error;
155+
}
147156
fatalErrorOccurred = true;
148157

149158
while ( !handlers.isEmpty() )

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public NettyChannelTracker( MetricsListener metricsListener, ChannelGroup channe
6262
@Override
6363
public void channelReleased( Channel channel )
6464
{
65-
log.debug( "Channel [%s] released back to the pool", channel.id() );
65+
log.debug( "Channel [0x%s] released back to the pool", channel.id() );
6666
decrementInUse( channel );
6767
incrementIdle( channel );
6868
channel.closeFuture().addListener( closeListener );

driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,14 @@ private static String extractClassification( String code )
127127
return parts[1];
128128
}
129129

130+
public static void addSuppressed( Throwable mainError, Throwable error )
131+
{
132+
if ( mainError != error )
133+
{
134+
mainError.addSuppressed( error );
135+
}
136+
}
137+
130138
/**
131139
* Exception which is merely a holder of an async stacktrace, which is not the primary stacktrace users are interested in.
132140
* Used for blocking API calls that block on async API calls.

driver/src/main/java/org/neo4j/driver/internal/util/Futures.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.neo4j.driver.internal.async.EventLoopGroupFactory;
3030

3131
import static java.util.concurrent.CompletableFuture.completedFuture;
32+
import static org.neo4j.driver.internal.util.ErrorUtil.addSuppressed;
3233

3334
public final class Futures
3435
{
@@ -184,10 +185,7 @@ public static CompletionException combineErrors( Throwable error1, Throwable err
184185
{
185186
Throwable cause1 = completionExceptionCause( error1 );
186187
Throwable cause2 = completionExceptionCause( error2 );
187-
if ( cause1 != cause2 )
188-
{
189-
cause1.addSuppressed( cause2 );
190-
}
188+
addSuppressed( cause1, cause2 );
191189
return asCompletionException( cause1 );
192190
}
193191
else if ( error1 != null )

driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,7 @@ void shouldSendReadAccessModeInStatementMetadata() throws Exception
149149
{
150150
StubServer server = StubServer.start( "hello_run_exit_read.script", 9001 );
151151

152-
Config config = Config.builder()
153-
.withoutEncryption()
154-
.build();
155-
156-
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", config );
152+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG );
157153
Session session = driver.session( AccessMode.READ ) )
158154
{
159155
List<String> names = session.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( 0 ).asString() );
@@ -170,11 +166,7 @@ void shouldNotSendWriteAccessModeInStatementMetadata() throws Exception
170166
{
171167
StubServer server = StubServer.start( "hello_run_exit.script", 9001 );
172168

173-
Config config = Config.builder()
174-
.withoutEncryption()
175-
.build();
176-
177-
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", config );
169+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG );
178170
Session session = driver.session( AccessMode.WRITE ) )
179171
{
180172
List<String> names = session.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( 0 ).asString() );
@@ -240,6 +232,50 @@ void shouldThrowRollbackErrorWhenTransactionClosed() throws Exception
240232
testTxCloseErrorPropagation( "rollback_error.script", false, "Unable to rollback" );
241233
}
242234

235+
@Test
236+
void shouldThrowCorrectErrorOnRunFailure() throws Throwable
237+
{
238+
StubServer server = StubServer.start( "database_shutdown.script", 9001 );
239+
240+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG );
241+
Session session = driver.session( "neo4j:bookmark:v1:tx0" );
242+
// has to enforce to flush BEGIN to have tx started.
243+
Transaction transaction = session.beginTransaction() )
244+
{
245+
TransientException error = assertThrows( TransientException.class, () -> {
246+
StatementResult result = transaction.run( "RETURN 1" );
247+
result.consume();
248+
} );
249+
assertThat( error.code(), equalTo( "Neo.TransientError.General.DatabaseUnavailable" ) );
250+
}
251+
finally
252+
{
253+
assertEquals( 0, server.exitStatus() );
254+
}
255+
}
256+
257+
@Test
258+
void shouldThrowCorrectErrorOnCommitFailure() throws Throwable
259+
{
260+
StubServer server = StubServer.start( "database_shutdown_at_commit.script", 9001 );
261+
262+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG );
263+
Session session = driver.session() )
264+
{
265+
Transaction transaction = session.beginTransaction();
266+
StatementResult result = transaction.run( "CREATE (n {name:'Bob'})" );
267+
result.consume();
268+
transaction.success();
269+
270+
TransientException error = assertThrows( TransientException.class, transaction::close );
271+
assertThat( error.code(), equalTo( "Neo.TransientError.General.DatabaseUnavailable" ) );
272+
}
273+
finally
274+
{
275+
assertEquals( 0, server.exitStatus() );
276+
}
277+
}
278+
243279
private static void testTransactionCloseErrorPropagationWhenSessionClosed( String script, boolean commit,
244280
String expectedErrorMessage ) throws Exception
245281
{

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.neo4j.driver.v1.TransactionWork;
4949
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
5050
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
51+
import org.neo4j.driver.v1.exceptions.TransientException;
5152
import org.neo4j.driver.v1.net.ServerAddress;
5253
import org.neo4j.driver.v1.net.ServerAddressResolver;
5354
import org.neo4j.driver.v1.util.StubServer;
@@ -764,6 +765,41 @@ void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemoved() throws Exc
764765
verify( logger ).warn( startsWith( "Failed to obtain a connection towards address 127.0.0.1:9004" ), any( SessionExpiredException.class ) );
765766
}
766767

768+
@Test
769+
void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemovedV3() throws Exception
770+
{
771+
// This test simulates a router in a cluster when a leader is removed.
772+
// The router first returns a RT with a writer inside.
773+
// However this writer is killed while the driver is running a tx with it.
774+
// Then at the second time the router returns the same RT with the killed writer inside.
775+
// At the third round, the router removes the the writer server from RT reply.
776+
// Finally, the router returns a RT with a reachable writer.
777+
StubServer router = StubServer.start( "acquire_endpoints_v3_leader_killed.script", 9001 );
778+
StubServer brokenWriter = StubServer.start( "database_shutdown_at_commit.script", 9004 );
779+
StubServer writer = StubServer.start( "write_server.script", 9008 );
780+
781+
Logger logger = mock( Logger.class );
782+
Config config = Config.builder().withoutEncryption().withLogging( mockedLogging( logger ) ).build();
783+
try ( Driver driver = newDriverWithSleeplessClock( "bolt+routing://127.0.0.1:9001", config );
784+
Session session = driver.session() )
785+
{
786+
AtomicInteger invocations = new AtomicInteger();
787+
List<Record> records = session.writeTransaction( queryWork( "CREATE (n {name:'Bob'})", invocations ) );
788+
789+
assertEquals( 0, records.size() );
790+
assertEquals( 2, invocations.get() );
791+
}
792+
finally
793+
{
794+
assertEquals( 0, router.exitStatus() );
795+
assertEquals( 0, brokenWriter.exitStatus() );
796+
assertEquals( 0, writer.exitStatus() );
797+
}
798+
verify( logger, times( 1 ) ).warn( startsWith( "Transaction failed and will be retried in" ), any( TransientException.class ) );
799+
verify( logger, times( 2 ) ).warn( startsWith( "Transaction failed and will be retried in" ), any( SessionExpiredException.class ) );
800+
verify( logger ).warn( startsWith( "Failed to obtain a connection towards address 127.0.0.1:9004" ), any( SessionExpiredException.class ) );
801+
}
802+
767803
@Test
768804
void shouldRetryReadTransactionUntilFailure() throws Exception
769805
{

driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import static org.junit.jupiter.api.Assertions.assertThrows;
4242
import static org.mockito.ArgumentMatchers.any;
4343
import static org.mockito.ArgumentMatchers.anyBoolean;
44+
import static org.mockito.ArgumentMatchers.argThat;
4445
import static org.mockito.ArgumentMatchers.eq;
4546
import static org.mockito.Mockito.inOrder;
4647
import static org.mockito.Mockito.mock;
@@ -157,7 +158,7 @@ void shouldPeekHandlerOnRecord()
157158
}
158159

159160
@Test
160-
void shouldFailAllHandlersOnFatalError()
161+
void shouldFailAllHandlersOnChannelError()
161162
{
162163
InboundMessageDispatcher dispatcher = newDispatcher();
163164

@@ -170,7 +171,7 @@ void shouldFailAllHandlersOnFatalError()
170171
dispatcher.enqueue( handler3 );
171172

172173
RuntimeException fatalError = new RuntimeException( "Fatal!" );
173-
dispatcher.handleFatalError( fatalError );
174+
dispatcher.handleChannelError( fatalError );
174175

175176
InOrder inOrder = inOrder( handler1, handler2, handler3 );
176177
inOrder.verify( handler1 ).onFailure( fatalError );
@@ -179,19 +180,36 @@ void shouldFailAllHandlersOnFatalError()
179180
}
180181

181182
@Test
182-
void shouldFailNewHandlerAfterFatalError()
183+
void shouldFailNewHandlerAfterChannelError()
183184
{
184185
InboundMessageDispatcher dispatcher = newDispatcher();
185186

186187
RuntimeException fatalError = new RuntimeException( "Fatal!" );
187-
dispatcher.handleFatalError( fatalError );
188+
dispatcher.handleChannelError( fatalError );
188189

189190
ResponseHandler handler = mock( ResponseHandler.class );
190191
dispatcher.enqueue( handler );
191192

192193
verify( handler ).onFailure( fatalError );
193194
}
194195

196+
@Test
197+
void shouldAttachChannelErrorOnExistingError()
198+
{
199+
InboundMessageDispatcher dispatcher = newDispatcher();
200+
201+
ResponseHandler handler = mock( ResponseHandler.class );
202+
dispatcher.enqueue( handler );
203+
204+
dispatcher.handleFailureMessage( "Neo.ClientError", "First error!" );
205+
RuntimeException fatalError = new RuntimeException( "Second Error!" );
206+
dispatcher.handleChannelError( fatalError );
207+
208+
verify( handler ).onFailure( argThat(
209+
error -> error instanceof ClientException && error.getMessage().equals( "First error!" ) &&
210+
error.getSuppressed().length == 1 && error.getSuppressed()[0].getMessage().equals( "Second Error!" ) ) );
211+
}
212+
195213
@Test
196214
void shouldDequeHandlerOnIgnored()
197215
{
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
!: BOLT 3
2+
3+
C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"}
4+
S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"}
5+
C: RESET
6+
S: SUCCESS {}
7+
C: BEGIN {"bookmarks": ["neo4j:bookmark:v1:tx0"]}
8+
S: SUCCESS {}
9+
C: RUN "RETURN 1" {} {}
10+
PULL_ALL
11+
S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Database shut down."}
12+
S: <EXIT>
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
!: BOLT 3
2+
!: AUTO RESET
3+
4+
C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"}
5+
S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"}
6+
C: BEGIN {}
7+
RUN "CREATE (n {name:'Bob'})" {} {}
8+
PULL_ALL
9+
S: SUCCESS {}
10+
SUCCESS {}
11+
SUCCESS {}
12+
C: COMMIT
13+
S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Database shut down."}
14+
S: <EXIT>

0 commit comments

Comments
 (0)