Skip to content

Commit 575bbae

Browse files
authored
Run response immediate processing update (#897)
* Make run methods wait for successful request acknowledgement This update makes `run` methods (session, tx) wait for successful request acknowledgement by the server. Specifically, the `Result` object will only be returned when there is a successful response from the server to the `RUN` message, meaning that the request was at least accepted by the server for processing. Otherwise, an appropriate error will be thrown. Additionally, this update makes the `Result.keys` method non-blocking and free from potential communication errors. * Update handling of failed queries * Move common handling to async cursor * Improve DisposableAsyncResultCursor tests * Removed duplicate test * Removed FINE logging
1 parent d68c972 commit 575bbae

File tree

55 files changed

+839
-958
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+839
-958
lines changed

driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@
2323
public interface FailableCursor
2424
{
2525
/**
26-
* Discarding all unconsumed records and returning failure if there is any to run and/or pulls.
26+
* Discarding all unconsumed records and returning failure if there is any pull errors.
2727
*/
2828
CompletionStage<Throwable> discardAllFailureAsync();
2929

3030
/**
31-
* Pulling all unconsumed records into memory and returning failure if there is any to run and/or pulls.
31+
* Pulling all unconsumed records into memory and returning failure if there is any pull errors.
3232
*/
3333
CompletionStage<Throwable> pullAllFailureAsync();
3434
}

driver/src/main/java/org/neo4j/driver/internal/InternalResult.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ public class InternalResult implements Result
3939
{
4040
private final Connection connection;
4141
private final ResultCursor cursor;
42-
private List<String> keys;
4342

4443
public InternalResult(Connection connection, ResultCursor cursor )
4544
{
@@ -50,12 +49,7 @@ public InternalResult(Connection connection, ResultCursor cursor )
5049
@Override
5150
public List<String> keys()
5251
{
53-
if ( keys == null )
54-
{
55-
blockingGet( cursor.peekAsync() );
56-
keys = cursor.keys();
57-
}
58-
return keys;
52+
return cursor.keys();
5953
}
6054

6155
@Override

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@
2323
import org.neo4j.driver.AccessMode;
2424
import org.neo4j.driver.Bookmark;
2525
import org.neo4j.driver.Query;
26-
import org.neo4j.driver.Session;
2726
import org.neo4j.driver.Result;
27+
import org.neo4j.driver.Session;
2828
import org.neo4j.driver.Transaction;
2929
import org.neo4j.driver.TransactionConfig;
3030
import org.neo4j.driver.TransactionWork;
3131
import org.neo4j.driver.async.ResultCursor;
32-
import org.neo4j.driver.internal.async.UnmanagedTransaction;
3332
import org.neo4j.driver.internal.async.NetworkSession;
33+
import org.neo4j.driver.internal.async.UnmanagedTransaction;
3434
import org.neo4j.driver.internal.spi.Connection;
3535
import org.neo4j.driver.internal.util.Futures;
3636

@@ -66,8 +66,8 @@ public Result run(String query, Map<String,Object> parameters, TransactionConfig
6666
@Override
6767
public Result run(Query query, TransactionConfig config )
6868
{
69-
ResultCursor cursor = Futures.blockingGet( session.runAsync(query, config, false ),
70-
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) );
69+
ResultCursor cursor = Futures.blockingGet( session.runAsync( query, config ),
70+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) );
7171

7272
// query executed, it is safe to obtain a connection in a blocking way
7373
Connection connection = Futures.getNow( session.connectionAsync() );

driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ public void close()
5757
@Override
5858
public Result run(Query query)
5959
{
60-
ResultCursor cursor = Futures.blockingGet( tx.runAsync(query, false ),
61-
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in transaction" ) );
60+
ResultCursor cursor = Futures.blockingGet( tx.runAsync( query ),
61+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in transaction" ) );
6262
return new InternalResult( tx.connection(), cursor );
6363
}
6464

driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@
2323
import java.util.concurrent.CompletionStage;
2424

2525
import org.neo4j.driver.AccessMode;
26+
import org.neo4j.driver.Bookmark;
2627
import org.neo4j.driver.Query;
2728
import org.neo4j.driver.TransactionConfig;
2829
import org.neo4j.driver.async.AsyncSession;
2930
import org.neo4j.driver.async.AsyncTransaction;
3031
import org.neo4j.driver.async.AsyncTransactionWork;
3132
import org.neo4j.driver.async.ResultCursor;
32-
import org.neo4j.driver.Bookmark;
3333
import org.neo4j.driver.internal.util.Futures;
3434

3535
import static java.util.Collections.emptyMap;
@@ -66,7 +66,7 @@ public CompletionStage<ResultCursor> runAsync(String query, Map<String,Object> p
6666
@Override
6767
public CompletionStage<ResultCursor> runAsync(Query query, TransactionConfig config )
6868
{
69-
return session.runAsync(query, config, true );
69+
return session.runAsync( query, config );
7070
}
7171

7272
@Override

driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public CompletionStage<Void> rollbackAsync()
4747
@Override
4848
public CompletionStage<ResultCursor> runAsync(Query query)
4949
{
50-
return tx.runAsync(query, true );
50+
return tx.runAsync( query );
5151
}
5252

5353
public boolean isOpen()

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535
import org.neo4j.driver.internal.DatabaseName;
3636
import org.neo4j.driver.internal.FailableCursor;
3737
import org.neo4j.driver.internal.cursor.AsyncResultCursor;
38-
import org.neo4j.driver.internal.cursor.RxResultCursor;
3938
import org.neo4j.driver.internal.cursor.ResultCursorFactory;
39+
import org.neo4j.driver.internal.cursor.RxResultCursor;
4040
import org.neo4j.driver.internal.logging.PrefixedLogger;
4141
import org.neo4j.driver.internal.retry.RetryLogic;
4242
import org.neo4j.driver.internal.spi.Connection;
@@ -76,19 +76,19 @@ public NetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLo
7676
this.fetchSize = fetchSize;
7777
}
7878

79-
public CompletionStage<ResultCursor> runAsync(Query query, TransactionConfig config, boolean waitForRunResponse )
79+
public CompletionStage<ResultCursor> runAsync( Query query, TransactionConfig config )
8080
{
8181
CompletionStage<AsyncResultCursor> newResultCursorStage =
82-
buildResultCursorFactory(query, config, waitForRunResponse ).thenCompose( ResultCursorFactory::asyncResult );
82+
buildResultCursorFactory( query, config ).thenCompose( ResultCursorFactory::asyncResult );
8383

8484
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
85-
return newResultCursorStage.thenApply( cursor -> cursor ); // convert the return type
85+
return newResultCursorStage.thenCompose( AsyncResultCursor::mapSuccessfulRunCompletionAsync ).thenApply( cursor -> cursor ); // convert the return type
8686
}
8787

8888
public CompletionStage<RxResultCursor> runRx(Query query, TransactionConfig config )
8989
{
9090
CompletionStage<RxResultCursor> newResultCursorStage =
91-
buildResultCursorFactory(query, config, true ).thenCompose( ResultCursorFactory::rxResult );
91+
buildResultCursorFactory( query, config ).thenCompose( ResultCursorFactory::rxResult );
9292

9393
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
9494
return newResultCursorStage;
@@ -223,24 +223,27 @@ protected CompletionStage<Boolean> currentConnectionIsOpen()
223223
connection.isOpen() ); // and it's still open
224224
}
225225

226-
private CompletionStage<ResultCursorFactory> buildResultCursorFactory(Query query, TransactionConfig config, boolean waitForRunResponse )
226+
private CompletionStage<ResultCursorFactory> buildResultCursorFactory( Query query, TransactionConfig config )
227227
{
228228
ensureSessionIsOpen();
229229

230230
return ensureNoOpenTxBeforeRunningQuery()
231231
.thenCompose( ignore -> acquireConnection( mode ) )
232-
.thenCompose( connection -> {
233-
try
234-
{
235-
ResultCursorFactory factory = connection.protocol()
236-
.runInAutoCommitTransaction( connection, query, bookmarkHolder, config, waitForRunResponse, fetchSize );
237-
return completedFuture( factory );
238-
}
239-
catch ( Throwable e )
240-
{
241-
return Futures.failedFuture( e );
242-
}
243-
} );
232+
.thenCompose(
233+
connection ->
234+
{
235+
try
236+
{
237+
ResultCursorFactory factory = connection
238+
.protocol()
239+
.runInAutoCommitTransaction( connection, query, bookmarkHolder, config, fetchSize );
240+
return completedFuture( factory );
241+
}
242+
catch ( Throwable e )
243+
{
244+
return Futures.failedFuture( e );
245+
}
246+
} );
244247
}
245248

246249
private CompletionStage<Connection> acquireConnection( AccessMode mode )

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.async;
2020

21+
import java.util.Arrays;
2122
import java.util.EnumSet;
2223
import java.util.concurrent.CompletionException;
2324
import java.util.concurrent.CompletionStage;
@@ -204,20 +205,20 @@ else if ( state.value == State.ROLLED_BACK )
204205
}
205206
}
206207

207-
public CompletionStage<ResultCursor> runAsync(Query query, boolean waitForRunResponse )
208+
public CompletionStage<ResultCursor> runAsync( Query query )
208209
{
209210
ensureCanRunQueries();
210211
CompletionStage<AsyncResultCursor> cursorStage =
211-
protocol.runInUnmanagedTransaction( connection, query, this, waitForRunResponse, fetchSize ).asyncResult();
212+
protocol.runInUnmanagedTransaction( connection, query, this, fetchSize ).asyncResult();
212213
resultCursors.add( cursorStage );
213-
return cursorStage.thenApply( cursor -> cursor );
214+
return cursorStage.thenCompose( AsyncResultCursor::mapSuccessfulRunCompletionAsync ).thenApply( cursor -> cursor );
214215
}
215216

216217
public CompletionStage<RxResultCursor> runRx(Query query)
217218
{
218219
ensureCanRunQueries();
219220
CompletionStage<RxResultCursor> cursorStage =
220-
protocol.runInUnmanagedTransaction( connection, query, this, false, fetchSize ).rxResult();
221+
protocol.runInUnmanagedTransaction( connection, query, this, fetchSize ).rxResult();
221222
resultCursors.add( cursorStage );
222223
return cursorStage;
223224
}
@@ -229,7 +230,29 @@ public boolean isOpen()
229230

230231
public void markTerminated( Throwable cause )
231232
{
232-
state = StateHolder.terminatedWith( cause );
233+
if ( state.value == State.TERMINATED )
234+
{
235+
if ( state.causeOfTermination != null )
236+
{
237+
addSuppressedWhenNotCaptured( state.causeOfTermination, cause );
238+
}
239+
}
240+
else
241+
{
242+
state = StateHolder.terminatedWith( cause );
243+
}
244+
}
245+
246+
private void addSuppressedWhenNotCaptured( Throwable currentCause, Throwable newCause )
247+
{
248+
if ( currentCause != newCause )
249+
{
250+
boolean noneMatch = Arrays.stream( currentCause.getSuppressed() ).noneMatch( suppressed -> suppressed == newCause );
251+
if ( noneMatch )
252+
{
253+
currentCause.addSuppressed( newCause );
254+
}
255+
}
233256
}
234257

235258
public Connection connection()

driver/src/main/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ BookmarkHolder bookmarkHolder( Bookmark ignored )
9292
CompletionStage<List<Record>> runProcedure(Connection connection, Query procedure, BookmarkHolder bookmarkHolder )
9393
{
9494
return connection.protocol()
95-
.runInAutoCommitTransaction( connection, procedure, bookmarkHolder, TransactionConfig.empty(), true, UNLIMITED_FETCH_SIZE )
96-
.asyncResult().thenCompose( ResultCursor::listAsync );
95+
.runInAutoCommitTransaction( connection, procedure, bookmarkHolder, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE )
96+
.asyncResult().thenCompose( ResultCursor::listAsync );
9797
}
9898

9999
private CompletionStage<List<Record>> releaseConnection( Connection connection, List<Record> records )

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
*/
1919
package org.neo4j.driver.internal.cursor;
2020

21-
import org.neo4j.driver.internal.FailableCursor;
21+
import java.util.concurrent.CompletableFuture;
22+
2223
import org.neo4j.driver.async.ResultCursor;
24+
import org.neo4j.driver.internal.FailableCursor;
2325

2426
public interface AsyncResultCursor extends ResultCursor, FailableCursor
2527
{
28+
CompletableFuture<AsyncResultCursor> mapSuccessfulRunCompletionAsync();
2629
}

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorImpl.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,13 @@
3333

3434
public class AsyncResultCursorImpl implements AsyncResultCursor
3535
{
36+
private final Throwable runError;
3637
private final RunResponseHandler runHandler;
3738
private final PullAllResponseHandler pullAllHandler;
3839

39-
public AsyncResultCursorImpl(RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
40+
public AsyncResultCursorImpl( Throwable runError, RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
4041
{
42+
this.runError = runError;
4143
this.runHandler = runHandler;
4244
this.pullAllHandler = pullAllHandler;
4345
}
@@ -113,13 +115,15 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
113115
@Override
114116
public CompletionStage<Throwable> discardAllFailureAsync()
115117
{
116-
return consumeAsync().handle( ( summary, error ) -> error );
118+
// runError has priority over other errors and is expected to have been reported to user by now
119+
return consumeAsync().handle( ( summary, error ) -> runError != null ? null : error );
117120
}
118121

119122
@Override
120123
public CompletionStage<Throwable> pullAllFailureAsync()
121124
{
122-
return pullAllHandler.pullAllFailureAsync();
125+
// runError has priority over other errors and is expected to have been reported to user by now
126+
return pullAllHandler.pullAllFailureAsync().thenApply( error -> runError != null ? null : error );
123127
}
124128

125129
private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Void> resultFuture )
@@ -154,4 +158,10 @@ else if ( record != null )
154158
}
155159
} );
156160
}
161+
162+
@Override
163+
public CompletableFuture<AsyncResultCursor> mapSuccessfulRunCompletionAsync()
164+
{
165+
return runError != null ? Futures.failedFuture( runError ) : CompletableFuture.completedFuture( this );
166+
}
157167
}

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorOnlyFactory.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.cursor;
2020

