Skip to content

Commit 86cf222

Browse files
committed
Keep bookmark returned after auto-commit tx
Auto-commit transactions return bookmarks in Bolt V3. They were previously unused. This commit makes driver extract such bookmarks and expose them via `Session#lastBookmark()`. It then possible for explicit and auto-commit transactions within the same session to be chained with bookmarks.
1 parent fa862a6 commit 86cf222

20 files changed

+416
-67
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright (c) 2002-2018 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal;
20+
21+
public interface BookmarksHolder
22+
{
23+
Bookmarks getBookmarks();
24+
25+
void setBookmarks( Bookmarks bookmarks );
26+
27+
BookmarksHolder NO_OP = new BookmarksHolder()
28+
{
29+
@Override
30+
public Bookmarks getBookmarks()
31+
{
32+
return Bookmarks.empty();
33+
}
34+
35+
@Override
36+
public void setBookmarks( Bookmarks bookmarks )
37+
{
38+
}
39+
};
40+
}

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4747
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4848

49-
public class NetworkSession extends AbstractStatementRunner implements Session
49+
public class NetworkSession extends AbstractStatementRunner implements Session, BookmarksHolder
5050
{
5151
private static final String LOG_NAME = "Session";
5252

@@ -247,7 +247,14 @@ public <T> CompletionStage<T> writeTransactionAsync( TransactionWork<CompletionS
247247
return transactionAsync( AccessMode.WRITE, work, config );
248248
}
249249

250-
void setBookmarks( Bookmarks bookmarks )
250+
@Override
251+
public Bookmarks getBookmarks()
252+
{
253+
return bookmarks;
254+
}
255+
256+
@Override
257+
public void setBookmarks( Bookmarks bookmarks )
251258
{
252259
if ( bookmarks != null && !bookmarks.isEmpty() )
253260
{
@@ -439,7 +446,7 @@ private CompletionStage<InternalStatementResultCursor> run( Statement statement,
439446
CompletionStage<InternalStatementResultCursor> newResultCursorStage = ensureNoOpenTxBeforeRunningQuery()
440447
.thenCompose( ignore -> acquireConnection( mode ) )
441448
.thenCompose( connection ->
442-
connection.protocol().runInAutoCommitTransaction( connection, statement, bookmarks, config, waitForRunResponse ) );
449+
connection.protocol().runInAutoCommitTransaction( connection, statement, this, config, waitForRunResponse ) );
443450

444451
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
445452

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.util.concurrent.CompletionException;
2323
import java.util.concurrent.CompletionStage;
2424

25-
import org.neo4j.driver.internal.Bookmarks;
25+
import org.neo4j.driver.internal.BookmarksHolder;
2626
import org.neo4j.driver.internal.spi.Connection;
2727
import org.neo4j.driver.internal.util.Futures;
2828
import org.neo4j.driver.internal.util.ServerVersion;
@@ -62,7 +62,7 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
6262
CompletionStage<List<Record>> runProcedure( Connection connection, Statement procedure )
6363
{
6464
return connection.protocol()
65-
.runInAutoCommitTransaction( connection, procedure, Bookmarks.empty(), TransactionConfig.empty(), true )
65+
.runInAutoCommitTransaction( connection, procedure, BookmarksHolder.NO_OP, TransactionConfig.empty(), true )
6666
.thenCompose( StatementResultCursor::listAsync );
6767
}
6868

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public abstract class PullAllResponseHandler implements ResponseHandler
5353

5454
private final Statement statement;
5555
private final RunResponseHandler runResponseHandler;
56-
private final MetadataExtractor metadataExtractor;
56+
protected final MetadataExtractor metadataExtractor;
5757
protected final Connection connection;
5858

5959
// initialized lazily when first record arrives
@@ -81,13 +81,13 @@ public synchronized void onSuccess( Map<String,Value> metadata )
8181
finished = true;
8282
summary = extractResultSummary( metadata );
8383

84-
afterSuccess();
84+
afterSuccess( metadata );
8585

8686
completeRecordFuture( null );
8787
completeFailureFuture( null );
8888
}
8989

90-
protected abstract void afterSuccess();
90+
protected abstract void afterSuccess( Map<String,Value> metadata );
9191

9292
@Override
9393
public synchronized void onFailure( Throwable error )

driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,32 @@
1818
*/
1919
package org.neo4j.driver.internal.handlers;
2020

21+
import java.util.Map;
22+
23+
import org.neo4j.driver.internal.BookmarksHolder;
2124
import org.neo4j.driver.internal.spi.Connection;
2225
import org.neo4j.driver.internal.util.MetadataExtractor;
2326
import org.neo4j.driver.v1.Statement;
27+
import org.neo4j.driver.v1.Value;
28+
29+
import static java.util.Objects.requireNonNull;
2430

2531
public class SessionPullAllResponseHandler extends PullAllResponseHandler
2632
{
33+
private final BookmarksHolder bookmarksHolder;
34+
2735
public SessionPullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler,
28-
Connection connection, MetadataExtractor metadataExtractor )
36+
Connection connection, BookmarksHolder bookmarksHolder, MetadataExtractor metadataExtractor )
2937
{
3038
super( statement, runResponseHandler, connection, metadataExtractor );
39+
this.bookmarksHolder = requireNonNull( bookmarksHolder );
3140
}
3241

3342
@Override
34-
protected void afterSuccess()
43+
protected void afterSuccess( Map<String,Value> metadata )
3544
{
3645
releaseConnection();
46+
bookmarksHolder.setBookmarks( metadataExtractor.extractBookmarks( metadata ) );
3747
}
3848

3949
@Override

driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
*/
1919
package org.neo4j.driver.internal.handlers;
2020

21+
import java.util.Map;
22+
2123
import org.neo4j.driver.internal.ExplicitTransaction;
2224
import org.neo4j.driver.internal.spi.Connection;
2325
import org.neo4j.driver.internal.util.MetadataExtractor;
2426
import org.neo4j.driver.v1.Statement;
27+
import org.neo4j.driver.v1.Value;
2528

2629
import static java.util.Objects.requireNonNull;
2730

@@ -37,7 +40,7 @@ public TransactionPullAllResponseHandler( Statement statement, RunResponseHandle
3740
}
3841

3942
@Override
40-
protected void afterSuccess()
43+
protected void afterSuccess( Map<String,Value> metadata )
4144
{
4245
}
4346

driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CompletionStage;
2626

2727
import org.neo4j.driver.internal.Bookmarks;
28+
import org.neo4j.driver.internal.BookmarksHolder;
2829
import org.neo4j.driver.internal.ExplicitTransaction;
2930
import org.neo4j.driver.internal.InternalStatementResultCursor;
3031
import org.neo4j.driver.internal.messaging.v1.BoltProtocolV1;
@@ -89,15 +90,15 @@ public interface BoltProtocol
8990
*
9091
* @param connection the network connection to use.
9192
* @param statement the cypher to execute.
92-
* @param bookmarks the bookmarks. Never null, should be {@link Bookmarks#empty()} when absent.
93+
* @param bookmarksHolder the bookmarksHolder that keeps track of the current bookmark and can be updated with a new bookmark.
9394
* @param config the transaction config for the implicitly started auto-commit transaction.
9495
* @param waitForRunResponse {@code true} for async query execution and {@code false} for blocking query
9596
* execution. Makes returned cursor stage be chained after the RUN response arrives. Needed to have statement
9697
* keys populated.
9798
* @return stage with cursor.
9899
*/
99100
CompletionStage<InternalStatementResultCursor> runInAutoCommitTransaction( Connection connection, Statement statement,
100-
Bookmarks bookmarks, TransactionConfig config, boolean waitForRunResponse );
101+
BookmarksHolder bookmarksHolder, TransactionConfig config, boolean waitForRunResponse );
101102

102103
/**
103104
* Execute the given statement in a running explicit transaction, i.e. {@link Transaction#run(Statement)}.

driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.CompletionStage;
2727

2828
import org.neo4j.driver.internal.Bookmarks;
29+
import org.neo4j.driver.internal.BookmarksHolder;
2930
import org.neo4j.driver.internal.ExplicitTransaction;
3031
import org.neo4j.driver.internal.InternalStatementResultCursor;
3132
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
@@ -142,7 +143,7 @@ public CompletionStage<Void> rollbackTransaction( Connection connection )
142143

143144
@Override
144145
public CompletionStage<InternalStatementResultCursor> runInAutoCommitTransaction( Connection connection, Statement statement,
145-
Bookmarks bookmarks, TransactionConfig config, boolean waitForRunResponse )
146+
BookmarksHolder bookmarksHolder, TransactionConfig config, boolean waitForRunResponse )
146147
{
147148
// bookmarks are ignored for auto-commit transactions in this version of the protocol
148149

@@ -193,7 +194,7 @@ private static PullAllResponseHandler newPullAllHandler( Statement statement, Ru
193194
{
194195
return new TransactionPullAllResponseHandler( statement, runHandler, connection, tx, METADATA_EXTRACTOR );
195196
}
196-
return new SessionPullAllResponseHandler( statement, runHandler, connection, METADATA_EXTRACTOR );
197+
return new SessionPullAllResponseHandler( statement, runHandler, connection, BookmarksHolder.NO_OP, METADATA_EXTRACTOR );
197198
}
198199

199200
private static <T> CompletionStage<T> txConfigNotSupported()

driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.CompletionStage;
2727

2828
import org.neo4j.driver.internal.Bookmarks;
29+
import org.neo4j.driver.internal.BookmarksHolder;
2930
import org.neo4j.driver.internal.ExplicitTransaction;
3031
import org.neo4j.driver.internal.InternalStatementResultCursor;
3132
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
@@ -119,28 +120,28 @@ public CompletionStage<Void> rollbackTransaction( Connection connection )
119120

120121
@Override
121122
public CompletionStage<InternalStatementResultCursor> runInAutoCommitTransaction( Connection connection, Statement statement,
122-
Bookmarks bookmarks, TransactionConfig config, boolean waitForRunResponse )
123+
BookmarksHolder bookmarksHolder, TransactionConfig config, boolean waitForRunResponse )
123124
{
124-
return runStatement( connection, statement, null, bookmarks, config, waitForRunResponse );
125+
return runStatement( connection, statement, bookmarksHolder, null, config, waitForRunResponse );
125126
}
126127

127128
@Override
128129
public CompletionStage<InternalStatementResultCursor> runInExplicitTransaction( Connection connection, Statement statement, ExplicitTransaction tx,
129130
boolean waitForRunResponse )
130131
{
131-
return runStatement( connection, statement, tx, Bookmarks.empty(), TransactionConfig.empty(), waitForRunResponse );
132+
return runStatement( connection, statement, BookmarksHolder.NO_OP, tx, TransactionConfig.empty(), waitForRunResponse );
132133
}
133134

134135
private static CompletionStage<InternalStatementResultCursor> runStatement( Connection connection, Statement statement,
135-
ExplicitTransaction tx, Bookmarks bookmarks, TransactionConfig config, boolean waitForRunResponse )
136+
BookmarksHolder bookmarksHolder, ExplicitTransaction tx, TransactionConfig config, boolean waitForRunResponse )
136137
{
137138
String query = statement.text();
138139
Map<String,Value> params = statement.parameters().asMap( ofValue() );
139140

140141
CompletableFuture<Void> runCompletedFuture = new CompletableFuture<>();
141-
Message runMessage = new RunWithMetadataMessage( query, params, bookmarks, config );
142+
Message runMessage = new RunWithMetadataMessage( query, params, bookmarksHolder.getBookmarks(), config );
142143
RunResponseHandler runHandler = new RunResponseHandler( runCompletedFuture, METADATA_EXTRACTOR );
143-
PullAllResponseHandler pullAllHandler = newPullAllHandler( statement, runHandler, connection, tx );
144+
PullAllResponseHandler pullAllHandler = newPullAllHandler( statement, runHandler, connection, bookmarksHolder, tx );
144145

145146
connection.writeAndFlush( runMessage, runHandler, PULL_ALL, pullAllHandler );
146147

@@ -157,12 +158,12 @@ private static CompletionStage<InternalStatementResultCursor> runStatement( Conn
157158
}
158159

159160
private static PullAllResponseHandler newPullAllHandler( Statement statement, RunResponseHandler runHandler,
160-
Connection connection, ExplicitTransaction tx )
161+
Connection connection, BookmarksHolder bookmarksHolder, ExplicitTransaction tx )
161162
{
162163
if ( tx != null )
163164
{
164165
return new TransactionPullAllResponseHandler( statement, runHandler, connection, tx, METADATA_EXTRACTOR );
165166
}
166-
return new SessionPullAllResponseHandler( statement, runHandler, connection, METADATA_EXTRACTOR );
167+
return new SessionPullAllResponseHandler( statement, runHandler, connection, bookmarksHolder, METADATA_EXTRACTOR );
167168
}
168169
}

driver/src/main/java/org/neo4j/driver/internal/util/MetadataExtractor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.List;
2424
import java.util.Map;
2525

26+
import org.neo4j.driver.internal.Bookmarks;
2627
import org.neo4j.driver.internal.spi.Connection;
2728
import org.neo4j.driver.internal.summary.InternalNotification;
2829
import org.neo4j.driver.internal.summary.InternalPlan;
@@ -40,6 +41,7 @@
4041
import org.neo4j.driver.v1.summary.StatementType;
4142

4243
import static java.util.Collections.emptyList;
44+
import static org.neo4j.driver.internal.types.InternalTypeSystem.TYPE_SYSTEM;
4345

4446
public class MetadataExtractor
4547
{
@@ -89,6 +91,16 @@ public ResultSummary extractSummary( Statement statement, Connection connection,
8991
extractNotifications( metadata ), resultAvailableAfter, extractResultConsumedAfter( metadata, resultConsumedAfterMetadataKey ) );
9092
}
9193

94+
public Bookmarks extractBookmarks( Map<String,Value> metadata )
95+
{
96+
Value bookmarkValue = metadata.get( "bookmark" );
97+
if ( bookmarkValue != null && !bookmarkValue.isNull() && bookmarkValue.hasType( TYPE_SYSTEM.STRING() ) )
98+
{
99+
return Bookmarks.from( bookmarkValue.asString() );
100+
}
101+
return Bookmarks.empty();
102+
}
103+
92104
private static StatementType extractStatementType( Map<String,Value> metadata )
93105
{
94106
Value typeValue = metadata.get( "type" );

driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,8 @@ private StatementResult createResult( int numberOfRecords )
356356
Connection connection = mock( Connection.class );
357357
when( connection.serverAddress() ).thenReturn( LOCAL_DEFAULT );
358358
when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 );
359-
PullAllResponseHandler pullAllHandler = new SessionPullAllResponseHandler( statement, runHandler, connection, METADATA_EXTRACTOR );
359+
PullAllResponseHandler pullAllHandler =
360+
new SessionPullAllResponseHandler( statement, runHandler, connection, BookmarksHolder.NO_OP, METADATA_EXTRACTOR );
360361

361362
for ( int i = 1; i <= numberOfRecords; i++ )
362363
{

driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,37 @@ void shouldAllowStartingTransactionAfterCurrentOneIsClosed()
652652
assertNotNull( session.beginTransaction() );
653653
}
654654

655+
@Test
656+
void shouldAllowToGetAndSetBookmarks()
657+
{
658+
NetworkSession session = newSession( connectionProvider, READ );
659+
assertEquals( Bookmarks.empty(), session.getBookmarks() );
660+
661+
session.setBookmarks( null );
662+
assertEquals( Bookmarks.empty(), session.getBookmarks() );
663+
664+
session.setBookmarks( Bookmarks.empty() );
665+
assertEquals( Bookmarks.empty(), session.getBookmarks() );
666+
667+
Bookmarks bookmarks1 = Bookmarks.from( "neo4j:bookmark:v1:tx1" );
668+
session.setBookmarks( bookmarks1 );
669+
assertEquals( bookmarks1, session.getBookmarks() );
670+
671+
session.setBookmarks( null );
672+
assertEquals( bookmarks1, session.getBookmarks() );
673+
674+
session.setBookmarks( Bookmarks.empty() );
675+
assertEquals( bookmarks1, session.getBookmarks() );
676+
677+
Bookmarks bookmarks2 = Bookmarks.from( "neo4j:bookmark:v1:tx2" );
678+
session.setBookmarks( bookmarks2 );
679+
assertEquals( bookmarks2, session.getBookmarks() );
680+
681+
Bookmarks bookmarks3 = Bookmarks.from( "neo4j:bookmark:v1:tx42" );
682+
session.setBookmarks( bookmarks3 );
683+
assertEquals( bookmarks3, session.getBookmarks() );
684+
}
685+
655686
private void testConnectionAcquisition( AccessMode sessionMode, AccessMode transactionMode )
656687
{
657688
NetworkSession session = newSession( connectionProvider, sessionMode );

0 commit comments

Comments
 (0)