Skip to content

Commit e891ed9

Browse files
authored
Fixed SSL handling (#851)
This update fixes a number of SSL-related tests in testkit and CausalClusteringIT.shouldDropBrokenOldConnections test. The connection pooling strategy has been updated to use the same connection pool when the connection host is unambiguous. Removed hardcoded domain name resolution from the BoltServerAddress and moved the logic to ChannelConnectorImpl that uses the DomainNameResolver.
1 parent b98b697 commit e891ed9

File tree

10 files changed

+197
-83
lines changed

10 files changed

+197
-83
lines changed

driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,9 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21-
import java.net.InetAddress;
22-
import java.net.InetSocketAddress;
23-
import java.net.SocketAddress;
2421
import java.net.URI;
25-
import java.util.Collections;
26-
import java.util.LinkedHashSet;
2722
import java.util.Objects;
28-
import java.util.Set;
23+
import java.util.stream.Stream;
2924

3025
import org.neo4j.driver.net.ServerAddress;
3126

@@ -39,11 +34,10 @@ public class BoltServerAddress implements ServerAddress
3934
public static final int DEFAULT_PORT = 7687;
4035
public static final BoltServerAddress LOCAL_DEFAULT = new BoltServerAddress( "localhost", DEFAULT_PORT );
4136

42-
private final String host; // This could either be the same as originalHost or it is an IP address resolved from the original host.
43-
private final int port;
37+
protected final String host; // Host or IP address.
38+
private final String connectionHost; // Either is equal to the host or is explicitly provided on creation and is expected to be a resolved IP address.
39+
protected final int port;
4440
private final String stringValue;
45-
46-
private final Set<BoltServerAddress> resolved;
4741

4842
public BoltServerAddress( String address )
4943
{
@@ -57,15 +51,17 @@ public BoltServerAddress( URI uri )
5751

5852
public BoltServerAddress( String host, int port )
5953
{
60-
this( host, port, Collections.emptySet() );
54+
this( host, host, port );
6155
}
6256

63-
public BoltServerAddress( String host, int port, Set<BoltServerAddress> resolved )
57+
public BoltServerAddress( String host, String connectionHost, int port )
6458
{
6559
this.host = requireNonNull( host, "host" );
60+
this.connectionHost = requireNonNull( connectionHost, "connectionHost" );
6661
this.port = requireValidPort( port );
67-
this.stringValue = String.format( "%s:%d", host, port );
68-
this.resolved = Collections.unmodifiableSet( new LinkedHashSet<>( resolved ) );
62+
this.stringValue = host.equals( connectionHost )
63+
? String.format( "%s:%d", host, port )
64+
: String.format( "%s(%s):%d", host, connectionHost, port );
6965
}
7066

7167
public static BoltServerAddress from( ServerAddress address )
@@ -86,14 +82,14 @@ public boolean equals( Object o )
8682
{
8783
return false;
8884
}
89-
BoltServerAddress that = (BoltServerAddress) o;
90-
return port == that.port && host.equals( that.host );
85+
BoltServerAddress address = (BoltServerAddress) o;
86+
return port == address.port && host.equals( address.host ) && connectionHost.equals( address.connectionHost );
9187
}
9288

9389
@Override
9490
public int hashCode()
9591
{
96-
return Objects.hash( host, port );
92+
return Objects.hash( host, connectionHost, port );
9793
}
9894

9995
@Override
@@ -102,18 +98,6 @@ public String toString()
10298
return stringValue;
10399
}
104100

105-
/**
106-
* Create a {@link SocketAddress} from this bolt address. This method always attempts to resolve the hostname into
107-
* an {@link InetAddress}.
108-
*
109-
* @return new socket address.
110-
* @see InetSocketAddress
111-
*/
112-
public SocketAddress toSocketAddress()
113-
{
114-
return new InetSocketAddress( host, port );
115-
}
116-
117101
@Override
118102
public String host()
119103
{
@@ -126,9 +110,21 @@ public int port()
126110
return port;
127111
}
128112

