Skip to content

Commit 9f5a32d

Browse files
authored
Merge pull request #527 from lutovich/1.7-bookmark-after-auto-commit-tx
Keep bookmark returned after auto-commit tx
2 parents fa862a6 + 4f59cc6 commit 9f5a32d

22 files changed

+434
-133
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright (c) 2002-2018 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal;
20+
21+
public interface BookmarksHolder
22+
{
23+
Bookmarks getBookmarks();
24+
25+
void setBookmarks( Bookmarks bookmarks );
26+
27+
BookmarksHolder NO_OP = new BookmarksHolder()
28+
{
29+
@Override
30+
public Bookmarks getBookmarks()
31+
{
32+
return Bookmarks.empty();
33+
}
34+
35+
@Override
36+
public void setBookmarks( Bookmarks bookmarks )
37+
{
38+
}
39+
};
40+
}

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ private enum State
6868
private final NetworkSession session;
6969
private final ResultCursorsHolder resultCursors;
7070

71-
private volatile Bookmarks bookmarks = Bookmarks.empty();
7271
private volatile State state = State.ACTIVE;
7372

7473
public ExplicitTransaction( Connection connection, NetworkSession session )
@@ -229,19 +228,6 @@ public void markTerminated()
229228
state = State.TERMINATED;
230229
}
231230

232-
public Bookmarks bookmark()
233-
{
234-
return bookmarks;
235-
}
236-
237-
public void setBookmarks( Bookmarks bookmarks )
238-
{
239-
if ( bookmarks != null && !bookmarks.isEmpty() )
240-
{
241-
this.bookmarks = bookmarks;
242-
}
243-
}
244-
245231
private CompletionStage<Void> doCommitAsync()
246232
{
247233
if ( state == State.TERMINATED )
@@ -252,7 +238,7 @@ private CompletionStage<Void> doCommitAsync()
252238
return protocol.commitTransaction( connection )
253239
.thenApply( newBookmarks ->
254240
{
255-
setBookmarks( newBookmarks );
241+
session.setBookmarks( newBookmarks );
256242
return null;
257243
} );
258244
}
@@ -283,7 +269,6 @@ private void transactionClosed( State newState )
283269
{
284270
state = newState;
285271
connection.release(); // release in background
286-
session.setBookmarks( bookmarks );
287272
}
288273

289274
private void terminateConnectionOnThreadInterrupt( String reason )

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4747
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4848

