Skip to content

Commit cd90440

Browse files
author
Zhen Li
authored
Merge pull request #312 from lutovich/1.1-better-exception
Throw ServiceUnavailableException when socket write fails
2 parents db06302 + b46f48a commit cd90440

File tree

11 files changed

+90
-60
lines changed

11 files changed

+90
-60
lines changed

driver/src/main/java/org/neo4j/driver/internal/RoutingTransaction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class RoutingTransaction implements Transaction
4747
private final BoltServerAddress address;
4848
private final RoutingErrorHandler onError;
4949

50-
RoutingTransaction( Transaction delegate, AccessMode mode, BoltServerAddress address,
50+
public RoutingTransaction( Transaction delegate, AccessMode mode, BoltServerAddress address,
5151
RoutingErrorHandler onError )
5252
{
5353
this.delegate = delegate;

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ private LoadBalancer(
5858
RoutingTable routingTable,
5959
ClusterCompositionProvider provider ) throws ServiceUnavailableException
6060
{
61-
this( log, connections, routingTable, new Rediscovery( settings, clock, log, provider ) );
61+
this( routingTable, connections, new Rediscovery( settings, clock, log, provider ), log );
6262
}
6363

64-
LoadBalancer( Logger log, ConnectionPool connections, RoutingTable routingTable, Rediscovery rediscovery )
64+
LoadBalancer( RoutingTable routingTable, ConnectionPool connections, Rediscovery rediscovery, Logger log )
6565
throws ServiceUnavailableException
6666
{
6767
this.log = log;

driver/src/main/java/org/neo4j/driver/internal/messaging/MessageFormat.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ interface Writer
2929
Writer write( Message msg ) throws IOException;
3030

3131
Writer flush() throws IOException;
32-
33-
Writer reset( WritableByteChannel channel );
3432
}
3533

3634
interface Reader

driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -345,13 +345,6 @@ public Writer write( Message msg ) throws IOException
345345
return this;
346346
}
347347

348-
@Override
349-
public Writer reset( WritableByteChannel channel )
350-
{
351-
packer.reset( channel );
352-
return this;
353-
}
354-
355348
private void packNode( Node node ) throws IOException
356349
{
357350
packer.packStructHeader( NODE_FIELDS, NODE );

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.neo4j.driver.v1.Value;
3939
import org.neo4j.driver.v1.exceptions.ClientException;
4040
import org.neo4j.driver.v1.exceptions.Neo4jException;
41+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
4142
import org.neo4j.driver.v1.summary.ServerInfo;
4243

4344
import static java.lang.String.format;
@@ -165,8 +166,7 @@ public synchronized void flush()
165166
}
166167
catch ( IOException e )
167168
{
168-
String message = e.getMessage();
169-
throw new ClientException( "Unable to send messages to server: " + message, e );
169+
throw new ServiceUnavailableException( "Unable to send messages to server: " + e.getMessage(), e );
170170
}
171171
}
172172

driver/src/main/java/org/neo4j/driver/internal/packstream/PackStream.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.neo4j.driver.internal.packstream;
2020

2121
import java.io.IOException;
22-
import java.nio.channels.WritableByteChannel;
2322
import java.nio.charset.Charset;
2423
import java.util.List;
2524
import java.util.Map;
@@ -158,16 +157,6 @@ public Packer( PackOutput out )
158157
this.out = out;
159158
}
160159

