Skip to content

Commit 034cb07

Browse files
4.1 add routing context to hello message (#701)
* Adding routing context into HELLO message so that server can make routing decisions * Minor polishing. * Use AuthToken instead of Map in BoltProtocol * change RoutingContext.isDefined() to ignore address in context Co-authored-by: Michael Simons <[email protected]>
1 parent 004f47a commit 034cb07

39 files changed

+325
-95
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,17 +93,18 @@ public final Driver newInstance ( URI uri, AuthToken authToken, RoutingSettings
9393
RetryLogic retryLogic = createRetryLogic( retrySettings, eventExecutorGroup, config.logging() );
9494

9595
MetricsProvider metricsProvider = createDriverMetrics( config, createClock() );
96-
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, metricsProvider, config, ownsEventLoopGroup );
96+
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, metricsProvider, config,
97+
ownsEventLoopGroup, newRoutingSettings.routingContext() );
9798

9899
return createDriver( uri, securityPlan, address, connectionPool, eventExecutorGroup, newRoutingSettings, retryLogic, metricsProvider, config );
99100
}
100101

101102
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
102-
MetricsProvider metricsProvider, Config config, boolean ownsEventLoopGroup )
103+
MetricsProvider metricsProvider, Config config, boolean ownsEventLoopGroup, RoutingContext routingContext )
103104
{
104105
Clock clock = createClock();
105106
ConnectionSettings settings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
106-
ChannelConnector connector = createConnector( settings, securityPlan, config, clock );
107+
ChannelConnector connector = createConnector( settings, securityPlan, config, clock, routingContext );
107108
PoolSettings poolSettings = new PoolSettings( config.maxConnectionPoolSize(),
108109
config.connectionAcquisitionTimeoutMillis(), config.maxConnectionLifetimeMillis(),
109110
config.idleTimeBeforeConnectionTest()
@@ -124,9 +125,9 @@ protected static MetricsProvider createDriverMetrics( Config config, Clock clock
124125
}
125126

126127
protected ChannelConnector createConnector( ConnectionSettings settings, SecurityPlan securityPlan,
127-
Config config, Clock clock )
128+
Config config, Clock clock, RoutingContext routingContext )
128129
{
129-
return new ChannelConnectorImpl( settings, securityPlan, config.logging(), clock );
130+
return new ChannelConnectorImpl( settings, securityPlan, config.logging(), clock, routingContext );
130131
}
131132

132133
private InternalDriver createDriver( URI uri, SecurityPlan securityPlan, BoltServerAddress address, ConnectionPool connectionPool,

driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectorImpl.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.neo4j.driver.internal.BoltServerAddress;
3131
import org.neo4j.driver.internal.ConnectionSettings;
3232
import org.neo4j.driver.internal.async.inbound.ConnectTimeoutHandler;
33+
import org.neo4j.driver.internal.cluster.RoutingContext;
3334
import org.neo4j.driver.internal.security.InternalAuthToken;
3435
import org.neo4j.driver.internal.security.SecurityPlan;
3536
import org.neo4j.driver.internal.util.Clock;
@@ -44,24 +45,26 @@
4445
public class ChannelConnectorImpl implements ChannelConnector
4546
{
4647
private final String userAgent;
47-
private final Map<String,Value> authToken;
48+
private final AuthToken authToken;
49+
private final RoutingContext routingContext;
4850
private final SecurityPlan securityPlan;
4951
private final ChannelPipelineBuilder pipelineBuilder;
5052
private final int connectTimeoutMillis;
5153
private final Logging logging;
5254
private final Clock clock;
5355

5456
public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan, Logging logging,
55-
Clock clock )
57+
Clock clock, RoutingContext routingContext )
5658
{
57-
this( connectionSettings, securityPlan, new ChannelPipelineBuilderImpl(), logging, clock );
59+
this( connectionSettings, securityPlan, new ChannelPipelineBuilderImpl(), logging, clock, routingContext );
5860
}
5961

6062
public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
61-
ChannelPipelineBuilder pipelineBuilder, Logging logging, Clock clock )
63+
ChannelPipelineBuilder pipelineBuilder, Logging logging, Clock clock, RoutingContext routingContext )
6264
{
6365
this.userAgent = connectionSettings.userAgent();
64-
this.authToken = tokenAsMap( connectionSettings.authToken() );
66+
this.authToken = requireValidAuthToken( connectionSettings.authToken() );
67+
this.routingContext = routingContext;
6568
this.connectTimeoutMillis = connectionSettings.connectTimeoutMillis();
6669
this.securityPlan = requireNonNull( securityPlan );
6770
this.pipelineBuilder = pipelineBuilder;
@@ -113,14 +116,14 @@ private void installHandshakeCompletedListeners( ChannelPromise handshakeComplet
113116

114117
// add listener that sends an INIT message. connection is now fully established. channel pipeline if fully
115118
// set to send/receive messages for a selected protocol version
116-
handshakeCompleted.addListener( new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) );
119+
handshakeCompleted.addListener( new HandshakeCompletedListener( userAgent, authToken, routingContext, connectionInitialized ) );
117120
}
118121

