Skip to content

Commit afdd61b

Browse files
author
Zhen Li
committed
Added support of goodbye message before closing the driver.
Used ChannelGroup to keep track of all alive channels.
1 parent 9b49898 commit afdd61b

16 files changed

+177
-23
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
8686
RetryLogic retryLogic = createRetryLogic( retrySettings, eventExecutorGroup, config.logging() );
8787

8888
InternalAbstractMetrics metrics = createDriverMetrics( config );
89-
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, metrics, config );
89+
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, eventExecutorGroup, metrics, config );
9090

9191
InternalDriver driver = createDriver( uri, securityPlan, address, connectionPool, eventExecutorGroup, newRoutingSettings, retryLogic, metrics, config );
9292

@@ -95,7 +95,8 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
9595
return driver;
9696
}
9797

98-
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
98+
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, EventExecutorGroup eventExecutorGroup,
99+
MetricsListener metrics, Config config )
99100
{
100101
Clock clock = createClock();
101102
ConnectionSettings settings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
@@ -104,7 +105,7 @@ protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan
104105
config.connectionAcquisitionTimeoutMillis(), config.maxConnectionLifetimeMillis(),
105106
config.idleTimeBeforeConnectionTest()
106107
);
107-
return new ConnectionPoolImpl( connector, bootstrap, poolSettings, metrics, config.logging(), clock );
108+
return new ConnectionPoolImpl( connector, bootstrap, eventExecutorGroup, poolSettings, metrics, config.logging(), clock );
108109
}
109110

110111
protected static InternalAbstractMetrics createDriverMetrics( Config config )

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.EventLoopGroup;
2424
import io.netty.channel.pool.ChannelPool;
25+
import io.netty.util.concurrent.EventExecutorGroup;
2526
import io.netty.util.concurrent.Future;
2627

2728
import java.util.Map;
@@ -60,10 +61,10 @@ public class ConnectionPoolImpl implements ConnectionPool
6061
private final ConcurrentMap<BoltServerAddress,ChannelPool> pools = new ConcurrentHashMap<>();
6162
private final AtomicBoolean closed = new AtomicBoolean();
6263

63-
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings,
64+
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, EventExecutorGroup eventExecutorGroup, PoolSettings settings,
6465
MetricsListener metricsListener, Logging logging, Clock clock )
6566
{
66-
this( connector, bootstrap, new NettyChannelTracker( metricsListener, logging ), settings, metricsListener, logging, clock );
67+
this( connector, bootstrap, new NettyChannelTracker( metricsListener, eventExecutorGroup, logging ), settings, metricsListener, logging, clock );
6768
}
6869

6970
ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker nettyChannelTracker,
@@ -159,6 +160,7 @@ public CompletionStage<Void> close()
159160
ChannelPool pool = entry.getValue();
160161

161162
log.info( "Closing connection pool towards %s", address );
163+
nettyChannelTracker.destructAllChannels();
162164
pool.close();
163165
}
164166

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,8 @@ public class NettyChannelPool extends FixedChannelPool
4545
private final ChannelConnector connector;
4646
private final NettyChannelTracker handler;
4747