49-
public class NetworkSession extends AbstractStatementRunner implements Session
49+
public class NetworkSession extends AbstractStatementRunner implements Session, BookmarksHolder
5050
{
5151
private static final String LOG_NAME = "Session";
5252

@@ -247,7 +247,14 @@ public <T> CompletionStage<T> writeTransactionAsync( TransactionWork<CompletionS
247247
return transactionAsync( AccessMode.WRITE, work, config );
248248
}
249249

250-
void setBookmarks( Bookmarks bookmarks )
250+
@Override
251+
public Bookmarks getBookmarks()
252+
{
253+
return bookmarks;
254+
}
255+
256+
@Override
257+
public void setBookmarks( Bookmarks bookmarks )
251258
{
252259
if ( bookmarks != null && !bookmarks.isEmpty() )
253260
{
@@ -439,7 +446,7 @@ private CompletionStage<InternalStatementResultCursor> run( Statement statement,
439446
CompletionStage<InternalStatementResultCursor> newResultCursorStage = ensureNoOpenTxBeforeRunningQuery()
440447
.thenCompose( ignore -> acquireConnection( mode ) )
441448
.thenCompose( connection ->
442-
connection.protocol().runInAutoCommitTransaction( connection, statement, bookmarks, config, waitForRunResponse ) );
449+
connection.protocol().runInAutoCommitTransaction( connection, statement, this, config, waitForRunResponse ) );
443450

444451
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
445452

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.util.concurrent.CompletionException;
2323
import java.util.concurrent.CompletionStage;
2424

25-
import org.neo4j.driver.internal.Bookmarks;
25+
import org.neo4j.driver.internal.BookmarksHolder;
2626
import org.neo4j.driver.internal.spi.Connection;
2727
import org.neo4j.driver.internal.util.Futures;
2828
import org.neo4j.driver.internal.util.ServerVersion;
@@ -62,7 +62,7 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
6262
CompletionStage<List<Record>> runProcedure( Connection connection, Statement procedure )
6363
{
6464
return connection.protocol()
65-
.runInAutoCommitTransaction( connection, procedure, Bookmarks.empty(), TransactionConfig.empty(), true )
65+
.runInAutoCommitTransaction( connection, procedure, BookmarksHolder.NO_OP, TransactionConfig.empty(), true )
6666
.thenCompose( StatementResultCursor::listAsync );
6767
}
6868

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public abstract class PullAllResponseHandler implements ResponseHandler
5353

5454
private final Statement statement;
5555
private final RunResponseHandler runResponseHandler;
56-
private final MetadataExtractor metadataExtractor;
56+
protected final MetadataExtractor metadataExtractor;
5757
protected final Connection connection;
5858

5959
// initialized lazily when first record arrives
@@ -81,13 +81,13 @@ public synchronized void onSuccess( Map<String,Value> metadata )
8181
finished = true;
8282
summary = extractResultSummary( metadata );
8383

84-
afterSuccess();
84+
afterSuccess( metadata );
8585

8686
completeRecordFuture( null );
8787
completeFailureFuture( null );
8888
}
8989

90-
protected abstract void afterSuccess();
90+
protected abstract void afterSuccess( Map<String,Value> metadata );
9191

9292
@Override
9393
public synchronized void onFailure( Throwable error )

driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,32 @@
1818
*/
1919
package org.neo4j.driver.internal.handlers;
2020

21+
import java.util.Map;
22+
23+
import org.neo4j.driver.internal.BookmarksHolder;
2124
import org.neo4j.driver.internal.spi.Connection;
2225
import org.neo4j.driver.internal.util.MetadataExtractor;
2326
import org.neo4j.driver.v1.Statement;
27+
import org.neo4j.driver.v1.Value;
28+
29+
import static java.util.Objects.requireNonNull;
2430

2531
public class SessionPullAllResponseHandler extends PullAllResponseHandler
2632
{
33+
private final BookmarksHolder bookmarksHolder;
34+
2735
public SessionPullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler,
28-
Connection connection, MetadataExtractor metadataExtractor )
36+
Connection connection, BookmarksHolder bookmarksHolder, MetadataExtractor metadataExtractor )
2937
{
3038
super( statement, runResponseHandler, connection, metadataExtractor );
39+
this.bookmarksHolder = requireNonNull( bookmarksHolder );
3140
}
3241

3342
@Override
34-
protected void afterSuccess()
43+
protected void afterSuccess( Map<String,Value> metadata )
3544
{
3645
releaseConnection();
46+
bookmarksHolder.setBookmarks( metadataExtractor.extractBookmarks( metadata ) );
3747
}
3848

3949
@Override

driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
*/
1919
package org.neo4j.driver.internal.handlers;
2020

21+
import java.util.Map;
22+
2123
import org.neo4j.driver.internal.ExplicitTransaction;
2224
import org.neo4j.driver.internal.spi.Connection;
2325
import org.neo4j.driver.internal.util.MetadataExtractor;
2426
import org.neo4j.driver.v1.Statement;
27+
import org.neo4j.driver.v1.Value;
2528

2629
import static java.util.Objects.requireNonNull;
2730

@@ -37,7 +40,7 @@ public TransactionPullAllResponseHandler( Statement statement, RunResponseHandle
3740
}
3841

3942
@Override
40-
protected void afterSuccess()
43+
protected void afterSuccess( Map<String,Value> metadata )
4144
{
4245
}
4346

driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CompletionStage;
2626

2727
import org.neo4j.driver.internal.Bookmarks;
28+
import org.neo4j.driver.internal.BookmarksHolder;
2829
import org.neo4j.driver.internal.ExplicitTransaction;
2930
import org.neo4j.driver.internal.InternalStatementResultCursor;
3031
import org.neo4j.driver.internal.messaging.v1.BoltProtocolV1;
@@ -89,15 +90,15 @@ public interface BoltProtocol
8990
*
9091
* @param connection the network connection to use.
9192
* @param statement the cypher to execute.
92-
* @param bookmarks the bookmarks. Never null, should be {@link Bookmarks#empty()} when absent.
93+
* @param bookmarksHolder the bookmarksHolder that keeps track of the current bookmark and can be updated with a new bookmark.
9394
* @param config the transaction config for the implicitly started auto-commit transaction.
9495
* @param waitForRunResponse {@code true} for async query execution and {@code false} for blocking query
9596
* execution. Makes returned cursor stage be chained after the RUN response arrives. Needed to have statement
9697
* keys populated.
9798
* @return stage with cursor.
9899
*/
99100
CompletionStage<InternalStatementResultCursor> runInAutoCommitTransaction( Connection connection, Statement statement,
100-
Bookmarks bookmarks, TransactionConfig config, boolean waitForRunResponse );
101+
BookmarksHolder bookmarksHolder, TransactionConfig config, boolean waitForRunResponse );
101102

102103
/**
103104
* Execute the given statement in a running explicit transaction, i.e. {@link Transaction#run(Statement)}.

driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.CompletionStage;
2727

2828
import org.neo4j.driver.internal.Bookmarks;
29+
import org.neo4j.driver.internal.BookmarksHolder;
2930
import org.neo4j.driver.internal.ExplicitTransaction;
3031
import org.neo4j.driver.internal.InternalStatementResultCursor;
3132
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
@@ -142,7 +143,7 @@ public CompletionStage<Void> rollbackTransaction( Connection connection )
142143

143144
@Override
144145
public CompletionStage<InternalStatementResultCursor> runInAutoCommitTransaction( Connection connection, Statement statement,
145-
Bookmarks bookmarks, TransactionConfig config, boolean waitForRunResponse )
146+
BookmarksHolder bookmarksHolder, TransactionConfig config, boolean waitForRunResponse )
146147
{
147148
// bookmarks are ignored for auto-commit transactions in this version of the protocol
148149

@@ -193,7 +194,7 @@ private static PullAllResponseHandler newPullAllHandler( Statement statement, Ru
193194
{
194195
return new TransactionPullAllResponseHandler( statement, runHandler, connection, tx, METADATA_EXTRACTOR );
195196
}
196-
return new SessionPullAllResponseHandler( statement, runHandler, connection, METADATA_EXTRACTOR );
197+
return new SessionPullAllResponseHandler( statement, runHandler, connection, BookmarksHolder.NO_OP, METADATA_EXTRACTOR );
197198
}
198199

199200
private static <T> CompletionStage<T> txConfigNotSupported()

driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.CompletionStage;
2727

2828
import org.neo4j.driver.internal.Bookmarks;
29+
import org.neo4j.driver.internal.BookmarksHolder;
2930
import org.neo4j.driver.internal.ExplicitTransaction;
3031
import org.neo4j.driver.internal.InternalStatementResultCursor;
3132
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
@@ -119,28 +120,28 @@ public CompletionStage<Void> rollbackTransaction( Connection connection )
119120

120121
@Override
121122
public CompletionStage<InternalStatementResultCursor> runInAutoCommitTransaction( Connection connection, Statement statement,
122-
Bookmarks bookmarks, TransactionConfig config, boolean waitForRunResponse )
123+
BookmarksHolder bookmarksHolder, TransactionConfig config, boolean waitForRunResponse )
123124
{
124-
return runStatement( connection, statement, null, bookmarks, config, waitForRunResponse );
125+
return runStatement( connection, statement, bookmarksHolder, null, config, waitForRunResponse );
125126
}
126127

127128
@Override
128129
public CompletionStage<InternalStatementResultCursor> runInExplicitTransaction( Connection connection, Statement statement, ExplicitTransaction tx,
129130
boolean waitForRunResponse )
130131
{
131-
return runStatement( connection, statement, tx, Bookmarks.empty(), TransactionConfig.empty(), waitForRunResponse );
132+
return runStatement( connection, statement, BookmarksHolder.NO_OP, tx, TransactionConfig.empty(), waitForRunResponse );
132133
}
133134

134135
private static CompletionStage<InternalStatementResultCursor> runStatement( Connection connection, Statement statement,
135-
ExplicitTransaction tx, Bookmarks bookmarks, TransactionConfig config, boolean waitForRunResponse )
136+
BookmarksHolder bookmarksHolder, ExplicitTransaction tx, TransactionConfig config, boolean waitForRunResponse )
136137
{
137138
String query = statement.text();
138139
Map<String,Value> params = statement.parameters().asMap( ofValue() );
139140

140141
CompletableFuture<Void> runCompletedFuture = new CompletableFuture<>();
141-
Message runMessage = new RunWithMetadataMessage( query, params, bookmarks, config );
142+
Message runMessage = new RunWithMetadataMessage( query, params, bookmarksHolder.getBookmarks(), config );
142143
RunResponseHandler runHandler = new RunResponseHandler( runCompletedFuture, METADATA_EXTRACTOR );
143-
PullAllResponseHandler pullAllHandler = newPullAllHandler( statement, runHandler, connection, tx );
144+
PullAllResponseHandler pullAllHandler = newPullAllHandler( statement, runHandler, connection, bookmarksHolder, tx );
144145

145146
connection.writeAndFlush( runMessage, runHandler, PULL_ALL, pullAllHandler );
146147

@@ -157,12 +158,12 @@ private static CompletionStage<InternalStatementResultCursor> runStatement( Conn
157158
}
158159

159160
private static PullAllResponseHandler newPullAllHandler( Statement statement, RunResponseHandler runHandler,
160-
Connection connection, ExplicitTransaction tx )
161+
Connection connection, BookmarksHolder bookmarksHolder, ExplicitTransaction tx )
161162
{
162163
if ( tx != null )
163164
{
164165
return new TransactionPullAllResponseHandler( statement, runHandler, connection, tx, METADATA_EXTRACTOR );
165166
}
166-
return new SessionPullAllResponseHandler( statement, runHandler, connection, METADATA_EXTRACTOR );
167+
return new SessionPullAllResponseHandler( statement, runHandler, connection, bookmarksHolder, METADATA_EXTRACTOR );
167168
}
168169
}

driver/src/main/java/org/neo4j/driver/internal/util/MetadataExtractor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.List;
2424
import java.util.Map;
2525

26+
import org.neo4j.driver.internal.Bookmarks;
2627
import org.neo4j.driver.internal.spi.Connection;
2728
import org.neo4j.driver.internal.summary.InternalNotification;
2829
import org.neo4j.driver.internal.summary.InternalPlan;
@@ -40,6 +41,7 @@
4041
import org.neo4j.driver.v1.summary.StatementType;
4142

4243
import static java.util.Collections.emptyList;
44+
import static org.neo4j.driver.internal.types.InternalTypeSystem.TYPE_SYSTEM;
4345

4446
public class MetadataExtractor
4547
{
@@ -89,6 +91,16 @@ public ResultSummary extractSummary( Statement statement, Connection connection,
8991
extractNotifications( metadata ), resultAvailableAfter, extractResultConsumedAfter( metadata, resultConsumedAfterMetadataKey ) );
9092
}
9193

94+
public Bookmarks extractBookmarks( Map<String,Value> metadata )
95+
{
96+
Value bookmarkValue = metadata.get( "bookmark" );
97+
if ( bookmarkValue != null && !bookmarkValue.isNull() && bookmarkValue.hasType( TYPE_SYSTEM.STRING() ) )
98+
{
99+
return Bookmarks.from( bookmarkValue.asString() );
100+
}
101+
return Bookmarks.empty();
102+
}
103+
92104
private static StatementType extractStatementType( Map<String,Value> metadata )
93105
{
94106
Value typeValue = metadata.get( "type" );

0 commit comments

Comments
 (0)