Skip to content

Commit 8ba0c41

Browse files
author
Zhen Li
committed
Fix after review
1 parent 8a0fed5 commit 8ba0c41

File tree

14 files changed

+28
-59
lines changed

14 files changed

+28
-59
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
8686
RetryLogic retryLogic = createRetryLogic( retrySettings, eventExecutorGroup, config.logging() );
8787

8888
InternalAbstractMetrics metrics = createDriverMetrics( config );
89-
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, eventExecutorGroup, metrics, config );
89+
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, metrics, config );
9090

9191
InternalDriver driver = createDriver( uri, securityPlan, address, connectionPool, eventExecutorGroup, newRoutingSettings, retryLogic, metrics, config );
9292

@@ -95,8 +95,7 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
9595
return driver;
9696
}
9797

98-
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, EventExecutorGroup eventExecutorGroup,
99-
MetricsListener metrics, Config config )
98+
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
10099
{
101100
Clock clock = createClock();
102101
ConnectionSettings settings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
@@ -105,7 +104,7 @@ protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan
105104
config.connectionAcquisitionTimeoutMillis(), config.maxConnectionLifetimeMillis(),
106105
config.idleTimeBeforeConnectionTest()
107106
);
108-
return new ConnectionPoolImpl( connector, bootstrap, eventExecutorGroup, poolSettings, metrics, config.logging(), clock );
107+
return new ConnectionPoolImpl( connector, bootstrap, poolSettings, metrics, config.logging(), clock );
109108
}
110109

111110
protected static InternalAbstractMetrics createDriverMetrics( Config config )

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.EventLoopGroup;
2424
import io.netty.channel.pool.ChannelPool;
25-
import io.netty.util.concurrent.EventExecutorGroup;
2625
import io.netty.util.concurrent.Future;
2726

2827
import java.util.Map;
@@ -61,10 +60,9 @@ public class ConnectionPoolImpl implements ConnectionPool
6160
private final ConcurrentMap<BoltServerAddress,ChannelPool> pools = new ConcurrentHashMap<>();
6261
private final AtomicBoolean closed = new AtomicBoolean();
6362

64-
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, EventExecutorGroup eventExecutorGroup, PoolSettings settings,
65-
MetricsListener metricsListener, Logging logging, Clock clock )
63+
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, MetricsListener metricsListener, Logging logging, Clock clock )
6664
{
67-
this( connector, bootstrap, new NettyChannelTracker( metricsListener, eventExecutorGroup, logging ), settings, metricsListener, logging, clock );
65+
this( connector, bootstrap, new NettyChannelTracker( metricsListener, bootstrap.config().group().next(), logging ), settings, metricsListener, logging, clock );
6866
}
6967