161-
public void reset( PackOutput out )
162-
{
163-
this.out = out;
164-
}
165-
166-
public void reset( WritableByteChannel channel )
167-
{
168-
((BufferedChannelOutput) out).reset( channel );
169-
}
170-
171160
public void flush() throws IOException
172161
{
173162
out.flush();

driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,26 @@
2121
import org.junit.Test;
2222
import org.mockito.InOrder;
2323

24-
import java.util.HashSet;
24+
import java.util.Collections;
25+
import java.util.Set;
2526
import java.util.concurrent.atomic.AtomicInteger;
2627

28+
import org.neo4j.driver.internal.RoutingTransaction;
2729
import org.neo4j.driver.internal.net.BoltServerAddress;
2830
import org.neo4j.driver.internal.spi.Connection;
2931
import org.neo4j.driver.internal.spi.ConnectionPool;
32+
import org.neo4j.driver.v1.AccessMode;
33+
import org.neo4j.driver.v1.Transaction;
34+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
35+
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
3036

31-
import static java.util.Arrays.asList;
3237
import static org.hamcrest.MatcherAssert.assertThat;
38+
import static org.hamcrest.Matchers.instanceOf;
3339
import static org.hamcrest.core.IsEqual.equalTo;
40+
import static org.junit.Assert.assertNotNull;
41+
import static org.junit.Assert.fail;
3442
import static org.mockito.Matchers.any;
43+
import static org.mockito.Mockito.doThrow;
3544
import static org.mockito.Mockito.inOrder;
3645
import static org.mockito.Mockito.mock;
3746
import static org.mockito.Mockito.spy;
@@ -49,27 +58,28 @@ public void ensureRoutingShouldUpdateRoutingTableAndPurgeConnectionPoolWhenStale
4958
RoutingTable routingTable = mock( RoutingTable.class );
5059
Rediscovery rediscovery = mock( Rediscovery.class );
5160
when( routingTable.isStale() ).thenReturn( true );
52-
HashSet<BoltServerAddress> set = new HashSet<>( asList( new BoltServerAddress( "abc", 12 ) ) );
61+
Set<BoltServerAddress> set = Collections.singleton( new BoltServerAddress( "abc", 12 ) );
5362
when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( set );
5463

5564
// when
56-
LoadBalancer balancer = new LoadBalancer( DEV_NULL_LOGGER, conns, routingTable, rediscovery );
65+
LoadBalancer balancer = new LoadBalancer( routingTable, conns, rediscovery, DEV_NULL_LOGGER );
5766

5867
// then
68+
assertNotNull( balancer );
5969
InOrder inOrder = inOrder( rediscovery, routingTable, conns );
6070
inOrder.verify( rediscovery ).lookupRoutingTable( conns, routingTable );
6171
inOrder.verify( routingTable ).update( any( ClusterComposition.class ) );
6272
inOrder.verify( conns ).purge( new BoltServerAddress( "abc", 12 ) );
6373
}
6474