48-
public NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap,
49-
NettyChannelTracker handler, ChannelHealthChecker healthCheck, long acquireTimeoutMillis,
50-
int maxConnections )
48+
public NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker handler,
49+
ChannelHealthChecker healthCheck, long acquireTimeoutMillis, int maxConnections )
5150
{
5251
super( bootstrap, handler, healthCheck, AcquireTimeoutAction.FAIL, acquireTimeoutMillis, maxConnections,
5352
MAX_PENDING_ACQUIRES, RELEASE_HEALTH_CHECK );

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,17 @@
2020

2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelFutureListener;
23+
import io.netty.channel.group.ChannelGroup;
24+
import io.netty.channel.group.DefaultChannelGroup;
2325
import io.netty.channel.pool.ChannelPoolHandler;
26+
import io.netty.util.concurrent.EventExecutorGroup;
2427

2528
import java.util.Map;
2629
import java.util.concurrent.ConcurrentHashMap;
2730
import java.util.concurrent.atomic.AtomicInteger;
2831

2932
import org.neo4j.driver.internal.BoltServerAddress;
33+
import org.neo4j.driver.internal.messaging.BoltProtocol;
3034
import org.neo4j.driver.internal.metrics.ListenerEvent;
3135
import org.neo4j.driver.internal.metrics.MetricsListener;
3236
import org.neo4j.driver.v1.Logger;
@@ -41,11 +45,18 @@ public class NettyChannelTracker implements ChannelPoolHandler
4145
private final Logger log;
4246
private final MetricsListener metricsListener;
4347
private final ChannelFutureListener closeListener = future -> channelClosed( future.channel() );
48+
private final ChannelGroup allChannels;
4449

45-
public NettyChannelTracker( MetricsListener metricsListener, Logging logging )
50+
public NettyChannelTracker( MetricsListener metricsListener, EventExecutorGroup eventExecutorGroup, Logging logging )
51+
{
52+
this( metricsListener, new DefaultChannelGroup( "all-connections", eventExecutorGroup.next() ), logging );
53+
}
54+
55+
public NettyChannelTracker( MetricsListener metricsListener, ChannelGroup channels, Logging logging )
4656
{
4757
this.metricsListener = metricsListener;
4858
this.log = logging.getLog( getClass().getSimpleName() );
59+
this.allChannels = channels;
4960
}
5061

5162
@Override
@@ -77,6 +88,8 @@ public void channelCreated( Channel channel, ListenerEvent creatingEvent )
7788
log.debug( "Channel %s created", channel );
7889
incrementInUse( channel );
7990
metricsListener.afterCreated( serverAddress( channel ), creatingEvent );
91+
92+
allChannels.add( channel );
8093
}
8194

8295
public ListenerEvent channelCreating( BoltServerAddress address )
@@ -95,6 +108,8 @@ public void channelClosed( Channel channel )
95108
{
96109
decrementIdle( channel );
97110
metricsListener.afterClosed( serverAddress( channel ) );
111+
112+
allChannels.remove( channel );
98113
}
99114

100115
public int inUseChannelCount( BoltServerAddress address )
@@ -109,6 +124,24 @@ public int idleChannelCount( BoltServerAddress address )
109124
return count == null ? 0 : count.get();
110125
}
111126

