Skip to content

Include bookmarks in the ROUTE message. #861

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public CompletionStage<RoutingProcedureResponse> run( Connection connection, Dat
CompletableFuture<Map<String,Value>> completableFuture = createCompletableFuture.get();

DirectConnection directConnection = toDirectConnection( connection, databaseName );
directConnection.writeAndFlush( new RouteMessage( routingContext, databaseName.databaseName().orElse( null ) ),
directConnection.writeAndFlush( new RouteMessage( routingContext, bookmark, databaseName.databaseName().orElse( null ) ),
new RouteMessageResponseHandler( completableFuture ) );
return completableFuture
.thenApply( routingTable -> new RoutingProcedureResponse( getQuery( databaseName ), singletonList( toRecord( routingTable ) ) ) )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.neo4j.driver.internal.messaging.encode;

import java.io.IOException;
import java.util.Collections;

import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.messaging.MessageEncoder;
import org.neo4j.driver.internal.messaging.ValuePacker;
import org.neo4j.driver.internal.messaging.request.RouteMessage;

import static org.neo4j.driver.Values.value;
import static org.neo4j.driver.internal.util.Preconditions.checkArgument;

/**
Expand All @@ -37,8 +39,9 @@ public void encode( Message message, ValuePacker packer ) throws IOException
{
checkArgument( message, RouteMessage.class );
RouteMessage routeMessage = (RouteMessage) message;
packer.packStructHeader( 2, message.signature() );
packer.packStructHeader( 3, message.signature() );
packer.pack( routeMessage.getRoutingContext() );
packer.pack( routeMessage.getBookmark().isPresent() ? value( routeMessage.getBookmark().get().values() ) : value( Collections.emptyList() ) );
packer.pack( routeMessage.getDatabaseName() );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Value;
import org.neo4j.driver.internal.messaging.Message;

Expand All @@ -36,17 +38,20 @@ public class RouteMessage implements Message
{
public final static byte SIGNATURE = 0x66;
private final Map<String,Value> routingContext;
private final Bookmark bookmark;
private final String databaseName;

/**
* Constructor
*
* @param routingContext The routing context used to define the routing table. Multi-datacenter deployments is one of its use cases.
* @param bookmark The bookmark used when getting the routing table.
* @param databaseName The name of the database to get the routing table for.
*/
public RouteMessage( Map<String,Value> routingContext, String databaseName )
public RouteMessage( Map<String,Value> routingContext, Bookmark bookmark, String databaseName )
{
this.routingContext = unmodifiableMap( routingContext );
this.bookmark = bookmark;
this.databaseName = databaseName;
}

Expand All @@ -55,6 +60,11 @@ public Map<String,Value> getRoutingContext()
return routingContext;
}

public Optional<Bookmark> getBookmark()
{
return Optional.ofNullable( bookmark );
}

public String getDatabaseName()
{
return databaseName;
Expand All @@ -69,7 +79,7 @@ public byte signature()
@Override
public String toString()
{
return String.format( "ROUTE %s %s", routingContext, databaseName );
return String.format( "ROUTE %s %s %s", routingContext, bookmark, databaseName );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Record;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
Expand Down Expand Up @@ -90,7 +91,7 @@ void shouldRequestRoutingTableForAllValidInputScenarios( RoutingContext routingC
assertEquals( routingTable.get( "ttl" ), record.get( "ttl" ) );
assertEquals( routingTable.get( "servers" ), record.get( "servers" ) );

verifyMessageWasWrittenAndFlushed( connection, completableFuture, routingContext, databaseName );
verifyMessageWasWrittenAndFlushed( connection, completableFuture, routingContext, null, databaseName );
verify( connection ).release();
}

Expand All @@ -113,19 +114,19 @@ void shouldReturnFailureWhenSomethingHappensGettingTheRoutingTable()
assertEquals( reason, response.error() );
assertThrows( IllegalStateException.class, () -> response.records().size() );

verifyMessageWasWrittenAndFlushed( connection, completableFuture, RoutingContext.EMPTY, DatabaseNameUtil.defaultDatabase() );
verifyMessageWasWrittenAndFlushed( connection, completableFuture, RoutingContext.EMPTY, null, DatabaseNameUtil.defaultDatabase() );
verify( connection ).release();
}

private void verifyMessageWasWrittenAndFlushed( Connection connection, CompletableFuture<Map<String,Value>> completableFuture,
RoutingContext routingContext, DatabaseName databaseName )
RoutingContext routingContext, Bookmark bookmark, DatabaseName databaseName )
{
Map<String,Value> context = routingContext.toMap()
.entrySet()
.stream()
.collect( Collectors.toMap( Map.Entry::getKey, entry -> Values.value( entry.getValue() ) ) );

verify( connection ).writeAndFlush( eq( new RouteMessage( context, databaseName.databaseName().orElse( null ) ) ),
verify( connection ).writeAndFlush( eq( new RouteMessage( context, bookmark, databaseName.databaseName().orElse( null ) ) ),
eq( new RouteMessageResponseHandler( completableFuture ) ) );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@
import java.util.HashMap;
import java.util.Map;

import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
import org.neo4j.driver.internal.InternalBookmark;
import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.messaging.ValuePacker;
import org.neo4j.driver.internal.messaging.request.RouteMessage;

import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.neo4j.driver.Values.value;

class RouteMessageEncoderTest
{
Expand All @@ -51,12 +54,31 @@ void shouldEncodeRouteMessage(String databaseName) throws IOException
{
Map<String, Value> routingContext = getRoutingContext();

encoder.encode( new RouteMessage( getRoutingContext(), databaseName ), packer );
encoder.encode( new RouteMessage( getRoutingContext(), null, databaseName ), packer );

InOrder inOrder = inOrder( packer );

inOrder.verify( packer ).packStructHeader( 2, (byte) 0x66 );
inOrder.verify( packer ).packStructHeader( 3, (byte) 0x66 );
inOrder.verify( packer ).pack( routingContext );
inOrder.verify( packer ).pack( value( emptyList() ) );
inOrder.verify( packer ).pack( databaseName );
}

@ParameterizedTest
@ValueSource(strings = { "neo4j"})
@NullSource
void shouldEncodeRouteMessageWithBookmark(String databaseName) throws IOException
{
Map<String, Value> routingContext = getRoutingContext();
Bookmark bookmark = InternalBookmark.parse( "somebookmark" );

encoder.encode( new RouteMessage( getRoutingContext(), bookmark, databaseName ), packer );

InOrder inOrder = inOrder( packer );

inOrder.verify( packer ).packStructHeader( 3, (byte) 0x66 );
inOrder.verify( packer ).pack( routingContext );
inOrder.verify( packer ).pack( value( bookmark.values() ) );
inOrder.verify( packer ).pack( databaseName );
}

Expand All @@ -70,8 +92,7 @@ void shouldThrowIllegalArgumentIfMessageIsNotRouteMessage()

private Map<String,Value> getRoutingContext() {
Map<String, Value> routingContext = new HashMap<>();
routingContext.put( "ip", Values.value( "127.0.0.1" ) );
routingContext.put( "ip", value( "127.0.0.1" ) );
return routingContext;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,6 @@ private RouteMessage routeMessage()
{
Map<String,Value> routeContext = new HashMap<>();
routeContext.put( "someContext", Values.value( 124 ) );
return new RouteMessage( routeContext, "dbName" );
return new RouteMessage( routeContext, InternalBookmark.empty(), "dbName" );
}
}