Skip to content

Commit 0a4eb49

Browse files
authored
Merge pull request #529 from zhenlineo/1.7-goodbye
Added support of goodbye message before closing the driver.
2 parents 6bbb344 + 2e794ec commit 0a4eb49

File tree

16 files changed

+180
-18
lines changed

16 files changed

+180
-18
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,9 @@ public class ConnectionPoolImpl implements ConnectionPool
6060
private final ConcurrentMap<BoltServerAddress,ChannelPool> pools = new ConcurrentHashMap<>();
6161
private final AtomicBoolean closed = new AtomicBoolean();
6262

63-
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings,
64-
MetricsListener metricsListener, Logging logging, Clock clock )
63+
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, MetricsListener metricsListener, Logging logging, Clock clock )
6564
{
66-
this( connector, bootstrap, new NettyChannelTracker( metricsListener, logging ), settings, metricsListener, logging, clock );
65+
this( connector, bootstrap, new NettyChannelTracker( metricsListener, bootstrap.config().group().next(), logging ), settings, metricsListener, logging, clock );
6766
}
6867

6968
ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker nettyChannelTracker,
@@ -153,11 +152,11 @@ public CompletionStage<Void> close()
153152
{
154153
try
155154
{
155+
nettyChannelTracker.prepareToCloseChannels();
156156
for ( Map.Entry<BoltServerAddress,ChannelPool> entry : pools.entrySet() )
157157
{
158158
BoltServerAddress address = entry.getKey();
159159
ChannelPool pool = entry.getValue();
160-
161160
log.info( "Closing connection pool towards %s", address );
162161
pool.close();
163162
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,8 @@ public class NettyChannelPool extends FixedChannelPool
4545
private final ChannelConnector connector;
4646
private final NettyChannelTracker handler;
4747

48-
public NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap,
49-
NettyChannelTracker handler, ChannelHealthChecker healthCheck, long acquireTimeoutMillis,
50-
int maxConnections )
48+
public NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker handler,
49+
ChannelHealthChecker healthCheck, long acquireTimeoutMillis, int maxConnections )
5150
{
5251
super( bootstrap, handler, healthCheck, AcquireTimeoutAction.FAIL, acquireTimeoutMillis, maxConnections,
5352
MAX_PENDING_ACQUIRES, RELEASE_HEALTH_CHECK );

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,17 @@
2020

2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelFutureListener;
23+
import io.netty.channel.group.ChannelGroup;
24+
import io.netty.channel.group.DefaultChannelGroup;
2325
import io.netty.channel.pool.ChannelPoolHandler;
26+
import io.netty.util.concurrent.EventExecutor;
2427

2528
import java.util.Map;
2629
import java.util.concurrent.ConcurrentHashMap;
2730
import java.util.concurrent.atomic.AtomicInteger;
2831

2932
import org.neo4j.driver.internal.BoltServerAddress;
33+
import org.neo4j.driver.internal.messaging.BoltProtocol;
3034
import org.neo4j.driver.internal.metrics.ListenerEvent;
3135
import org.neo4j.driver.internal.metrics.MetricsListener;
3236
import org.neo4j.driver.v1.Logger;
@@ -41,11 +45,18 @@ public class NettyChannelTracker implements ChannelPoolHandler
4145
private final Logger log;
4246
private final MetricsListener metricsListener;
4347
private final ChannelFutureListener closeListener = future -> channelClosed( future.channel() );
48+
private final ChannelGroup allChannels;
4449

45-
public NettyChannelTracker( MetricsListener metricsListener, Logging logging )
50+
public NettyChannelTracker( MetricsListener metricsListener, EventExecutor eventExecutor, Logging logging )
51+
{
52+
this( metricsListener, new DefaultChannelGroup( "all-connections", eventExecutor ), logging );
53+
}
54+
55+
public NettyChannelTracker( MetricsListener metricsListener, ChannelGroup channels, Logging logging )
4656
{
4757
this.metricsListener = metricsListener;
4858
this.log = logging.getLog( getClass().getSimpleName() );
59+
this.allChannels = channels;
4960
}
5061

5162
@Override
@@ -77,6 +88,8 @@ public void channelCreated( Channel channel, ListenerEvent creatingEvent )
7788
log.debug( "Channel %s created", channel );
7889
incrementInUse( channel );
7990
metricsListener.afterCreated( serverAddress( channel ), creatingEvent );
91+
92+
allChannels.add( channel );
8093
}
8194

8295
public ListenerEvent channelCreating( BoltServerAddress address )
@@ -109,6 +122,24 @@ public int idleChannelCount( BoltServerAddress address )
109122
return count == null ? 0 : count.get();
110123
}
111124

