Skip to content

Keep bookmark returned after auto-commit tx #527

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal;

public interface BookmarksHolder
{
Bookmarks getBookmarks();

void setBookmarks( Bookmarks bookmarks );

BookmarksHolder NO_OP = new BookmarksHolder()
{
@Override
public Bookmarks getBookmarks()
{
return Bookmarks.empty();
}

@Override
public void setBookmarks( Bookmarks bookmarks )
{
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ private enum State
private final NetworkSession session;
private final ResultCursorsHolder resultCursors;

private volatile Bookmarks bookmarks = Bookmarks.empty();
private volatile State state = State.ACTIVE;

public ExplicitTransaction( Connection connection, NetworkSession session )
Expand Down Expand Up @@ -229,19 +228,6 @@ public void markTerminated()
state = State.TERMINATED;
}

public Bookmarks bookmark()
{
return bookmarks;
}

public void setBookmarks( Bookmarks bookmarks )
{
if ( bookmarks != null && !bookmarks.isEmpty() )
{
this.bookmarks = bookmarks;
}
}

private CompletionStage<Void> doCommitAsync()
{
if ( state == State.TERMINATED )
Expand All @@ -252,7 +238,7 @@ private CompletionStage<Void> doCommitAsync()
return protocol.commitTransaction( connection )
.thenApply( newBookmarks ->
{
setBookmarks( newBookmarks );
session.setBookmarks( newBookmarks );
return null;
} );
}
Expand Down Expand Up @@ -283,7 +269,6 @@ private void transactionClosed( State newState )
{
state = newState;
connection.release(); // release in background
session.setBookmarks( bookmarks );
}

private void terminateConnectionOnThreadInterrupt( String reason )
Expand Down
13 changes: 10 additions & 3 deletions driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
import static org.neo4j.driver.internal.util.Futures.failedFuture;

public class NetworkSession extends AbstractStatementRunner implements Session
public class NetworkSession extends AbstractStatementRunner implements Session, BookmarksHolder
{
private static final String LOG_NAME = "Session";

Expand Down Expand Up @@ -247,7 +247,14 @@ public <T> CompletionStage<T> writeTransactionAsync( TransactionWork<CompletionS
return transactionAsync( AccessMode.WRITE, work, config );
}

void setBookmarks( Bookmarks bookmarks )
@Override
public Bookmarks getBookmarks()
{
return bookmarks;
}

@Override
public void setBookmarks( Bookmarks bookmarks )
{
if ( bookmarks != null && !bookmarks.isEmpty() )
{
Expand Down Expand Up @@ -439,7 +446,7 @@ private CompletionStage<InternalStatementResultCursor> run( Statement statement,
CompletionStage<InternalStatementResultCursor> newResultCursorStage = ensureNoOpenTxBeforeRunningQuery()
.thenCompose( ignore -> acquireConnection( mode ) )
.thenCompose( connection ->
connection.protocol().runInAutoCommitTransaction( connection, statement, bookmarks, config, waitForRunResponse ) );
connection.protocol().runInAutoCommitTransaction( connection, statement, this, config, waitForRunResponse ) );

resultCursorStage = newResultCursorStage.exceptionally( error -> null );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.internal.Bookmarks;
import org.neo4j.driver.internal.BookmarksHolder;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.ServerVersion;
Expand Down Expand Up @@ -62,7 +62,7 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
CompletionStage<List<Record>> runProcedure( Connection connection, Statement procedure )
{
return connection.protocol()
.runInAutoCommitTransaction( connection, procedure, Bookmarks.empty(), TransactionConfig.empty(), true )
.runInAutoCommitTransaction( connection, procedure, BookmarksHolder.NO_OP, TransactionConfig.empty(), true )
.thenCompose( StatementResultCursor::listAsync );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public abstract class PullAllResponseHandler implements ResponseHandler

private final Statement statement;
private final RunResponseHandler runResponseHandler;
private final MetadataExtractor metadataExtractor;
protected final MetadataExtractor metadataExtractor;
protected final Connection connection;

// initialized lazily when first record arrives
Expand Down Expand Up @@ -81,13 +81,13 @@ public synchronized void onSuccess( Map<String,Value> metadata )
finished = true;
summary = extractResultSummary( metadata );

afterSuccess();
afterSuccess( metadata );

completeRecordFuture( null );
completeFailureFuture( null );
}

protected abstract void afterSuccess();
protected abstract void afterSuccess( Map<String,Value> metadata );

@Override
public synchronized void onFailure( Throwable error )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,32 @@
*/
package org.neo4j.driver.internal.handlers;

import java.util.Map;

import org.neo4j.driver.internal.BookmarksHolder;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.MetadataExtractor;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.Value;

import static java.util.Objects.requireNonNull;

public class SessionPullAllResponseHandler extends PullAllResponseHandler
{
private final BookmarksHolder bookmarksHolder;

public SessionPullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler,
Connection connection, MetadataExtractor metadataExtractor )
Connection connection, BookmarksHolder bookmarksHolder, MetadataExtractor metadataExtractor )
{
super( statement, runResponseHandler, connection, metadataExtractor );
this.bookmarksHolder = requireNonNull( bookmarksHolder );
}

@Override
protected void afterSuccess()
protected void afterSuccess( Map<String,Value> metadata )
{
releaseConnection();
bookmarksHolder.setBookmarks( metadataExtractor.extractBookmarks( metadata ) );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
*/
package org.neo4j.driver.internal.handlers;

import java.util.Map;

import org.neo4j.driver.internal.ExplicitTransaction;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.MetadataExtractor;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.Value;

import static java.util.Objects.requireNonNull;

Expand All @@ -37,7 +40,7 @@ public TransactionPullAllResponseHandler( Statement statement, RunResponseHandle
}

@Override
protected void afterSuccess()
protected void afterSuccess( Map<String,Value> metadata )
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.internal.Bookmarks;
import org.neo4j.driver.internal.BookmarksHolder;
import org.neo4j.driver.internal.ExplicitTransaction;
import org.neo4j.driver.internal.InternalStatementResultCursor;
import org.neo4j.driver.internal.messaging.v1.BoltProtocolV1;
Expand Down Expand Up @@ -89,15 +90,15 @@ public interface BoltProtocol
*
* @param connection the network connection to use.
* @param statement the cypher to execute.
* @param bookmarks the bookmarks. Never null, should be {@link Bookmarks#empty()} when absent.
* @param bookmarksHolder the bookmarksHolder that keeps track of the current bookmark and can be updated with a new bookmark.
* @param config the transaction config for the implicitly started auto-commit transaction.
* @param waitForRunResponse {@code true} for async query execution and {@code false} for blocking query
* execution. Makes returned cursor stage be chained after the RUN response arrives. Needed to have statement
* keys populated.
* @return stage with cursor.
*/
CompletionStage<InternalStatementResultCursor> runInAutoCommitTransaction( Connection connection, Statement statement,
Bookmarks bookmarks, TransactionConfig config, boolean waitForRunResponse );
BookmarksHolder bookmarksHolder, TransactionConfig config, boolean waitForRunResponse );

/**
* Execute the given statement in a running explicit transaction, i.e. {@link Transaction#run(Statement)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.internal.Bookmarks;
import org.neo4j.driver.internal.BookmarksHolder;
import org.neo4j.driver.internal.ExplicitTransaction;
import org.neo4j.driver.internal.InternalStatementResultCursor;
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
Expand Down Expand Up @@ -142,7 +143,7 @@ public CompletionStage<Void> rollbackTransaction( Connection connection )

@Override
public CompletionStage<InternalStatementResultCursor> runInAutoCommitTransaction( Connection connection, Statement statement,
Bookmarks bookmarks, TransactionConfig config, boolean waitForRunResponse )
BookmarksHolder bookmarksHolder, TransactionConfig config, boolean waitForRunResponse )
{
// bookmarks are ignored for auto-commit transactions in this version of the protocol

Expand Down Expand Up @@ -193,7 +194,7 @@ private static PullAllResponseHandler newPullAllHandler( Statement statement, Ru
{
return new TransactionPullAllResponseHandler( statement, runHandler, connection, tx, METADATA_EXTRACTOR );
}
return new SessionPullAllResponseHandler( statement, runHandler, connection, METADATA_EXTRACTOR );
return new SessionPullAllResponseHandler( statement, runHandler, connection, BookmarksHolder.NO_OP, METADATA_EXTRACTOR );
}

private static <T> CompletionStage<T> txConfigNotSupported()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.internal.Bookmarks;
import org.neo4j.driver.internal.BookmarksHolder;
import org.neo4j.driver.internal.ExplicitTransaction;
import org.neo4j.driver.internal.InternalStatementResultCursor;
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
Expand Down Expand Up @@ -119,28 +120,28 @@ public CompletionStage<Void> rollbackTransaction( Connection connection )

@Override
public CompletionStage<InternalStatementResultCursor> runInAutoCommitTransaction( Connection connection, Statement statement,
Bookmarks bookmarks, TransactionConfig config, boolean waitForRunResponse )
BookmarksHolder bookmarksHolder, TransactionConfig config, boolean waitForRunResponse )
{
return runStatement( connection, statement, null, bookmarks, config, waitForRunResponse );
return runStatement( connection, statement, bookmarksHolder, null, config, waitForRunResponse );
}

@Override
public CompletionStage<InternalStatementResultCursor> runInExplicitTransaction( Connection connection, Statement statement, ExplicitTransaction tx,
boolean waitForRunResponse )
{
return runStatement( connection, statement, tx, Bookmarks.empty(), TransactionConfig.empty(), waitForRunResponse );
return runStatement( connection, statement, BookmarksHolder.NO_OP, tx, TransactionConfig.empty(), waitForRunResponse );
}

private static CompletionStage<InternalStatementResultCursor> runStatement( Connection connection, Statement statement,
ExplicitTransaction tx, Bookmarks bookmarks, TransactionConfig config, boolean waitForRunResponse )
BookmarksHolder bookmarksHolder, ExplicitTransaction tx, TransactionConfig config, boolean waitForRunResponse )
{
String query = statement.text();
Map<String,Value> params = statement.parameters().asMap( ofValue() );

CompletableFuture<Void> runCompletedFuture = new CompletableFuture<>();
Message runMessage = new RunWithMetadataMessage( query, params, bookmarks, config );
Message runMessage = new RunWithMetadataMessage( query, params, bookmarksHolder.getBookmarks(), config );
RunResponseHandler runHandler = new RunResponseHandler( runCompletedFuture, METADATA_EXTRACTOR );
PullAllResponseHandler pullAllHandler = newPullAllHandler( statement, runHandler, connection, tx );
PullAllResponseHandler pullAllHandler = newPullAllHandler( statement, runHandler, connection, bookmarksHolder, tx );

connection.writeAndFlush( runMessage, runHandler, PULL_ALL, pullAllHandler );

Expand All @@ -157,12 +158,12 @@ private static CompletionStage<InternalStatementResultCursor> runStatement( Conn
}

private static PullAllResponseHandler newPullAllHandler( Statement statement, RunResponseHandler runHandler,
Connection connection, ExplicitTransaction tx )
Connection connection, BookmarksHolder bookmarksHolder, ExplicitTransaction tx )
{
if ( tx != null )
{
return new TransactionPullAllResponseHandler( statement, runHandler, connection, tx, METADATA_EXTRACTOR );
}
return new SessionPullAllResponseHandler( statement, runHandler, connection, METADATA_EXTRACTOR );
return new SessionPullAllResponseHandler( statement, runHandler, connection, bookmarksHolder, METADATA_EXTRACTOR );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;

import org.neo4j.driver.internal.Bookmarks;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.summary.InternalNotification;
import org.neo4j.driver.internal.summary.InternalPlan;
Expand All @@ -40,6 +41,7 @@
import org.neo4j.driver.v1.summary.StatementType;

import static java.util.Collections.emptyList;
import static org.neo4j.driver.internal.types.InternalTypeSystem.TYPE_SYSTEM;

public class MetadataExtractor
{
Expand Down Expand Up @@ -89,6 +91,16 @@ public ResultSummary extractSummary( Statement statement, Connection connection,
extractNotifications( metadata ), resultAvailableAfter, extractResultConsumedAfter( metadata, resultConsumedAfterMetadataKey ) );
}

public Bookmarks extractBookmarks( Map<String,Value> metadata )
{
Value bookmarkValue = metadata.get( "bookmark" );
if ( bookmarkValue != null && !bookmarkValue.isNull() && bookmarkValue.hasType( TYPE_SYSTEM.STRING() ) )
{
return Bookmarks.from( bookmarkValue.asString() );
}
return Bookmarks.empty();
}

private static StatementType extractStatementType( Map<String,Value> metadata )
{
Value typeValue = metadata.get( "type" );
Expand Down
Loading