21+
import java.util.concurrent.CompletableFuture;
2122
import java.util.concurrent.CompletionStage;
2223

2324
import org.neo4j.driver.exceptions.ClientException;
@@ -28,7 +29,6 @@
2829
import org.neo4j.driver.internal.util.Futures;
2930

3031
import static java.util.Objects.requireNonNull;
31-
import static java.util.concurrent.CompletableFuture.completedFuture;
3232

3333
/**
3434
* Used by Bolt V1, V2, V3
@@ -38,23 +38,24 @@ public class AsyncResultCursorOnlyFactory implements ResultCursorFactory
3838
protected final Connection connection;
3939
protected final Message runMessage;
4040
protected final RunResponseHandler runHandler;
41+
private final CompletableFuture<Void> runFuture;
4142
protected final PullAllResponseHandler pullAllHandler;
42-
private final boolean waitForRunResponse;
4343

44-
public AsyncResultCursorOnlyFactory(Connection connection, Message runMessage, RunResponseHandler runHandler,
45-
PullAllResponseHandler pullHandler, boolean waitForRunResponse )
44+
public AsyncResultCursorOnlyFactory( Connection connection, Message runMessage, RunResponseHandler runHandler, CompletableFuture<Void> runFuture,
45+
PullAllResponseHandler pullHandler )
4646
{
4747
requireNonNull( connection );
4848
requireNonNull( runMessage );
4949
requireNonNull( runHandler );
50+
requireNonNull( runFuture );
5051
requireNonNull( pullHandler );
5152

5253
this.connection = connection;
5354
this.runMessage = runMessage;
5455
this.runHandler = runHandler;
56+
this.runFuture = runFuture;
5557

5658
this.pullAllHandler = pullHandler;
57-
this.waitForRunResponse = waitForRunResponse;
5859
}
5960

6061
public CompletionStage<AsyncResultCursor> asyncResult()
@@ -63,16 +64,7 @@ public CompletionStage<AsyncResultCursor> asyncResult()
6364
connection.write( runMessage, runHandler ); // queues the run message, will be flushed with pull message together
6465
pullAllHandler.prePopulateRecords();
6566

66-
if ( waitForRunResponse )
67-
{
68-
// wait for response of RUN before proceeding
69-
return runHandler.runFuture().thenApply( ignore ->
70-
new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
71-
}
72-
else
73-
{
74-
return completedFuture( new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
75-
}
67+
return runFuture.handle( ( ignored, error ) -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( error, runHandler, pullAllHandler ) ) );
7668
}
7769

7870
public CompletionStage<RxResultCursor> rxResult()

driver/src/main/java/org/neo4j/driver/internal/cursor/DisposableAsyncResultCursor.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class DisposableAsyncResultCursor implements AsyncResultCursor
3636
private final AsyncResultCursor delegate;
3737
private boolean isDisposed;
3838

39-
public DisposableAsyncResultCursor(AsyncResultCursor delegate )
39+
public DisposableAsyncResultCursor( AsyncResultCursor delegate )
4040
{
4141
this.delegate = delegate;
4242
}
@@ -118,4 +118,10 @@ boolean isDisposed()
118118
{
119119
return this.isDisposed;
120120
}
121+
122+
@Override
123+
public CompletableFuture<AsyncResultCursor> mapSuccessfulRunCompletionAsync()
124+
{
125+
return this.delegate.mapSuccessfulRunCompletionAsync().thenApply( ignored -> this );
126+
}
121127
}

0 commit comments

Comments
 (0)