125+
public void prepareToCloseChannels()
126+
{
127+
for ( Channel channel : allChannels )
128+
{
129+
BoltProtocol protocol = BoltProtocol.forChannel( channel );
130+
try
131+
{
132+
protocol.prepareToCloseChannel( channel );
133+
}
134+
catch ( Throwable e )
135+
{
136+
// only logging it
137+
log.debug( "Failed to prepare to close Channel %s due to error %s. " +
138+
"It is safe to ignore this error as the channel will be closed despite if it is successfully prepared to close or not.", channel, e.getMessage() );
139+
}
140+
}
141+
}
142+
112143
private void incrementInUse( Channel channel )
113144
{
114145
increment( channel, addressToInUseChannelCount );

driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@ public interface BoltProtocol
5959
*/
6060
void initializeChannel( String userAgent, Map<String,Value> authToken, ChannelPromise channelInitializedPromise );
6161

62+
/**
63+
* Prepare to close channel before it is closed.
64+
* @param channel the channel to close.
65+
*/
66+
void prepareToCloseChannel( Channel channel );
67+
6268
/**
6369
* Begin an explicit transaction.
6470
*

driver/src/main/java/org/neo4j/driver/internal/messaging/request/GoodbyeMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class GoodbyeMessage implements Message
2424
{
2525
public final static byte SIGNATURE = 0x02;
2626

27-
public static final Message GOODBYE = new GoodbyeMessage();
27+
public static final GoodbyeMessage GOODBYE = new GoodbyeMessage();
2828

2929
private GoodbyeMessage()
3030
{

driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,12 @@ public void initializeChannel( String userAgent, Map<String,Value> authToken, Ch
8888
channel.writeAndFlush( message, channel.voidPromise() );
8989
}
9090

91+
@Override
92+
public void prepareToCloseChannel( Channel channel )
93+
{
94+
// left empty on purpose.
95+
}
96+
9197
@Override
9298
public CompletionStage<Void> beginTransaction( Connection connection, Bookmarks bookmarks, TransactionConfig config )
9399
{

driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.neo4j.driver.internal.messaging.Message;
4343
import org.neo4j.driver.internal.messaging.MessageFormat;
4444
import org.neo4j.driver.internal.messaging.request.BeginMessage;
45+
import org.neo4j.driver.internal.messaging.request.GoodbyeMessage;
4546
import org.neo4j.driver.internal.messaging.request.HelloMessage;
4647
import org.neo4j.driver.internal.messaging.request.RunWithMetadataMessage;
4748
import org.neo4j.driver.internal.spi.Connection;
@@ -84,6 +85,14 @@ public void initializeChannel( String userAgent, Map<String,Value> authToken, Ch
8485
channel.writeAndFlush( message, channel.voidPromise() );
8586
}
8687

88+
@Override
89+
public void prepareToCloseChannel( Channel channel )
90+
{
91+
GoodbyeMessage message = GoodbyeMessage.GOODBYE;
92+
messageDispatcher( channel ).enqueue( NoOpResponseHandler.INSTANCE );
93+
channel.writeAndFlush( message, channel.voidPromise() );
94+
}
95+
8796
@Override
8897
public CompletionStage<Void> beginTransaction( Connection connection, Bookmarks bookmarks, TransactionConfig config )
8998
{

driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ protected InternalDriver createRoutingDriver( SecurityPlan securityPlan, BoltSer
220220
}
221221

222222
@Override
223-
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
223+
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
224+
MetricsListener metrics, Config config )
224225
{
225226
return connectionPool;
226227
}
@@ -255,7 +256,8 @@ protected SessionFactory createSessionFactory( ConnectionProvider connectionProv
255256
}
256257

257258
@Override
258-
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
259+
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
260+
MetricsListener metrics, Config config )
259261
{
260262
return connectionPoolMock();
261263
}
@@ -277,7 +279,8 @@ protected Bootstrap createBootstrap()
277279
}
278280

279281
@Override
280-
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
282+
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
283+
MetricsListener metrics, Config config )
281284
{
282285
return connectionPoolMock();
283286
}

driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelPoolIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.neo4j.driver.internal.security.InternalAuthToken;
4040
import org.neo4j.driver.internal.security.SecurityPlan;
4141
import org.neo4j.driver.internal.util.FakeClock;
42+
import org.neo4j.driver.internal.util.ImmediateSchedulingEventExecutor;
4243
import org.neo4j.driver.v1.AuthToken;
4344
import org.neo4j.driver.v1.AuthTokens;
4445
import org.neo4j.driver.v1.Value;
@@ -170,7 +171,7 @@ void shouldLimitNumberOfConcurrentConnections() throws Exception
170171
@Test
171172
void shouldTrackActiveChannels() throws Exception
172173
{
173-
NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, DEV_NULL_LOGGING );
174+
NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, new ImmediateSchedulingEventExecutor(), DEV_NULL_LOGGING );
174175

175176
poolHandler = tracker;
176177
pool = newPool( neo4j.authToken() );

driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelTrackerTest.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,33 @@
2020

2121
import io.netty.channel.Channel;
2222
import io.netty.channel.embedded.EmbeddedChannel;
23+
import io.netty.channel.group.ChannelGroup;
24+
import org.bouncycastle.util.Arrays;
2325
import org.junit.jupiter.api.Test;
2426

2527
import org.neo4j.driver.internal.BoltServerAddress;
28+
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
29+
import org.neo4j.driver.internal.messaging.request.GoodbyeMessage;
30+
import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3;
2631

32+
import static org.hamcrest.MatcherAssert.assertThat;
33+
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.hasItem;
2735
import static org.junit.jupiter.api.Assertions.assertEquals;
2836
import static org.junit.jupiter.api.Assertions.assertThrows;
37+
import static org.mockito.Mockito.mock;
38+
import static org.mockito.Mockito.verify;
39+
import static org.mockito.Mockito.when;
40+
import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher;
41+
import static org.neo4j.driver.internal.async.ChannelAttributes.setProtocolVersion;
2942
import static org.neo4j.driver.internal.async.ChannelAttributes.setServerAddress;
3043
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
3144
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
3245

3346
class NettyChannelTrackerTest
3447
{
3548
private final BoltServerAddress address = BoltServerAddress.LOCAL_DEFAULT;
36-
private final NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, DEV_NULL_LOGGING );
49+
private final NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, mock( ChannelGroup.class ), DEV_NULL_LOGGING );
3750

3851
@Test
3952
void shouldIncrementInUseCountWhenChannelCreated()
@@ -164,10 +177,53 @@ void shouldReturnZeroActiveCountForUnknownAddress()
164177
assertEquals( 0, tracker.inUseChannelCount( address ) );
165178
}
166179

180+
@Test
181+
void shouldAddChannelToGroupWhenChannelCreated()
182+
{
183+
Channel channel = newChannel();
184+
Channel anotherChannel = newChannel();
185+
ChannelGroup group = mock( ChannelGroup.class );
186+
NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, group, DEV_NULL_LOGGING );
187+
188+
tracker.channelCreated( channel, null );
189+
tracker.channelCreated( anotherChannel, null );
190+
191+
verify( group ).add( channel );
192+
verify( group ).add( anotherChannel );
193+
}
194+
195+
@Test
196+
void shouldDelegateToProtocolPrepareToClose()
197+
{
198+
EmbeddedChannel channel = newChannelWithProtocolV3();
199+
EmbeddedChannel anotherChannel = newChannelWithProtocolV3();
200+
ChannelGroup group = mock( ChannelGroup.class );
201+
when( group.iterator() ).thenReturn( new Arrays.Iterator<>( new Channel[]{channel, anotherChannel} ) );
202+
203+
NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, group, DEV_NULL_LOGGING );
204+
205+
tracker.prepareToCloseChannels();
206+
207+
assertThat( channel.outboundMessages().size(), equalTo( 1 ) );
208+
assertThat( channel.outboundMessages(), hasItem( GoodbyeMessage.GOODBYE ) );
209+
210+
assertThat( anotherChannel.outboundMessages().size(), equalTo( 1 ) );
211+
assertThat( anotherChannel.outboundMessages(), hasItem( GoodbyeMessage.GOODBYE ) );
212+
}
213+
167214
private Channel newChannel()
168215
{
169216
EmbeddedChannel channel = new EmbeddedChannel();
170217
setServerAddress( channel, address );
171218
return channel;
172219
}
220+
221+
private EmbeddedChannel newChannelWithProtocolV3()
222+
{
223+
EmbeddedChannel channel = new EmbeddedChannel();
224+
setServerAddress( channel, address );
225+
setProtocolVersion( channel, BoltProtocolV3.VERSION );
226+
setMessageDispatcher( channel, mock( InboundMessageDispatcher.class ) );
227+
return channel;
228+
}
173229
}

driver/src/test/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3Test.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.neo4j.driver.internal.messaging.BoltProtocol;
4848
import org.neo4j.driver.internal.messaging.request.BeginMessage;
4949
import org.neo4j.driver.internal.messaging.request.CommitMessage;
50+
import org.neo4j.driver.internal.messaging.request.GoodbyeMessage;
5051
import org.neo4j.driver.internal.messaging.request.HelloMessage;
5152
import org.neo4j.driver.internal.messaging.request.PullAllMessage;
5253
import org.neo4j.driver.internal.messaging.request.RollbackMessage;
@@ -132,6 +133,16 @@ void shouldInitializeChannel()
132133
assertTrue( promise.isSuccess() );
133134
}
134135

136+
@Test
137+
void shouldPrepareToCloseChannel()
138+
{
139+
protocol.prepareToCloseChannel( channel );
140+
141+
assertThat( channel.outboundMessages(), hasSize( 1 ) );
142+
assertThat( channel.outboundMessages().poll(), instanceOf( GoodbyeMessage.class ) );
143+
assertEquals( 1, messageDispatcher.queuedHandlersCount() );
144+
}
145+
135146
@Test
136147
void shouldFailToInitializeChannelWhenErrorIsReceived()
137148
{

driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ protected final ChannelConnector createConnector( ConnectionSettings settings, S
7171
}
7272

7373
@Override
74-
protected final ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics,
75-
Config config )
74+
protected final ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
75+
MetricsListener metrics, Config config )
7676
{
7777
pool = super.createConnectionPool( authToken, securityPlan, bootstrap, metrics, config );
7878
return pool;

driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ public class FailingConnectionDriverFactory extends DriverFactory
4141
private final AtomicReference<Throwable> nextRunFailure = new AtomicReference<>();
4242

4343
@Override
44-
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
44+
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
45+
MetricsListener metrics, Config config )
4546
{
4647
ConnectionPool pool = super.createConnectionPool( authToken, securityPlan, bootstrap, metrics, config );
4748
return new ConnectionPoolWithFailingConnections( pool, nextRunFailure );

driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,8 @@ private static class DriverFactoryWithConnectionPool extends DriverFactory
316316
MemorizingConnectionPool connectionPool;
317317

318318
@Override
319-
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
319+
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
320+
MetricsListener metrics, Config config )
320321
{
321322
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, 1000 );
322323
PoolSettings poolSettings = new PoolSettings( config.maxConnectionPoolSize(),

0 commit comments

Comments
 (0)