127+
public void destructAllChannels()
128+
{
129+
for ( Channel channel : allChannels )
130+
{
131+
BoltProtocol protocol = BoltProtocol.forChannel( channel );
132+
try
133+
{
134+
protocol.destructChannel( channel );
135+
}
136+
catch ( Throwable e )
137+
{
138+
// only logging it
139+
log.debug( "Failed to destruct Channel %s due to error %s. " +
140+
"It is safe to ignore this error as the channel will be closed despite if it is successfully destructed.", channel, e.getMessage() );
141+
}
142+
}
143+
}
144+
112145
private void incrementInUse( Channel channel )
113146
{
114147
increment( channel, addressToInUseChannelCount );

driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@ public interface BoltProtocol
5959
*/
6060
void initializeChannel( String userAgent, Map<String,Value> authToken, ChannelPromise channelInitializedPromise );
6161

62+
/**
63+
* Destruct channel before it is destroyed.
64+
* @param channel the channel to destroy.
65+
*/
66+
void destructChannel( Channel channel );
67+
6268
/**
6369
* Begin an explicit transaction.
6470
*

driver/src/main/java/org/neo4j/driver/internal/messaging/request/GoodbyeMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class GoodbyeMessage implements Message
2424
{
2525
public final static byte SIGNATURE = 0x02;
2626

27-
public static final Message GOODBYE = new GoodbyeMessage();
27+
public static final GoodbyeMessage GOODBYE = new GoodbyeMessage();
2828

2929
private GoodbyeMessage()
3030
{

driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,12 @@ public void initializeChannel( String userAgent, Map<String,Value> authToken, Ch
8888
channel.writeAndFlush( message, channel.voidPromise() );
8989
}
9090

91+
@Override
92+
public void destructChannel( Channel channel )
93+
{
94+
// left empty on purpose.
95+
}
96+
9197
@Override
9298
public CompletionStage<Void> beginTransaction( Connection connection, Bookmarks bookmarks, TransactionConfig config )
9399
{

driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.neo4j.driver.internal.messaging.Message;
4343
import org.neo4j.driver.internal.messaging.MessageFormat;
4444
import org.neo4j.driver.internal.messaging.request.BeginMessage;
45+
import org.neo4j.driver.internal.messaging.request.GoodbyeMessage;
4546
import org.neo4j.driver.internal.messaging.request.HelloMessage;
4647
import org.neo4j.driver.internal.messaging.request.RunWithMetadataMessage;
4748
import org.neo4j.driver.internal.spi.Connection;
@@ -84,6 +85,14 @@ public void initializeChannel( String userAgent, Map<String,Value> authToken, Ch
8485
channel.writeAndFlush( message, channel.voidPromise() );
8586
}
8687

88+
@Override
89+
public void destructChannel( Channel channel )
90+
{
91+
GoodbyeMessage message = GoodbyeMessage.GOODBYE;
92+
messageDispatcher( channel ).enqueue( NoOpResponseHandler.INSTANCE );
93+
channel.writeAndFlush( message, channel.voidPromise() );
94+
}
95+
8796
@Override
8897
public CompletionStage<Void> beginTransaction( Connection connection, Bookmarks bookmarks, TransactionConfig config )
8998
{

driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ protected InternalDriver createRoutingDriver( SecurityPlan securityPlan, BoltSer
220220
}
221221

222222
@Override
223-
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
223+
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
224+
EventExecutorGroup eventExecutorGroup, MetricsListener metrics, Config config )
224225
{
225226
return connectionPool;
226227
}
@@ -255,7 +256,8 @@ protected SessionFactory createSessionFactory( ConnectionProvider connectionProv
255256
}
256257

257258
@Override
258-
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
259+
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
260+
EventExecutorGroup eventExecutorGroup, MetricsListener metrics, Config config )
259261
{
260262
return connectionPoolMock();
261263
}
@@ -277,7 +279,8 @@ protected Bootstrap createBootstrap()
277279
}
278280

279281
@Override
280-
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
282+
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
283+
EventExecutorGroup eventExecutorGroup, MetricsListener metrics, Config config )
281284
{
282285
return connectionPoolMock();
283286
}

driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.pool.ChannelPool;
24+
import io.netty.util.concurrent.EventExecutorGroup;
2425
import io.netty.util.concurrent.ImmediateEventExecutor;
2526
import org.junit.jupiter.api.AfterEach;
2627
import org.junit.jupiter.api.BeforeEach;
@@ -205,7 +206,7 @@ private ConnectionPoolImpl newPool() throws Exception
205206
DEV_NULL_LOGGING, clock );
206207
PoolSettings poolSettings = newSettings();
207208
Bootstrap bootstrap = BootstrapFactory.newBootstrap( 1 );
208-
return new ConnectionPoolImpl( connector, bootstrap, poolSettings, DEV_NULL_METRICS, DEV_NULL_LOGGING, clock );
209+
return new ConnectionPoolImpl( connector, bootstrap, mock( EventExecutorGroup.class ), poolSettings, DEV_NULL_METRICS, DEV_NULL_LOGGING, clock );
209210
}
210211

211212
private static PoolSettings newSettings()

driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelPoolIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.neo4j.driver.internal.security.InternalAuthToken;
4040
import org.neo4j.driver.internal.security.SecurityPlan;
4141
import org.neo4j.driver.internal.util.FakeClock;
42+
import org.neo4j.driver.internal.util.ImmediateSchedulingEventExecutor;
4243
import org.neo4j.driver.v1.AuthToken;
4344
import org.neo4j.driver.v1.AuthTokens;
4445
import org.neo4j.driver.v1.Value;
@@ -170,7 +171,7 @@ void shouldLimitNumberOfConcurrentConnections() throws Exception
170171
@Test
171172
void shouldTrackActiveChannels() throws Exception
172173
{
173-
NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, DEV_NULL_LOGGING );
174+
NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, new ImmediateSchedulingEventExecutor(), DEV_NULL_LOGGING );
174175

175176
poolHandler = tracker;
176177
pool = newPool( neo4j.authToken() );

driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelTrackerTest.java

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,33 @@
2020

2121
import io.netty.channel.Channel;
2222
import io.netty.channel.embedded.EmbeddedChannel;
23+
import io.netty.channel.group.ChannelGroup;
24+
import org.bouncycastle.util.Arrays;
2325
import org.junit.jupiter.api.Test;
2426

2527
import org.neo4j.driver.internal.BoltServerAddress;
28+
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
29+
import org.neo4j.driver.internal.messaging.request.GoodbyeMessage;
30+
import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3;
2631

32+
import static org.hamcrest.MatcherAssert.assertThat;
33+
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.hasItem;
2735
import static org.junit.jupiter.api.Assertions.assertEquals;
2836
import static org.junit.jupiter.api.Assertions.assertThrows;
37+
import static org.mockito.Mockito.mock;
38+
import static org.mockito.Mockito.verify;
39+
import static org.mockito.Mockito.when;
40+
import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher;
41+
import static org.neo4j.driver.internal.async.ChannelAttributes.setProtocolVersion;
2942
import static org.neo4j.driver.internal.async.ChannelAttributes.setServerAddress;
3043
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
3144
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
3245

3346
class NettyChannelTrackerTest
3447
{
3548
private final BoltServerAddress address = BoltServerAddress.LOCAL_DEFAULT;
36-
private final NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, DEV_NULL_LOGGING );
49+
private final NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, mock( ChannelGroup.class ), DEV_NULL_LOGGING );
3750

3851
@Test
3952
void shouldIncrementInUseCountWhenChannelCreated()
@@ -164,10 +177,73 @@ void shouldReturnZeroActiveCountForUnknownAddress()
164177
assertEquals( 0, tracker.inUseChannelCount( address ) );
165178
}
166179

180+
@Test
181+
void shouldAddChannelToGroupWhenChannelCreated()
182+
{
183+
Channel channel = newChannel();
184+
Channel anotherChannel = newChannel();
185+
ChannelGroup group = mock( ChannelGroup.class );
186+
NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, group, DEV_NULL_LOGGING );
187+
188+
tracker.channelCreated( channel, null );
189+
tracker.channelCreated( anotherChannel, null );
190+
191+
verify( group ).add( channel );
192+
verify( group ).add( anotherChannel );
193+
}
194+
195+
@Test
196+
void shouldRemoveChannelFromGroupWhenChannelClosed()
197+
{
198+
Channel channel = newChannel();
199+
Channel anotherChannel = newChannel();
200+
ChannelGroup group = mock( ChannelGroup.class );
201+
NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, group, DEV_NULL_LOGGING );
202+
203+
tracker.channelCreated( channel, null );
204+
tracker.channelReleased( channel );
205+
tracker.channelCreated( anotherChannel, null );
206+
tracker.channelReleased( anotherChannel );
207+
208+
tracker.channelClosed( channel );
209+
tracker.channelClosed( anotherChannel );
210+
211+
verify( group ).remove( channel );
212+
verify( group ).remove( anotherChannel );
213+
}
214+
215+
@Test
216+
void shouldDelegateToProtocolDestruct()
217+
{
218+
EmbeddedChannel channel = newChannelWithProtocolV3();
219+
EmbeddedChannel anotherChannel = newChannelWithProtocolV3();
220+
ChannelGroup group = mock( ChannelGroup.class );
221+
when( group.iterator() ).thenReturn( new Arrays.Iterator<>( new Channel[]{channel, anotherChannel} ) );
222+
223+
NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, group, DEV_NULL_LOGGING );
224+
225+
tracker.destructAllChannels();
226+
227+
assertThat( channel.outboundMessages().size(), equalTo( 1 ) );
228+
assertThat( channel.outboundMessages(), hasItem( GoodbyeMessage.GOODBYE ) );
229+
230+
assertThat( anotherChannel.outboundMessages().size(), equalTo( 1 ) );
231+
assertThat( anotherChannel.outboundMessages(), hasItem( GoodbyeMessage.GOODBYE ) );
232+
}
233+
167234
private Channel newChannel()
168235
{
169236
EmbeddedChannel channel = new EmbeddedChannel();
170237
setServerAddress( channel, address );
171238
return channel;
172239
}
240+
241+
private EmbeddedChannel newChannelWithProtocolV3()
242+
{
243+
EmbeddedChannel channel = new EmbeddedChannel();
244+
setServerAddress( channel, address );
245+
setProtocolVersion( channel, BoltProtocolV3.VERSION );
246+
setMessageDispatcher( channel, mock( InboundMessageDispatcher.class ) );
247+
return channel;
248+
}
173249
}

0 commit comments

Comments
 (0)