Skip to content

Commit a410b78

Browse files
authored
Merge pull request #681 from zhenlineo/4.0-fix-rx-stress-it
Ensure error is propagated from rx queries correctly.
2 parents 38c3738 + e054ec5 commit a410b78

10 files changed

+40
-32
lines changed

driver/src/test/java/org/neo4j/driver/stress/AbstractStressTestBase.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.lang.management.OperatingSystemMXBean;
3131
import java.lang.reflect.Method;
3232
import java.net.URI;
33-
import java.time.Duration;
3433
import java.util.ArrayList;
3534
import java.util.Arrays;
3635
import java.util.HashMap;
@@ -99,7 +98,6 @@ abstract class AbstractStressTestBase<C extends AbstractContext>
9998

10099
private static final int BIG_DATA_TEST_NODE_COUNT = Integer.getInteger( "bigDataTestNodeCount", 30_000 );
101100
private static final int BIG_DATA_TEST_BATCH_SIZE = Integer.getInteger( "bigDataTestBatchSize", 10_000 );
102-
private static final Duration DEFAULT_BLOCKING_TIME_OUT = Duration.ofMinutes( 10 );
103101

104102
private LoggerNameTrackingLogging logging;
105103
private ExecutorService executor;
@@ -638,7 +636,7 @@ private Bookmark createNodesRx( int batchCount, int batchSize, InternalDriver dr
638636

639637
Flux.concat( Flux.range( 0, batchCount ).map( batchIndex ->
640638
session.writeTransaction( tx -> createNodesInTxRx( tx, batchIndex, batchSize ) )
641-
) ).blockLast( DEFAULT_BLOCKING_TIME_OUT ); // throw any error if happened
639+
) ).blockLast(); // throw any error if happened
642640

643641
long end = System.nanoTime();
644642
System.out.println( "Node creation with reactive API took: " + NANOSECONDS.toMillis( end - start ) + "ms" );
@@ -673,7 +671,7 @@ private void readNodesRx( InternalDriver driver, Bookmark bookmark, int expected
673671
verifyNodeProperties( node );
674672
} ).then() );
675673

676-
Flux.from( readQuery ).blockLast( DEFAULT_BLOCKING_TIME_OUT );
674+
Flux.from( readQuery ).blockLast();
677675

678676
assertEquals( expectedNodeCount, nodesSeen.get() );
679677

driver/src/test/java/org/neo4j/driver/stress/AsyncWriteQuery.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public CompletionStage<Void> execute( C context )
5555
}
5656
else
5757
{
58+
context.setBookmark( session.lastBookmark() );
5859
assertEquals( 1, summary.counters().nodesCreated() );
5960
context.nodeCreated();
6061
}

driver/src/test/java/org/neo4j/driver/stress/AsyncWriteQueryInTx.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,10 @@ public CompletionStage<Void> execute( C context )
4545

4646
CompletionStage<ResultSummary> txCommitted = session.beginTransactionAsync().thenCompose( tx ->
4747
tx.runAsync( "CREATE ()" ).thenCompose( cursor ->
48-
cursor.consumeAsync().thenCompose( summary ->
49-
tx.commitAsync().thenApply( ignore -> summary ) ) ) );
48+
cursor.consumeAsync().thenCompose( summary -> tx.commitAsync().thenApply( ignore -> {
49+
context.setBookmark( session.lastBookmark() );
50+
return summary;
51+
} ) ) ) );
5052

