Skip to content

Commit cacdc55

Browse files
committed
Make run methods wait for successful request acknowledgement
This update makes `run` methods (session, tx) wait for successful request acknowledgement by the server. Specifically, the `Result` object will only be returned when there is a successful response from the server to the `RUN` message, meaning that the request was at least accepted by the server for processing. Otherwise, an appropriate error will be thrown. Additionally, this update makes the `Result.keys` method non-blocking and free from potential communication errors.
1 parent 463195a commit cacdc55

File tree

51 files changed

+696
-900
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+696
-900
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalResult.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ public class InternalResult implements Result
3939
{
4040
private final Connection connection;
4141
private final ResultCursor cursor;
42-
private List<String> keys;
4342

4443
public InternalResult(Connection connection, ResultCursor cursor )
4544
{
@@ -50,12 +49,7 @@ public InternalResult(Connection connection, ResultCursor cursor )
5049
@Override
5150
public List<String> keys()
5251
{
53-
if ( keys == null )
54-
{
55-
blockingGet( cursor.peekAsync() );
56-
keys = cursor.keys();
57-
}
58-
return keys;
52+
return cursor.keys();
5953
}
6054

6155
@Override

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@
2323
import org.neo4j.driver.AccessMode;
2424
import org.neo4j.driver.Bookmark;
2525
import org.neo4j.driver.Query;
26-
import org.neo4j.driver.Session;
2726
import org.neo4j.driver.Result;
27+
import org.neo4j.driver.Session;
2828
import org.neo4j.driver.Transaction;
2929
import org.neo4j.driver.TransactionConfig;
3030
import org.neo4j.driver.TransactionWork;
3131
import org.neo4j.driver.async.ResultCursor;
32-
import org.neo4j.driver.internal.async.UnmanagedTransaction;
3332
import org.neo4j.driver.internal.async.NetworkSession;
33+
import org.neo4j.driver.internal.async.UnmanagedTransaction;
3434
import org.neo4j.driver.internal.spi.Connection;
3535
import org.neo4j.driver.internal.util.Futures;
3636

@@ -66,8 +66,8 @@ public Result run(String query, Map<String,Object> parameters, TransactionConfig
6666
@Override
6767
public Result run(Query query, TransactionConfig config )
6868
{
69-
ResultCursor cursor = Futures.blockingGet( session.runAsync(query, config, false ),
70-
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) );
69+
ResultCursor cursor = Futures.blockingGet( session.runAsync( query, config ),
70+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) );
7171

7272
// query executed, it is safe to obtain a connection in a blocking way
7373
Connection connection = Futures.getNow( session.connectionAsync() );

driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ public void close()
5757
@Override
5858
public Result run(Query query)
5959
{
60-
ResultCursor cursor = Futures.blockingGet( tx.runAsync(query, false ),
61-
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in transaction" ) );
60+
ResultCursor cursor = Futures.blockingGet( tx.runAsync( query ),
61+
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in transaction" ) );
6262
return new InternalResult( tx.connection(), cursor );
6363
}
6464

driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@
2323
import java.util.concurrent.CompletionStage;
2424

2525
import org.neo4j.driver.AccessMode;
26+
import org.neo4j.driver.Bookmark;
2627
import org.neo4j.driver.Query;
2728
import org.neo4j.driver.TransactionConfig;
2829
import org.neo4j.driver.async.AsyncSession;
2930
import org.neo4j.driver.async.AsyncTransaction;
3031
import org.neo4j.driver.async.AsyncTransactionWork;
3132
import org.neo4j.driver.async.ResultCursor;
32-
import org.neo4j.driver.Bookmark;
3333
import org.neo4j.driver.internal.util.Futures;
3434

3535
import static java.util.Collections.emptyMap;
@@ -66,7 +66,7 @@ public CompletionStage<ResultCursor> runAsync(String query, Map<String,Object> p
6666
@Override
6767
public CompletionStage<ResultCursor> runAsync(Query query, TransactionConfig config )
6868
{
69-
return session.runAsync(query, config, true );
69+
return session.runAsync( query, config );
7070
}
7171

7272
@Override

driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public CompletionStage<Void> rollbackAsync()
4747
@Override
4848
public CompletionStage<ResultCursor> runAsync(Query query)
4949
{
50-
return tx.runAsync(query, true );
50+
return tx.runAsync( query );
5151
}
5252

5353
public boolean isOpen()

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535
import org.neo4j.driver.internal.DatabaseName;
3636
import org.neo4j.driver.internal.FailableCursor;
3737
import org.neo4j.driver.internal.cursor.AsyncResultCursor;
38-
import org.neo4j.driver.internal.cursor.RxResultCursor;
3938
import org.neo4j.driver.internal.cursor.ResultCursorFactory;
39+
import org.neo4j.driver.internal.cursor.RxResultCursor;
4040
import org.neo4j.driver.internal.logging.PrefixedLogger;
4141
import org.neo4j.driver.internal.retry.RetryLogic;
4242
import org.neo4j.driver.internal.spi.Connection;
@@ -76,10 +76,10 @@ public NetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLo
7676
this.fetchSize = fetchSize;
7777
}
7878

79-
public CompletionStage<ResultCursor> runAsync(Query query, TransactionConfig config, boolean waitForRunResponse )
79+
public CompletionStage<ResultCursor> runAsync( Query query, TransactionConfig config )
8080
{
8181
CompletionStage<AsyncResultCursor> newResultCursorStage =
82-
buildResultCursorFactory(query, config, waitForRunResponse ).thenCompose( ResultCursorFactory::asyncResult );
82+
buildResultCursorFactory( query, config ).thenCompose( ResultCursorFactory::asyncResult );
8383

8484
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
8585
return newResultCursorStage.thenApply( cursor -> cursor ); // convert the return type
@@ -88,7 +88,7 @@ public CompletionStage<ResultCursor> runAsync(Query query, TransactionConfig con
8888
public CompletionStage<RxResultCursor> runRx(Query query, TransactionConfig config )
8989
{
9090
CompletionStage<RxResultCursor> newResultCursorStage =
91-
buildResultCursorFactory(query, config, true ).thenCompose( ResultCursorFactory::rxResult );
91+
buildResultCursorFactory( query, config ).thenCompose( ResultCursorFactory::rxResult );
9292

9393
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
9494
return newResultCursorStage;
@@ -223,24 +223,27 @@ protected CompletionStage<Boolean> currentConnectionIsOpen()
223223
connection.isOpen() ); // and it's still open
224224
}
225225

226-
private CompletionStage<ResultCursorFactory> buildResultCursorFactory(Query query, TransactionConfig config, boolean waitForRunResponse )
226+
private CompletionStage<ResultCursorFactory> buildResultCursorFactory( Query query, TransactionConfig config )
227227
{
228228
ensureSessionIsOpen();
229229

230230
return ensureNoOpenTxBeforeRunningQuery()
231231
.thenCompose( ignore -> acquireConnection( mode ) )
232-
.thenCompose( connection -> {
233-
try
234-
{
235-
ResultCursorFactory factory = connection.protocol()
236-
.runInAutoCommitTransaction( connection, query, bookmarkHolder, config, waitForRunResponse, fetchSize );
237-
return completedFuture( factory );
238-
}
239-
catch ( Throwable e )
240-
{
241-
return Futures.failedFuture( e );
242-
}
243-
} );
232+
.thenCompose(
233+
connection ->
234+
{
235+
try
236+
{
237+
ResultCursorFactory factory = connection
238+
.protocol()
239+
.runInAutoCommitTransaction( connection, query, bookmarkHolder, config, fetchSize );
240+
return completedFuture( factory );
241+
}
242+
catch ( Throwable e )
243+
{
244+
return Futures.failedFuture( e );
245+
}
246+
} );
244247
}
245248

246249
private CompletionStage<Connection> acquireConnection( AccessMode mode )

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,11 @@ else if ( state.value == State.ROLLED_BACK )
204204
}
205205
}
206206

207-
public CompletionStage<ResultCursor> runAsync(Query query, boolean waitForRunResponse )
207+
public CompletionStage<ResultCursor> runAsync( Query query )
208208
{
209209
ensureCanRunQueries();
210210
CompletionStage<AsyncResultCursor> cursorStage =
211-
protocol.runInUnmanagedTransaction( connection, query, this, waitForRunResponse, fetchSize ).asyncResult();
211+
protocol.runInUnmanagedTransaction( connection, query, this, fetchSize ).asyncResult();
212212
resultCursors.add( cursorStage );
213213
return cursorStage.thenApply( cursor -> cursor );
214214
}
@@ -217,7 +217,7 @@ public CompletionStage<RxResultCursor> runRx(Query query)
217217
{
218218
ensureCanRunQueries();
219219
CompletionStage<RxResultCursor> cursorStage =
220-
protocol.runInUnmanagedTransaction( connection, query, this, false, fetchSize ).rxResult();
220+
protocol.runInUnmanagedTransaction( connection, query, this, fetchSize ).rxResult();
221221
resultCursors.add( cursorStage );
222222
return cursorStage;
223223
}

driver/src/main/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ BookmarkHolder bookmarkHolder( Bookmark ignored )
9292
CompletionStage<List<Record>> runProcedure(Connection connection, Query procedure, BookmarkHolder bookmarkHolder )
9393
{
9494
return connection.protocol()
95-
.runInAutoCommitTransaction( connection, procedure, bookmarkHolder, TransactionConfig.empty(), true, UNLIMITED_FETCH_SIZE )
96-
.asyncResult().thenCompose( ResultCursor::listAsync );
95+
.runInAutoCommitTransaction( connection, procedure, bookmarkHolder, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE )
96+
.asyncResult().thenCompose( ResultCursor::listAsync );
9797
}
9898

9999
private CompletionStage<List<Record>> releaseConnection( Connection connection, List<Record> records )

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorOnlyFactory.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.cursor;
2020

21+
import java.util.concurrent.CompletableFuture;
2122
import java.util.concurrent.CompletionStage;
2223

2324
import org.neo4j.driver.exceptions.ClientException;
@@ -28,7 +29,6 @@
2829
import org.neo4j.driver.internal.util.Futures;
2930

3031
import static java.util.Objects.requireNonNull;
31-
import static java.util.concurrent.CompletableFuture.completedFuture;
3232

3333
/**
3434
* Used by Bolt V1, V2, V3
@@ -38,23 +38,24 @@ public class AsyncResultCursorOnlyFactory implements ResultCursorFactory
3838
protected final Connection connection;
3939
protected final Message runMessage;
4040
protected final RunResponseHandler runHandler;
41+
private final CompletableFuture<Void> runFuture;
4142
protected final PullAllResponseHandler pullAllHandler;
42-
private final boolean waitForRunResponse;
4343

44-
public AsyncResultCursorOnlyFactory(Connection connection, Message runMessage, RunResponseHandler runHandler,
45-
PullAllResponseHandler pullHandler, boolean waitForRunResponse )
44+
public AsyncResultCursorOnlyFactory( Connection connection, Message runMessage, RunResponseHandler runHandler, CompletableFuture<Void> runFuture,
45+
PullAllResponseHandler pullHandler )
4646
{
4747
requireNonNull( connection );
4848
requireNonNull( runMessage );
4949
requireNonNull( runHandler );
50+
requireNonNull( runFuture );
5051
requireNonNull( pullHandler );
5152

5253
this.connection = connection;
5354
this.runMessage = runMessage;
5455
this.runHandler = runHandler;
56+
this.runFuture = runFuture;
5557

5658
this.pullAllHandler = pullHandler;
57-
this.waitForRunResponse = waitForRunResponse;
5859
}
5960

6061
public CompletionStage<AsyncResultCursor> asyncResult()
@@ -63,16 +64,7 @@ public CompletionStage<AsyncResultCursor> asyncResult()
6364
connection.write( runMessage, runHandler ); // queues the run message, will be flushed with pull message together
6465
pullAllHandler.prePopulateRecords();
6566

66-
if ( waitForRunResponse )
67-
{
68-
// wait for response of RUN before proceeding
69-
return runHandler.runFuture().thenApply( ignore ->
70-
new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
71-
}
72-
else
73-
{
74-
return completedFuture( new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
75-
}
67+
return runFuture.thenApply( ignore -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
7668
}
7769

7870
public CompletionStage<RxResultCursor> rxResult()

driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorFactoryImpl.java

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.cursor;
2020

21+
import java.util.concurrent.CompletableFuture;
2122
import java.util.concurrent.CompletionStage;
2223

2324
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
@@ -27,7 +28,6 @@
2728
import org.neo4j.driver.internal.spi.Connection;
2829

2930
import static java.util.Objects.requireNonNull;
30-
import static java.util.concurrent.CompletableFuture.completedFuture;
3131

3232
/**
3333
* Bolt V4
@@ -39,24 +39,25 @@ public class ResultCursorFactoryImpl implements ResultCursorFactory
3939

4040
private final PullResponseHandler pullHandler;
4141
private final PullAllResponseHandler pullAllHandler;
42-
private final boolean waitForRunResponse;
4342
private final Message runMessage;
43+
private final CompletableFuture<Void> runFuture;
4444

45-
public ResultCursorFactoryImpl(Connection connection, Message runMessage, RunResponseHandler runHandler, PullResponseHandler pullHandler,
46-
PullAllResponseHandler pullAllHandler, boolean waitForRunResponse )
45+
public ResultCursorFactoryImpl( Connection connection, Message runMessage, RunResponseHandler runHandler, CompletableFuture<Void> runFuture,
46+
PullResponseHandler pullHandler, PullAllResponseHandler pullAllHandler )
4747
{
4848
requireNonNull( connection );
4949
requireNonNull( runMessage );
5050
requireNonNull( runHandler );
51+
requireNonNull( runFuture );
5152
requireNonNull( pullHandler );
5253
requireNonNull( pullAllHandler );
5354

5455
this.connection = connection;
5556
this.runMessage = runMessage;
5657
this.runHandler = runHandler;
58+
this.runFuture = runFuture;
5759
this.pullHandler = pullHandler;
5860
this.pullAllHandler = pullAllHandler;
59-
this.waitForRunResponse = waitForRunResponse;
6061
}
6162

6263
@Override
@@ -65,29 +66,13 @@ public CompletionStage<AsyncResultCursor> asyncResult()
6566
// only write and flush messages when async result is wanted.
6667
connection.write( runMessage, runHandler ); // queues the run message, will be flushed with pull message together
6768
pullAllHandler.prePopulateRecords();
68-
69-
if ( waitForRunResponse )
70-
{
71-
// wait for response of RUN before proceeding
72-
return runHandler.runFuture().thenApply(
73-
ignore -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
74-
}
75-
else
76-
{
77-
return completedFuture( new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
78-
}
69+
return runFuture.thenApply( ignore -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
7970
}
8071

8172
@Override
8273
public CompletionStage<RxResultCursor> rxResult()
8374
{
8475
connection.writeAndFlush( runMessage, runHandler );
85-
// we always wait for run reply
86-
return runHandler.runFuture().thenApply( this::composeRxCursor );
87-
}
88-
89-
private RxResultCursor composeRxCursor(Throwable runError )
90-
{
91-
return new RxResultCursorImpl( runError, runHandler, pullHandler );
76+
return runFuture.handle( ( ignored, error ) -> new RxResultCursorImpl( error, runHandler, pullHandler ) );
9277
}
9378
}

driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ public RxResultCursorImpl(Throwable runError, RunResponseHandler runHandler, Pul
5454
{
5555
Objects.requireNonNull( runHandler );
5656
Objects.requireNonNull( pullHandler );
57-
assertRunResponseArrived( runHandler );
5857

5958
this.runResponseError = runError;
6059
this.runHandler = runHandler;
@@ -160,14 +159,6 @@ else if ( summary != null )
160159
} );
161160
}
162161

163-
private void assertRunResponseArrived( RunResponseHandler runHandler )
164-
{
165-
if ( !runHandler.runFuture().isDone() )
166-
{
167-
throw new IllegalStateException( "Should wait for response of RUN before allowing PULL." );
168-
}
169-
}
170-
171162
enum RecordConsumerStatus
172163
{
173164
NOT_INSTALLED( false, false ),

0 commit comments

Comments
 (0)