129-
public Set<BoltServerAddress> resolved()
113+
public String connectionHost()
114+
{
115+
return connectionHost;
116+
}
117+
118+
/**
119+
* Create a stream of unicast addresses.
120+
* <p>
121+
* While this implementation just returns a stream of itself, the subclasses may provide multiple addresses.
122+
*
123+
* @return stream of unicast addresses.
124+
*/
125+
public Stream<BoltServerAddress> unicastStream()
130126
{
131-
return this.resolved;
127+
return Stream.of( this );
132128
}
133129

134130
private static String hostFrom( URI uri )
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal;
20+
21+
import java.net.InetAddress;
22+
import java.util.Arrays;
23+
import java.util.Collections;
24+
import java.util.LinkedHashSet;
25+
import java.util.Objects;
26+
import java.util.Set;
27+
import java.util.stream.Stream;
28+
29+
import static java.util.Objects.requireNonNull;
30+
import static java.util.stream.Collectors.joining;
31+
32+
/**
33+
* An explicitly resolved version of {@link BoltServerAddress} that always contains one or more resolved IP addresses.
34+
*/
35+
public class ResolvedBoltServerAddress extends BoltServerAddress
36+
{
37+
private static final String HOST_ADDRESSES_FORMAT = "%s%s:%d";
38+
private static final int MAX_HOST_ADDRESSES_IN_STRING_VALUE = 5;
39+
private static final String HOST_ADDRESS_DELIMITER = ",";
40+
private static final String HOST_ADDRESSES_PREFIX = "(";
41+
private static final String HOST_ADDRESSES_SUFFIX = ")";
42+
private static final String TRIMMED_HOST_ADDRESSES_SUFFIX = ",..." + HOST_ADDRESSES_SUFFIX;
43+
44+
private final Set<InetAddress> resolvedAddresses;
45+
private final String stringValue;
46+
47+
public ResolvedBoltServerAddress( String host, int port, InetAddress[] resolvedAddressesArr )
48+
{
49+
super( host, port );
50+
requireNonNull( resolvedAddressesArr, "resolvedAddressesArr" );
51+
if ( resolvedAddressesArr.length == 0 )
52+
{
53+
throw new IllegalArgumentException(
54+
"The resolvedAddressesArr must not be empty, check your DomainNameResolver is compliant with the interface contract" );
55+
}
56+
resolvedAddresses = Collections.unmodifiableSet( new LinkedHashSet<>( Arrays.asList( resolvedAddressesArr ) ) );
57+
stringValue = createStringRepresentation();
58+
}
59+
60+
/**
61+
* Create a stream of unicast addresses.
62+
* <p>
63+
* The stream is created from the list of resolved IP addresses. Each unicast address is given a unique IP address as the connectionHost value.
64+
*
65+
* @return stream of unicast addresses.
66+
*/
67+
@Override
68+
public Stream<BoltServerAddress> unicastStream()
69+
{
70+
return resolvedAddresses.stream().map( address -> new BoltServerAddress( host, address.getHostAddress(), port ) );
71+
}
72+
73+
@Override
74+
public boolean equals( Object o )
75+
{
76+
if ( this == o )
77+
{
78+
return true;
79+
}
80+
if ( o == null || getClass() != o.getClass() )
81+
{
82+
return false;
83+
}
84+
if ( !super.equals( o ) )
85+
{
86+
return false;
87+
}
88+
ResolvedBoltServerAddress that = (ResolvedBoltServerAddress) o;
89+
return resolvedAddresses.equals( that.resolvedAddresses );
90+
}
91+
92+
@Override
93+
public int hashCode()
94+
{
95+
return Objects.hash( super.hashCode(), resolvedAddresses );
96+
}
97+
98+
@Override
99+
public String toString()
100+
{
101+
return stringValue;
102+
}
103+
104+
private String createStringRepresentation()
105+
{
106+
String hostAddresses = resolvedAddresses.stream()
107+
.limit( MAX_HOST_ADDRESSES_IN_STRING_VALUE )
108+
.map( InetAddress::getHostAddress )
109+
.collect( joining( HOST_ADDRESS_DELIMITER, HOST_ADDRESSES_PREFIX,
110+
resolvedAddresses.size() > MAX_HOST_ADDRESSES_IN_STRING_VALUE
111+
? TRIMMED_HOST_ADDRESSES_SUFFIX
112+
: HOST_ADDRESSES_SUFFIX ) );
113+
return String.format( HOST_ADDRESSES_FORMAT, host, hostAddresses, port );
114+
}
115+
}

driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectorImpl.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.netty.resolver.AddressResolverGroup;
2828

2929
import java.net.InetSocketAddress;
30+
import java.net.SocketAddress;
3031

3132
import org.neo4j.driver.AuthToken;
3233
import org.neo4j.driver.AuthTokens;
@@ -53,6 +54,7 @@ public class ChannelConnectorImpl implements ChannelConnector
5354
private final int connectTimeoutMillis;
5455
private final Logging logging;
5556
private final Clock clock;
57+
private final DomainNameResolver domainNameResolver;
5658
private final AddressResolverGroup<InetSocketAddress> addressResolverGroup;
5759

5860
public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan, Logging logging,
@@ -73,7 +75,8 @@ public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan
7375
this.pipelineBuilder = pipelineBuilder;
7476
this.logging = requireNonNull( logging );
7577
this.clock = requireNonNull( clock );
76-
this.addressResolverGroup = new NettyDomainNameResolverGroup( requireNonNull( domainNameResolver ) );
78+
this.domainNameResolver = requireNonNull( domainNameResolver );
79+
this.addressResolverGroup = new NettyDomainNameResolverGroup( this.domainNameResolver );
7780
}
7881

7982
@Override
@@ -83,7 +86,17 @@ public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap )
8386
bootstrap.handler( new NettyChannelInitializer( address, securityPlan, connectTimeoutMillis, clock, logging ) );
8487
bootstrap.resolver( addressResolverGroup );
8588

86-
ChannelFuture channelConnected = bootstrap.connect( address.toSocketAddress() );
89+
SocketAddress socketAddress;
90+
try
91+
{
92+
socketAddress = new InetSocketAddress( domainNameResolver.resolve( address.connectionHost() )[0], address.port() );
93+
}
94+
catch ( Throwable t )
95+
{
96+
socketAddress = InetSocketAddress.createUnresolved( address.connectionHost(), address.port() );
97+
}
98+
99+
ChannelFuture channelConnected = bootstrap.connect( socketAddress );
87100

88101
Channel channel = channelConnected.channel();
89102
ChannelPromise handshakeCompleted = channel.newPromise();

driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,15 @@
2121
import io.netty.util.concurrent.EventExecutorGroup;
2222

2323
import java.net.UnknownHostException;
24-
import java.util.Arrays;
2524
import java.util.Collection;
2625
import java.util.HashSet;
27-
import java.util.LinkedHashSet;
2826
import java.util.LinkedList;
2927
import java.util.List;
3028
import java.util.Set;
3129
import java.util.concurrent.CompletableFuture;
3230
import java.util.concurrent.CompletionException;
3331
import java.util.concurrent.CompletionStage;
3432
import java.util.concurrent.TimeUnit;
35-
import java.util.stream.Collectors;
3633