119-
private static Map<String,Value> tokenAsMap( AuthToken token )
122+
private static AuthToken requireValidAuthToken( AuthToken token )
120123
{
121124
if ( token instanceof InternalAuthToken )
122125
{
123-
return ((InternalAuthToken) token).toMap();
126+
return token;
124127
}
125128
else
126129
{

driver/src/main/java/org/neo4j/driver/internal/async/connection/HandshakeCompletedListener.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
import java.util.Map;
2626

27+
import org.neo4j.driver.AuthToken;
28+
import org.neo4j.driver.internal.cluster.RoutingContext;
2729
import org.neo4j.driver.internal.messaging.BoltProtocol;
2830
import org.neo4j.driver.Value;
2931

@@ -32,14 +34,16 @@
3234
public class HandshakeCompletedListener implements ChannelFutureListener
3335
{
3436
private final String userAgent;
35-
private final Map<String,Value> authToken;
37+
private final AuthToken authToken;
38+
private final RoutingContext routingContext;
3639
private final ChannelPromise connectionInitializedPromise;
3740

38-
public HandshakeCompletedListener( String userAgent, Map<String,Value> authToken,
39-
ChannelPromise connectionInitializedPromise )
41+
public HandshakeCompletedListener( String userAgent, AuthToken authToken,
42+
RoutingContext routingContext, ChannelPromise connectionInitializedPromise )
4043
{
4144
this.userAgent = requireNonNull( userAgent );
4245
this.authToken = requireNonNull( authToken );
46+
this.routingContext = routingContext;
4347
this.connectionInitializedPromise = requireNonNull( connectionInitializedPromise );
4448
}
4549

@@ -49,7 +53,7 @@ public void operationComplete( ChannelFuture future )
4953
if ( future.isSuccess() )
5054
{
5155
BoltProtocol protocol = BoltProtocol.forChannel( future.channel() );
52-
protocol.initializeChannel( userAgent, authToken, connectionInitializedPromise );
56+
protocol.initializeChannel( userAgent, authToken, routingContext, connectionInitializedPromise );
5357
}
5458
else
5559
{

driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ BookmarkHolder bookmarkHolder( Bookmark bookmark )
5454
Query procedureQuery(ServerVersion serverVersion, DatabaseName databaseName )
5555
{
5656
HashMap<String,Value> map = new HashMap<>();
57-
map.put( ROUTING_CONTEXT, value( context.asMap() ) );
57+
map.put( ROUTING_CONTEXT, value( context.toMap() ) );
5858
map.put( DATABASE_NAME, value( (Object) databaseName.databaseName().orElse( null ) ) );
5959
return new Query( MULTI_DB_GET_ROUTING_TABLE, value( map ) );
6060
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingContext.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Map;
2424

2525
import org.neo4j.driver.internal.BoltServerAddress;
26+
import org.neo4j.driver.internal.Scheme;
2627

2728
import static java.util.Collections.emptyMap;
2829
import static java.util.Collections.unmodifiableMap;
@@ -33,14 +34,17 @@ public class RoutingContext
3334
private static final String ROUTING_ADDRESS_KEY = "address";
3435

3536
private final Map<String,String> context;
37+
private final boolean isServerRoutingEnabled;
3638

3739
private RoutingContext()
3840
{
41+
this.isServerRoutingEnabled = true;
3942
this.context = emptyMap();
4043
}
4144

4245
public RoutingContext( URI uri )
4346
{
47+
this.isServerRoutingEnabled = Scheme.isRoutingScheme( uri.getScheme() );
4448
this.context = unmodifiableMap( parseParameters( uri ) );
4549
}
4650

@@ -49,15 +53,20 @@ public boolean isDefined()
4953
return context.size() > 1;
5054
}
5155

52-
public Map<String,String> asMap()
56+
public Map<String,String> toMap()
5357
{
5458
return context;
5559
}
5660

61+
public boolean isServerRoutingEnabled()
62+
{
63+
return isServerRoutingEnabled;
64+
}
65+
5766
@Override
5867
public String toString()
5968
{
60-
return "RoutingContext" + context;
69+
return "RoutingContext" + context + " isServerRoutingEnabled=" + isServerRoutingEnabled;
6170
}
6271

6372
private static Map<String,String> parseParameters( URI uri )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ Query procedureQuery(ServerVersion serverVersion, DatabaseName databaseName )
7676
"Refreshing routing table for multi-databases is not supported in server version lower than 4.0. " +
7777
"Current server version: %s. Database name: '%s'", serverVersion, databaseName.description() ) );
7878
}
79-
return new Query( GET_ROUTING_TABLE, parameters( ROUTING_CONTEXT, context.asMap() ) );
79+
return new Query( GET_ROUTING_TABLE, parameters( ROUTING_CONTEXT, context.toMap() ) );
8080
}
8181