65-
6675
@Test
6776
public void shouldEnsureRoutingOnInitialization() throws Exception
6877
{
6978
// given & when
7079
final AtomicInteger ensureRoutingCounter = new AtomicInteger( 0 );
71-
LoadBalancer balancer = new LoadBalancer( DEV_NULL_LOGGER, mock( ConnectionPool.class ),
72-
mock( RoutingTable.class ), mock( Rediscovery.class ) ) {
80+
LoadBalancer balancer = new LoadBalancer( mock( RoutingTable.class ), mock( ConnectionPool.class ),
81+
mock( Rediscovery.class ), DEV_NULL_LOGGER )
82+
{
7383
@Override
7484
public void ensureRouting()
7585
{
@@ -78,6 +88,7 @@ public void ensureRouting()
7888
};
7989

8090
// then
91+
assertNotNull( balancer );
8192
assertThat( ensureRoutingCounter.get(), equalTo( 1 ) );
8293
}
8394

@@ -129,9 +140,36 @@ private LoadBalancer setupLoadBalancer( Connection writerConn, Connection readCo
129140
when( routingTable.readers() ).thenReturn( readerAddrs );
130141
when( routingTable.writers() ).thenReturn( writerAddrs );
131142

132-
LoadBalancer balancer = new LoadBalancer( DEV_NULL_LOGGER, connPool,
133-
routingTable, mock( Rediscovery.class ) ) ;
143+
return new LoadBalancer( routingTable, connPool, mock( Rediscovery.class ), DEV_NULL_LOGGER );
144+
}
134145

135-
return balancer;
146+
@Test
147+
public void shouldForgetAddressAndItsConnectionsOnServiceUnavailable()
148+
{
149+
Transaction tx = mock( Transaction.class );
150+
RoutingTable routingTable = mock( RoutingTable.class );
151+
ConnectionPool connectionPool = mock( ConnectionPool.class );
152+
Rediscovery rediscovery = mock( Rediscovery.class );
153+
LoadBalancer loadBalancer = new LoadBalancer( routingTable, connectionPool, rediscovery, DEV_NULL_LOGGER );
154+
BoltServerAddress address = new BoltServerAddress( "host", 42 );
155+
156+
RoutingTransaction routingTx = new RoutingTransaction( tx, AccessMode.WRITE, address, loadBalancer );
157+
158+
ServiceUnavailableException txCloseError = new ServiceUnavailableException( "Oh!" );
159+
doThrow( txCloseError ).when( tx ).close();
160+
161+
try
162+
{
163+
routingTx.close();
164+
fail( "Exception expected" );
165+
}
166+
catch ( Exception e )
167+
{
168+
assertThat( e, instanceOf( SessionExpiredException.class ) );
169+
assertThat( e.getCause(), instanceOf( ServiceUnavailableException.class ) );
170+
}
171+
172+
verify( routingTable ).forget( address );
173+
verify( connectionPool ).purge( address );
136174
}
137175
}

driver/src/test/java/org/neo4j/driver/internal/net/SocketConnectionTest.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,22 @@
2222
import org.mockito.invocation.InvocationOnMock;
2323
import org.mockito.stubbing.Answer;
2424

25+
import java.io.IOException;
2526
import java.net.URI;
2627
import java.util.ArrayList;
2728
import java.util.Iterator;
29+
import java.util.Queue;
2830

2931
import org.neo4j.driver.internal.messaging.Message;
3032
import org.neo4j.driver.internal.messaging.SuccessMessage;
3133
import org.neo4j.driver.internal.summary.InternalServerInfo;
32-
import org.neo4j.driver.v1.Logger;
3334
import org.neo4j.driver.v1.Values;
35+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
3436
import org.neo4j.driver.v1.summary.ServerInfo;
3537

3638
import static org.hamcrest.CoreMatchers.equalTo;
3739
import static org.hamcrest.CoreMatchers.instanceOf;
40+
import static org.junit.Assert.assertSame;
3841
import static org.junit.Assert.assertThat;
3942
import static org.junit.Assert.fail;
4043
import static org.mockito.Matchers.any;
@@ -45,16 +48,20 @@
4548
import static org.mockito.Mockito.times;
4649
import static org.mockito.Mockito.verify;
4750
import static org.mockito.Mockito.when;
51+
import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER;
52+
import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT;
4853
import static org.neo4j.driver.v1.Values.parameters;
4954

5055
public class SocketConnectionTest
5156
{
57+
private static final InternalServerInfo SERVER_INFO = new InternalServerInfo( LOCAL_DEFAULT, "test" );
58+
5259
@Test
5360
public void shouldReceiveServerInfoAfterInit() throws Throwable
5461
{
5562
// Given
5663
SocketClient socket = mock( SocketClient.class );
57-
SocketConnection conn = new SocketConnection( socket, mock( InternalServerInfo.class ), mock( Logger.class ) );
64+
SocketConnection conn = new SocketConnection( socket, SERVER_INFO, DEV_NULL_LOGGER );
5865

5966
when( socket.address() ).thenReturn( BoltServerAddress.from( URI.create( "http://neo4j.com:9000" ) ) );
6067

@@ -98,7 +105,7 @@ public void shouldCloseConnectionIfFailedToCreate() throws Throwable
98105
// Then
99106
try
100107
{
101-
SocketConnection conn = new SocketConnection( socket, mock( InternalServerInfo.class ), mock( Logger.class ) );
108+
new SocketConnection( socket, SERVER_INFO, DEV_NULL_LOGGER );
102109
fail( "should have failed with the provided exception" );
103110
}
104111
catch( Throwable e )
@@ -108,4 +115,26 @@ public void shouldCloseConnectionIfFailedToCreate() throws Throwable
108115
}
109116
verify( socket, times( 1 ) ).stop();
110117
}
118+
119+
@Test
120+
@SuppressWarnings( "unchecked" )
121+
public void flushThrowsWhenSocketIsBroken() throws Exception
122+
{
123+
SocketClient socket = mock( SocketClient.class );
124+
IOException sendError = new IOException( "Unable to send" );
125+
doThrow( sendError ).when( socket ).send( any( Queue.class ) );
126+
127+
SocketConnection connection = new SocketConnection( socket, SERVER_INFO, DEV_NULL_LOGGER );
128+
129+
try
130+
{
131+
connection.flush();
132+
fail( "Exception expected" );
133+
}
134+
catch ( Exception e )
135+
{
136+
assertThat( e, instanceOf( ServiceUnavailableException.class ) );
137+
assertSame( sendError, e.getCause() );
138+
}
139+
}
111140
}