5153
return txCommitted.handle( ( summary, error ) ->
5254
{

driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQuery.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.neo4j.driver.AccessMode;
2222
import org.neo4j.driver.Driver;
2323
import org.neo4j.driver.Session;
24-
import org.neo4j.driver.Result;
24+
import org.neo4j.driver.summary.ResultSummary;
2525

2626
import static org.junit.jupiter.api.Assertions.assertEquals;
2727

@@ -38,12 +38,13 @@ public BlockingWriteQuery( AbstractStressTestBase<C> stressTest, Driver driver,
3838
@Override
3939
public void execute( C context )
4040
{
41-
Result result = null;
41+
ResultSummary summary = null;
4242
Throwable queryError = null;
4343

4444
try ( Session session = newSession( AccessMode.WRITE, context ) )
4545
{
46-
result = session.run( "CREATE ()" );
46+
summary = session.run( "CREATE ()" ).consume();
47+
context.setBookmark( session.lastBookmark() );
4748
}
4849
catch ( Throwable error )
4950
{
@@ -54,9 +55,9 @@ public void execute( C context )
5455
}
5556
}
5657

57-
if ( queryError == null && result != null )
58+
if ( queryError == null && summary != null )
5859
{
59-
assertEquals( 1, result.consume().counters().nodesCreated() );
60+
assertEquals( 1, summary.counters().nodesCreated() );
6061
context.nodeCreated();
6162
}
6263
}

driver/src/test/java/org/neo4j/driver/stress/RxFailingQuery.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.util.concurrent.CompletableFuture;
2525
import java.util.concurrent.CompletionStage;
26+
import java.util.concurrent.atomic.AtomicInteger;
2627

2728
import org.neo4j.driver.AccessMode;
2829
import org.neo4j.driver.Driver;
@@ -51,8 +52,7 @@ public CompletionStage<Void> execute( C context )
5152
RxSession::close )
5253
.subscribe( record -> {
5354
assertThat( record.get( 0 ).asInt(), either( equalTo( 1 ) ).or( equalTo( 2 ) ) );
54-
queryFinished.complete( null );
55-
}, error -> {
55+
}, error -> {
5656
Throwable cause = Futures.completionExceptionCause( error );
5757
assertThat( cause, is( arithmeticError() ) );
5858
queryFinished.complete( null );

driver/src/test/java/org/neo4j/driver/stress/RxFailingQueryInTx.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ public CompletionStage<Void> execute( C context )
5252
RxTransaction::commit, ( tx, error ) -> tx.rollback(), null )
5353
.subscribe( record -> {
5454
assertThat( record.get( 0 ).asInt(), either( equalTo( 1 ) ).or( equalTo( 2 ) ) );
55-
queryFinished.complete( null );
5655
}, error -> {
5756
Throwable cause = Futures.completionExceptionCause( error );
5857
assertThat( cause, is( arithmeticError() ) );

driver/src/test/java/org/neo4j/driver/stress/RxFailingQueryWithRetries.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public CompletionStage<Void> execute( C context )
5151
RxSession::close )
5252
.subscribe( record -> {
5353
assertThat( record.get( 0 ).asInt(), either( equalTo( 1 ) ).or( equalTo( 2 ) ) );
54-
queryFinished.complete( null );
5554
}, error -> {
5655
Throwable cause = Futures.completionExceptionCause( error );
5756
assertThat( cause, is( arithmeticError() ) );

driver/src/test/java/org/neo4j/driver/stress/RxWriteQuery.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,24 +47,27 @@ public CompletionStage<Void> execute( C context )
4747
{
4848
CompletableFuture<Void> queryFinished = new CompletableFuture<>();
4949
Flux.usingWhen( Mono.fromSupplier( () -> newSession( AccessMode.WRITE, context ) ),
50-
session -> session.run( "CREATE ()" ).consume(), RxSession::close )
50+
session -> Flux.from( session.run( "CREATE ()" ).consume() )
51+
.doOnComplete( () -> context.setBookmark( session.lastBookmark() ) ),
52+
RxSession::close )
5153
.subscribe( summary -> {
52-
queryFinished.complete( null );
5354
assertEquals( 1, summary.counters().nodesCreated() );
5455
context.nodeCreated();
55-
}, error -> {
5656
queryFinished.complete( null );
57-
handleError( Futures.completionExceptionCause( error ), context );
58-
} );
57+
}, error -> handleError( Futures.completionExceptionCause( error ), context, queryFinished ) );
5958

6059
return queryFinished;
6160
}
6261

63-
private void handleError( Throwable error, C context )
62+
private void handleError( Throwable error, C context, CompletableFuture<Void> queryFinished )
6463
{
6564
if ( !stressTest.handleWriteFailure( error, context ) )
6665
{
67-
throw new RuntimeException( error );
66+
queryFinished.completeExceptionally( error );
67+
}
68+
else
69+
{
70+
queryFinished.complete( null );
6871
}
6972
}
7073
}

driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,26 @@ public CompletionStage<Void> execute( C context )
4949
Flux.usingWhen( session.beginTransaction(), tx -> tx.run( "CREATE ()" ).consume(),
5050
RxTransaction::commit, ( tx, error ) -> tx.rollback(), null ).subscribe(
5151
summary -> {
52+
context.setBookmark( session.lastBookmark() );
5253
assertEquals( 1, summary.counters().nodesCreated() );
5354
context.nodeCreated();
5455
queryFinished.complete( null );
5556
}, error -> {
56-
handleError( Futures.completionExceptionCause( error ), context );
57-
queryFinished.complete( null );
57+
handleError( Futures.completionExceptionCause( error ), context, queryFinished );
5858
} );
5959

6060
return queryFinished;
6161
}
6262

63-
private void handleError( Throwable error, C context )
63+
private void handleError( Throwable error, C context, CompletableFuture<Void> queryFinished )
6464
{
6565
if ( !stressTest.handleWriteFailure( error, context ) )
6666
{
67-
throw new RuntimeException( error );
67+
queryFinished.completeExceptionally( error );
68+
}
69+
else
70+
{
71+
queryFinished.complete( null );
6872
}
6973
}
7074
}

driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryWithRetries.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,22 +50,23 @@ public CompletionStage<Void> execute( C context )
5050
Flux.usingWhen( Mono.fromSupplier( () -> newSession( AccessMode.WRITE, context ) ),
5151
session -> session.writeTransaction( tx -> tx.run( "CREATE ()" ).consume() ), RxSession::close )
5252
.subscribe( summary -> {
53-
queryFinished.complete( null );
5453
assertEquals( 1, summary.counters().nodesCreated() );
5554
context.nodeCreated();
56-
}, error -> {
5755
queryFinished.complete( null );
58-
handleError( Futures.completionExceptionCause( error ), context );
59-
} );
56+
}, error -> handleError( Futures.completionExceptionCause( error ), context, queryFinished ) );
6057

6158
return queryFinished;
6259
}
6360

64-
private void handleError( Throwable error, C context )
61+
private void handleError( Throwable error, C context, CompletableFuture<Void> queryFinished )
6562
{
6663
if ( !stressTest.handleWriteFailure( error, context ) )
6764
{
68-
throw new RuntimeException( error );
65+
queryFinished.completeExceptionally( error );
66+
}
67+
else
68+
{
69+
queryFinished.complete( null );
6970
}
7071
}
7172
}

0 commit comments

Comments
 (0)