8282
BookmarkHolder bookmarkHolder( Bookmark ignored )

driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Map;
2525
import java.util.concurrent.CompletionStage;
2626

27+
import org.neo4j.driver.AuthToken;
2728
import org.neo4j.driver.Bookmark;
2829
import org.neo4j.driver.Query;
2930
import org.neo4j.driver.Session;
@@ -34,6 +35,7 @@
3435
import org.neo4j.driver.internal.BookmarkHolder;
3536
import org.neo4j.driver.internal.InternalBookmark;
3637
import org.neo4j.driver.internal.async.UnmanagedTransaction;
38+
import org.neo4j.driver.internal.cluster.RoutingContext;
3739
import org.neo4j.driver.internal.cursor.ResultCursorFactory;
3840
import org.neo4j.driver.internal.messaging.v1.BoltProtocolV1;
3941
import org.neo4j.driver.internal.messaging.v2.BoltProtocolV2;
@@ -58,9 +60,10 @@ public interface BoltProtocol
5860
*
5961
* @param userAgent the user agent string.
6062
* @param authToken the authentication token.
63+
* @param routingContext the configured routing context
6164
* @param channelInitializedPromise the promise to be notified when initialization is completed.
6265
*/
63-
void initializeChannel( String userAgent, Map<String,Value> authToken, ChannelPromise channelInitializedPromise );
66+
void initializeChannel( String userAgent, AuthToken authToken, RoutingContext routingContext, ChannelPromise channelInitializedPromise );
6467