7068
ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker nettyChannelTracker,
@@ -154,13 +152,12 @@ public CompletionStage<Void> close()
154152
{
155153
try
156154
{
155+
nettyChannelTracker.prepareToCloseChannels();
157156
for ( Map.Entry<BoltServerAddress,ChannelPool> entry : pools.entrySet() )
158157
{
159158
BoltServerAddress address = entry.getKey();
160159
ChannelPool pool = entry.getValue();
161-
162160
log.info( "Closing connection pool towards %s", address );
163-
nettyChannelTracker.destructAllChannels();
164161
pool.close();
165162
}
166163

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,6 @@ public void channelClosed( Channel channel )
108108
{
109109
decrementIdle( channel );
110110
metricsListener.afterClosed( serverAddress( channel ) );
111-
112-
allChannels.remove( channel );
113111
}
114112

115113
public int inUseChannelCount( BoltServerAddress address )
@@ -124,14 +122,14 @@ public int idleChannelCount( BoltServerAddress address )
124122
return count == null ? 0 : count.get();
125123
}
126124

127-
public void destructAllChannels()
125+
public void prepareToCloseChannels()
128126
{
129127
for ( Channel channel : allChannels )
130128
{
131129
BoltProtocol protocol = BoltProtocol.forChannel( channel );
132130
try
133131
{
134-
protocol.destructChannel( channel );
132+
protocol.prepareToCloseChannel( channel );
135133
}
136134
catch ( Throwable e )
137135
{

driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,10 @@ public interface BoltProtocol
6060
void initializeChannel( String userAgent, Map<String,Value> authToken, ChannelPromise channelInitializedPromise );
6161

6262
/**
63-
* Destruct channel before it is destroyed.
64-
* @param channel the channel to destroy.
63+
* Prepare to close channel before it is closed.
64+
* @param channel the channel to close.
6565
*/
66-
void destructChannel( Channel channel );
66+
void prepareToCloseChannel( Channel channel );
6767

6868
/**
6969
* Begin an explicit transaction.

driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public void initializeChannel( String userAgent, Map<String,Value> authToken, Ch
8989
}
9090

9191
@Override
92-
public void destructChannel( Channel channel )
92+
public void prepareToCloseChannel( Channel channel )
9393
{
9494
// left empty on purpose.
9595
}

driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void initializeChannel( String userAgent, Map<String,Value> authToken, Ch
8686
}
8787

8888
@Override
89-
public void destructChannel( Channel channel )
89+
public void prepareToCloseChannel( Channel channel )
9090
{
9191
GoodbyeMessage message = GoodbyeMessage.GOODBYE;
9292
messageDispatcher( channel ).enqueue( NoOpResponseHandler.INSTANCE );

driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ protected InternalDriver createRoutingDriver( SecurityPlan securityPlan, BoltSer
221221

222222
@Override
223223
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
224-
EventExecutorGroup eventExecutorGroup, MetricsListener metrics, Config config )
224+
MetricsListener metrics, Config config )
225225
{
226226
return connectionPool;
227227
}
@@ -257,7 +257,7 @@ protected SessionFactory createSessionFactory( ConnectionProvider connectionProv
257257

258258
@Override
259259
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
260-
EventExecutorGroup eventExecutorGroup, MetricsListener metrics, Config config )
260+
MetricsListener metrics, Config config )
261261
{
262262
return connectionPoolMock();
263263
}
@@ -280,7 +280,7 @@ protected Bootstrap createBootstrap()
280280

281281
@Override
282282
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
283-
EventExecutorGroup eventExecutorGroup, MetricsListener metrics, Config config )
283+
MetricsListener metrics, Config config )
284284
{
285285
return connectionPoolMock();
286286
}

driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.pool.ChannelPool;
24-
import io.netty.util.concurrent.EventExecutorGroup;
2524
import io.netty.util.concurrent.ImmediateEventExecutor;
2625
import org.junit.jupiter.api.AfterEach;
2726
import org.junit.jupiter.api.BeforeEach;
@@ -206,7 +205,7 @@ private ConnectionPoolImpl newPool() throws Exception
206205
DEV_NULL_LOGGING, clock );
207206
PoolSettings poolSettings = newSettings();
208207
Bootstrap bootstrap = BootstrapFactory.newBootstrap( 1 );
209-
return new ConnectionPoolImpl( connector, bootstrap, mock( EventExecutorGroup.class ), poolSettings, DEV_NULL_METRICS, DEV_NULL_LOGGING, clock );
208+
return new ConnectionPoolImpl( connector, bootstrap, poolSettings, DEV_NULL_METRICS, DEV_NULL_LOGGING, clock );
210209
}
211210

212211
private static PoolSettings newSettings()

driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelTrackerTest.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -193,27 +193,7 @@ void shouldAddChannelToGroupWhenChannelCreated()
193193
}
194194

195195
@Test
196-
void shouldRemoveChannelFromGroupWhenChannelClosed()
197-
{
198-
Channel channel = newChannel();
199-
Channel anotherChannel = newChannel();
200-
ChannelGroup group = mock( ChannelGroup.class );
201-
NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, group, DEV_NULL_LOGGING );
202-
203-
tracker.channelCreated( channel, null );
204-
tracker.channelReleased( channel );
205-
tracker.channelCreated( anotherChannel, null );
206-
tracker.channelReleased( anotherChannel );
207-
208-
tracker.channelClosed( channel );
209-
tracker.channelClosed( anotherChannel );
210-
211-
verify( group ).remove( channel );
212-
verify( group ).remove( anotherChannel );
213-
}
214-
215-
@Test
216-
void shouldDelegateToProtocolDestruct()
196+
void shouldDelegateToProtocolPrepareToClose()
217197
{
218198
EmbeddedChannel channel = newChannelWithProtocolV3();
219199
EmbeddedChannel anotherChannel = newChannelWithProtocolV3();
@@ -222,7 +202,7 @@ void shouldDelegateToProtocolDestruct()
222202

223203
NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, group, DEV_NULL_LOGGING );
224204

225-
tracker.destructAllChannels();
205+
tracker.prepareToCloseChannels();
226206

227207
assertThat( channel.outboundMessages().size(), equalTo( 1 ) );
228208
assertThat( channel.outboundMessages(), hasItem( GoodbyeMessage.GOODBYE ) );

driver/src/test/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3Test.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,9 @@ void shouldInitializeChannel()
134134
}
135135

136136
@Test
137-
void shouldDestructChannel()
137+
void shouldPrepareToCloseChannel()
138138
{
139-
protocol.destructChannel( channel );
139+
protocol.prepareToCloseChannel( channel );
140140

141141
assertThat( channel.outboundMessages(), hasSize( 1 ) );
142142
assertThat( channel.outboundMessages().poll(), instanceOf( GoodbyeMessage.class ) );

driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.channel.Channel;
23-
import io.netty.util.concurrent.EventExecutorGroup;
2423

2524
import java.util.ArrayList;
2625
import java.util.List;
@@ -73,9 +72,9 @@ protected final ChannelConnector createConnector( ConnectionSettings settings, S
7372

7473
@Override
7574
protected final ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
76-
EventExecutorGroup eventExecutorGroup, MetricsListener metrics, Config config )
75+
MetricsListener metrics, Config config )
7776
{
78-
pool = super.createConnectionPool( authToken, securityPlan, bootstrap, eventExecutorGroup, metrics, config );
77+
pool = super.createConnectionPool( authToken, securityPlan, bootstrap, metrics, config );
7978
return pool;
8079
}
8180

driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.neo4j.driver.internal.util;
2020

2121
import io.netty.bootstrap.Bootstrap;
22-
import io.netty.util.concurrent.EventExecutorGroup;
2322

2423
import java.util.Set;
2524
import java.util.concurrent.CompletionStage;
@@ -42,10 +41,10 @@ public class FailingConnectionDriverFactory extends DriverFactory
4241
private final AtomicReference<Throwable> nextRunFailure = new AtomicReference<>();
4342

4443
@Override
45-
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, EventExecutorGroup eventExecutorGroup,
44+
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
4645
MetricsListener metrics, Config config )
4746
{
48-
ConnectionPool pool = super.createConnectionPool( authToken, securityPlan, bootstrap, eventExecutorGroup, metrics, config );
47+
ConnectionPool pool = super.createConnectionPool( authToken, securityPlan, bootstrap, metrics, config );
4948
return new ConnectionPoolWithFailingConnections( pool, nextRunFailure );
5049
}
5150

driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.channel.Channel;
23-
import io.netty.util.concurrent.EventExecutorGroup;
2423
import org.junit.jupiter.api.AfterEach;
2524
import org.junit.jupiter.api.BeforeEach;
2625
import org.junit.jupiter.api.Test;
@@ -46,7 +45,6 @@
4645
import org.neo4j.driver.internal.spi.ConnectionPool;
4746
import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory;
4847
import org.neo4j.driver.internal.util.Clock;
49-
import org.neo4j.driver.internal.util.ImmediateSchedulingEventExecutor;
5048
import org.neo4j.driver.v1.AuthToken;
5149
import org.neo4j.driver.v1.AuthTokens;
5250
import org.neo4j.driver.v1.Config;
@@ -319,7 +317,7 @@ private static class DriverFactoryWithConnectionPool extends DriverFactory
319317

320318
@Override
321319
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
322-
EventExecutorGroup eventExecutorGroup, MetricsListener metrics, Config config )
320+
MetricsListener metrics, Config config )
323321
{
324322
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, 1000 );
325323
PoolSettings poolSettings = new PoolSettings( config.maxConnectionPoolSize(),
@@ -341,7 +339,7 @@ private static class MemorizingConnectionPool extends ConnectionPoolImpl
341339
MemorizingConnectionPool( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings,
342340
Logging logging, Clock clock )
343341
{
344-
super( connector, bootstrap, new ImmediateSchedulingEventExecutor(), settings, DEV_NULL_METRICS, logging, clock );
342+
super( connector, bootstrap, settings, DEV_NULL_METRICS, logging, clock );
345343
}
346344

347345

driver/src/test/resources/goodbye_message.script

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ S: SUCCESS {"fields": ["x"]}
99
RECORD [1]
1010
SUCCESS {"bookmark": "bookmark:1"}
1111
C: GOODBYE
12-
S: SUCCESS {}
12+
S: <EXIT>

0 commit comments

Comments
 (0)