3734
import org.neo4j.driver.Bookmark;
3835
import org.neo4j.driver.Logger;
@@ -42,6 +39,7 @@
4239
import org.neo4j.driver.exceptions.ServiceUnavailableException;
4340
import org.neo4j.driver.internal.BoltServerAddress;
4441
import org.neo4j.driver.internal.DomainNameResolver;
42+
import org.neo4j.driver.internal.ResolvedBoltServerAddress;
4543
import org.neo4j.driver.internal.spi.ConnectionPool;
4644
import org.neo4j.driver.internal.util.Futures;
4745
import org.neo4j.driver.net.ServerAddress;
@@ -308,7 +306,7 @@ public List<BoltServerAddress> resolve() throws UnknownHostException
308306
{
309307
try
310308
{
311-
resolvedAddresses.addAll( resolveAllByDomainName( BoltServerAddress.from( serverAddress ) ) );
309+
resolveAllByDomainName( serverAddress ).unicastStream().forEach( resolvedAddresses::add );
312310
}
313311
catch ( UnknownHostException e )
314312
{
@@ -345,21 +343,22 @@ private BoltServerAddress resolveByDomainNameOrThrowCompletionException( BoltSer
345343
{
346344
try
347345
{
348-
Set<BoltServerAddress> resolvedAddresses = resolveAllByDomainName( address );
349-
routingTable.replaceRouterIfPresent( address, new BoltServerAddress( address.host(), address.port(), resolvedAddresses ) );
350-
return resolvedAddresses.stream().findFirst().orElseThrow(
351-
() -> new IllegalStateException( "Domain name resolution returned empty result set and has not thrown an exception" ) );
346+
ResolvedBoltServerAddress resolvedAddress = resolveAllByDomainName( address );
347+
routingTable.replaceRouterIfPresent( address, resolvedAddress );
348+
return resolvedAddress.unicastStream()
349+
.findFirst()
350+
.orElseThrow(
351+
() -> new IllegalStateException(
352+
"Unexpected condition, the ResolvedBoltServerAddress must always have at least one unicast address" ) );
352353
}
353354
catch ( Throwable e )
354355
{
355356
throw new CompletionException( e );
356357
}
357358
}
358359

359-
private Set<BoltServerAddress> resolveAllByDomainName( BoltServerAddress address ) throws UnknownHostException
360+
private ResolvedBoltServerAddress resolveAllByDomainName( ServerAddress address ) throws UnknownHostException
360361
{
361-
return Arrays.stream( domainNameResolver.resolve( address.host() ) )
362-
.map( inetAddress -> new BoltServerAddress( inetAddress.getHostAddress(), address.port() ) )
363-
.collect( Collectors.toCollection( LinkedHashSet::new ) );
362+
return new ResolvedBoltServerAddress( address.host(), address.port(), domainNameResolver.resolve( address.host() ) );
364363
}
365364
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerImpl.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,20 +108,18 @@ else if ( routingTable.isStaleFor( context.mode() ) )
108108
}
109109
}
110110

111-
private synchronized void freshClusterCompositionFetched( ClusterCompositionLookupResult composition )
111+
private synchronized void freshClusterCompositionFetched( ClusterCompositionLookupResult compositionLookupResult )
112112
{
113113
try
114114
{
115-
routingTable.update( composition.getClusterComposition() );
115+
routingTable.update( compositionLookupResult.getClusterComposition() );
116116
routingTableRegistry.removeAged();
117117

118118
Set<BoltServerAddress> addressesToRetain = new LinkedHashSet<>();
119-
for ( BoltServerAddress address : routingTableRegistry.allServers() )
120-
{
121-
addressesToRetain.add( address );
122-
addressesToRetain.addAll( address.resolved() );
123-
}
124-
composition.getResolvedInitialRouters().ifPresent(
119+
routingTableRegistry.allServers().stream()
120+
.flatMap( BoltServerAddress::unicastStream )
121+
.forEach( addressesToRetain::add );
122+
compositionLookupResult.getResolvedInitialRouters().ifPresent(
125123
addresses ->
126124
{
127125
resolvedInitialRouters.clear();

0 commit comments

Comments
 (0)