driver/src/main/java/org/neo4j/driver/internal/packstream/BufferedChannelInput.java renamed to driver/src/test/java/org/neo4j/driver/internal/packstream/BufferedChannelInput.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,8 @@ public BufferedChannelInput(ReadableByteChannel ch )
4040
public BufferedChannelInput( int bufferCapacity, ReadableByteChannel ch )
4141
{
4242
this.buffer = ByteBuffer.allocate( bufferCapacity ).order( ByteOrder.BIG_ENDIAN );
43-
reset( ch );
44-
}
45-
46-
public BufferedChannelInput reset( ReadableByteChannel ch )
47-
{
48-
this.channel = ch;
49-
this.buffer.position( 0 );
5043
this.buffer.limit( 0 );
51-
return this;
44+
this.channel = ch;
5245
}
5346

5447
@Override

driver/src/main/java/org/neo4j/driver/internal/packstream/BufferedChannelOutput.java renamed to driver/src/test/java/org/neo4j/driver/internal/packstream/BufferedChannelOutput.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,7 @@
2626
public class BufferedChannelOutput implements PackOutput
2727
{
2828
private final ByteBuffer buffer;
29-
private WritableByteChannel channel;
30-
31-
public BufferedChannelOutput( int bufferSize )
32-
{
33-
this.buffer = ByteBuffer.allocate( bufferSize ).order( ByteOrder.BIG_ENDIAN );
34-
}
29+
private final WritableByteChannel channel;
3530

3631
public BufferedChannelOutput( WritableByteChannel channel )
3732
{
@@ -40,12 +35,7 @@ public BufferedChannelOutput( WritableByteChannel channel )
4035

4136
public BufferedChannelOutput( WritableByteChannel channel, int bufferSize )
4237
{
43-
this( bufferSize );
44-
reset( channel );
45-
}
46-
47-
public void reset( WritableByteChannel channel )
48-
{
38+
this.buffer = ByteBuffer.allocate( bufferSize ).order( ByteOrder.BIG_ENDIAN );
4939
this.channel = channel;
5040
}
5141

driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,8 +312,8 @@ public void shouldHandleGracefulLeaderSwitch() throws Exception
312312
parameters( "name", "Webber", "title", "Mr" ) );
313313
tx1.success();
314314

315-
closeAndExpectException( tx1, ClientException.class );
316-
closeAndExpectException( session1, ClientException.class );
315+
closeAndExpectException( tx1, SessionExpiredException.class );
316+
session1.close();
317317

318318
String bookmark = inExpirableSession( driver, createSession(), new Function<Session,String>()
319319
{

0 commit comments

Comments
 (0)