6568
/**
6669
* Prepare to close channel before it is closed.

driver/src/main/java/org/neo4j/driver/internal/messaging/request/HelloMessage.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ public class HelloMessage extends MessageWithMetadata
3232
public final static byte SIGNATURE = 0x01;
3333

3434
private static final String USER_AGENT_METADATA_KEY = "user_agent";
35+
private static final String ROUTING_CONTEXT_METADATA_KEY = "routing";
3536

36-
public HelloMessage( String userAgent, Map<String,Value> authToken )
37+
public HelloMessage( String userAgent, Map<String,Value> authToken, Map<String,String> routingContext )
3738
{
38-
super( buildMetadata( userAgent, authToken ) );
39+
super( buildMetadata( userAgent, authToken, routingContext ) );
3940
}
4041

4142
@Override
@@ -73,10 +74,11 @@ public String toString()
7374
return "HELLO " + metadataCopy;
7475
}
7576

76-
private static Map<String,Value> buildMetadata( String userAgent, Map<String,Value> authToken )
77+
private static Map<String,Value> buildMetadata( String userAgent, Map<String,Value> authToken, Map<String,String> routingContext )
7778
{
7879
Map<String,Value> result = new HashMap<>( authToken );
7980
result.put( USER_AGENT_METADATA_KEY, value( userAgent ) );
81+
result.put( ROUTING_CONTEXT_METADATA_KEY, value( routingContext ) );
8082
return result;
8183
}
8284
}

driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.CompletableFuture;
2727
import java.util.concurrent.CompletionStage;
2828

29+
import org.neo4j.driver.AuthToken;
2930
import org.neo4j.driver.Bookmark;
3031
import org.neo4j.driver.Query;
3132
import org.neo4j.driver.TransactionConfig;
@@ -35,6 +36,7 @@
3536
import org.neo4j.driver.internal.DatabaseName;
3637
import org.neo4j.driver.internal.InternalBookmark;
3738
import org.neo4j.driver.internal.async.UnmanagedTransaction;
39+
import org.neo4j.driver.internal.cluster.RoutingContext;
3840
import org.neo4j.driver.internal.cursor.AsyncResultCursorOnlyFactory;
3941
import org.neo4j.driver.internal.cursor.ResultCursorFactory;
4042
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
@@ -52,6 +54,7 @@
5254
import org.neo4j.driver.internal.messaging.request.InitMessage;
5355
import org.neo4j.driver.internal.messaging.request.PullAllMessage;
5456
import org.neo4j.driver.internal.messaging.request.RunMessage;
57+
import org.neo4j.driver.internal.security.InternalAuthToken;
5558
import org.neo4j.driver.internal.spi.Connection;
5659
import org.neo4j.driver.internal.spi.ResponseHandler;
5760
import org.neo4j.driver.internal.util.Futures;
@@ -84,11 +87,12 @@ public MessageFormat createMessageFormat()
8487
}
8588

8689
@Override
87-
public void initializeChannel( String userAgent, Map<String,Value> authToken, ChannelPromise channelInitializedPromise )
90+
public void initializeChannel( String userAgent, AuthToken authToken, RoutingContext routingContext,
91+
ChannelPromise channelInitializedPromise )
8892
{
8993
Channel channel = channelInitializedPromise.channel();
9094

91-
InitMessage message = new InitMessage( userAgent, authToken );
95+
InitMessage message = new InitMessage( userAgent, ((InternalAuthToken) authToken).toMap() );
9296
InitResponseHandler handler = new InitResponseHandler( channelInitializedPromise );
9397

9498
messageDispatcher( channel ).enqueue( handler );

driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@
2525
import java.util.concurrent.CompletableFuture;
2626
import java.util.concurrent.CompletionStage;
2727

28+
import org.neo4j.driver.AuthToken;
2829
import org.neo4j.driver.Bookmark;
2930
import org.neo4j.driver.Query;
3031
import org.neo4j.driver.TransactionConfig;
3132
import org.neo4j.driver.Value;
3233
import org.neo4j.driver.internal.BookmarkHolder;
3334
import org.neo4j.driver.internal.DatabaseName;
34-
import org.neo4j.driver.internal.InternalBookmark;
3535
import org.neo4j.driver.internal.async.UnmanagedTransaction;
36+
import org.neo4j.driver.internal.cluster.RoutingContext;
3637
import org.neo4j.driver.internal.cursor.AsyncResultCursorOnlyFactory;
3738
import org.neo4j.driver.internal.cursor.ResultCursorFactory;
3839
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
@@ -49,6 +50,7 @@
4950
import org.neo4j.driver.internal.messaging.request.GoodbyeMessage;
5051
import org.neo4j.driver.internal.messaging.request.HelloMessage;
5152
import org.neo4j.driver.internal.messaging.request.RunWithMetadataMessage;
53+
import org.neo4j.driver.internal.security.InternalAuthToken;
5254
import org.neo4j.driver.internal.spi.Connection;
5355
import org.neo4j.driver.internal.util.Futures;
5456
import org.neo4j.driver.internal.util.MetadataExtractor;
@@ -76,11 +78,20 @@ public MessageFormat createMessageFormat()
7678
}
7779

7880
@Override
79-
public void initializeChannel( String userAgent, Map<String,Value> authToken, ChannelPromise channelInitializedPromise )
81+
public void initializeChannel( String userAgent, AuthToken authToken, RoutingContext routingContext, ChannelPromise channelInitializedPromise )
8082
{
8183
Channel channel = channelInitializedPromise.channel();
84+
HelloMessage message;
85+
86+
if ( routingContext.isServerRoutingEnabled() )
87+
{
88+
message = new HelloMessage( userAgent, ( ( InternalAuthToken ) authToken ).toMap(), routingContext.toMap() );
89+
}
90+
else
91+
{
92+
message = new HelloMessage( userAgent, ( ( InternalAuthToken ) authToken ).toMap(), null );
93+
}
8294

83-
HelloMessage message = new HelloMessage( userAgent, authToken );
8495
HelloResponseHandler handler = new HelloResponseHandler( channelInitializedPromise, version() );
8596

8697
messageDispatcher( channel ).enqueue( handler );

driver/src/test/java/org/neo4j/driver/integration/ChannelConnectorImplIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.neo4j.driver.internal.async.connection.ChannelConnector;
4747
import org.neo4j.driver.internal.async.connection.ChannelConnectorImpl;
4848
import org.neo4j.driver.internal.async.inbound.ConnectTimeoutHandler;
49+
import org.neo4j.driver.internal.cluster.RoutingContext;
4950
import org.neo4j.driver.internal.security.SecurityPlanImpl;
5051
import org.neo4j.driver.internal.security.SecurityPlan;
5152
import org.neo4j.driver.internal.util.FakeClock;
@@ -231,7 +232,7 @@ private ChannelConnectorImpl newConnector( AuthToken authToken, SecurityPlan sec
231232
int connectTimeoutMillis )
232233
{
233234
ConnectionSettings settings = new ConnectionSettings( authToken, connectTimeoutMillis );
234-
return new ChannelConnectorImpl( settings, securityPlan, DEV_NULL_LOGGING, new FakeClock() );
235+
return new ChannelConnectorImpl( settings, securityPlan, DEV_NULL_LOGGING, new FakeClock(), RoutingContext.EMPTY );
235236
}
236237

237238
private static SecurityPlan trustAllCertificates() throws GeneralSecurityException

driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.neo4j.driver.internal.async.connection.ChannelConnector;
5050
import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
5151
import org.neo4j.driver.internal.async.pool.PoolSettings;
52+
import org.neo4j.driver.internal.cluster.RoutingContext;
5253
import org.neo4j.driver.internal.cluster.RoutingSettings;
5354
import org.neo4j.driver.internal.metrics.MetricsProvider;
5455
import org.neo4j.driver.internal.retry.RetrySettings;
@@ -445,14 +446,15 @@ private static class DriverFactoryWithConnectionPool extends DriverFactory
445446

446447
@Override
447448
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
448-
MetricsProvider ignored, Config config, boolean ownsEventLoopGroup )
449+
MetricsProvider ignored, Config config, boolean ownsEventLoopGroup,
450+
RoutingContext routingContext )
449451
{
450452
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, 1000 );
451453
PoolSettings poolSettings = new PoolSettings( config.maxConnectionPoolSize(),
452454
config.connectionAcquisitionTimeoutMillis(), config.maxConnectionLifetimeMillis(),
453455
config.idleTimeBeforeConnectionTest() );
454456
Clock clock = createClock();
455-
ChannelConnector connector = super.createConnector( connectionSettings, securityPlan, config, clock );
457+
ChannelConnector connector = super.createConnector( connectionSettings, securityPlan, config, clock, routingContext );
456458
connectionPool = new MemorizingConnectionPool( connector, bootstrap, poolSettings, config.logging(), clock, ownsEventLoopGroup );
457459
return connectionPool;
458460
}

0 commit comments

Comments
 (0)