Skip to content

Commit 9b5c42a

Browse files
committed
Simplified state in connection queue
Commit replaces two state variables with a single one and adds couple unit tests for state transitions.
1 parent 5c68377 commit 9b5c42a

File tree

2 files changed

+110
-12
lines changed

2 files changed

+110
-12
lines changed

driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import java.util.concurrent.BlockingQueue;
2424
import java.util.concurrent.ConcurrentHashMap;
2525
import java.util.concurrent.LinkedBlockingQueue;
26-
import java.util.concurrent.atomic.AtomicBoolean;
26+
import java.util.concurrent.atomic.AtomicInteger;
2727

2828
import org.neo4j.driver.internal.logging.DelegatingLogger;
2929
import org.neo4j.driver.internal.net.BoltServerAddress;
@@ -40,12 +40,15 @@ public class BlockingPooledConnectionQueue
4040
{
4141
public static final String LOG_NAME = "ConnectionQueue";
4242

43+
private static final int ACTIVE = 1;
44+
private static final int INACTIVE = 2;
45+
private static final int TERMINATED = 3;
46+
4347
/** The backing queue, keeps track of connections currently in queue */
4448
private final BlockingQueue<PooledConnection> queue;
4549
private final Logger logger;
4650

47-
private final AtomicBoolean isDeactivated = new AtomicBoolean( false );
48-
private final AtomicBoolean isTerminating = new AtomicBoolean( false );
51+
private final AtomicInteger state = new AtomicInteger( ACTIVE );
4952

5053
/** Keeps track of acquired connections */
5154
private final Set<PooledConnection> acquiredConnections =
@@ -72,7 +75,7 @@ public boolean offer( PooledConnection pooledConnection )
7275
{
7376
disposeSafely( pooledConnection );
7477
}
75-
if ( isDeactivated.get() || isTerminating.get() )
78+
if ( state.get() != ACTIVE )
7679
{
7780
terminateIdleConnections();
7881
}
@@ -93,12 +96,13 @@ public PooledConnection acquire( Supplier<PooledConnection> supplier )
9396
}
9497
acquiredConnections.add( connection );
9598

96-
if ( isDeactivated.get() || isTerminating.get() )
99+
int poolState = state.get();
100+
if ( poolState != ACTIVE )
97101
{
98102
acquiredConnections.remove( connection );
99103
disposeSafely( connection );
100-
throw new IllegalStateException( "Pool is " + (isDeactivated.get() ? "deactivated" : "terminated") + ", " +
101-
"new connections can't be acquired" );
104+
throw new IllegalStateException( "Pool is " + (poolState == INACTIVE ? "deactivated" : "terminated") +
105+
", new connections can't be acquired" );
102106
}
103107
else
104108
{
@@ -129,20 +133,20 @@ public boolean contains( PooledConnection pooledConnection )
129133

130134
public void activate()
131135
{
132-
isDeactivated.compareAndSet( true, false );
136+
state.compareAndSet( INACTIVE, ACTIVE );
133137
}
134138

135139
public void deactivate()
136140
{
137-
if ( isDeactivated.compareAndSet( false, true ) )
141+
if ( state.compareAndSet( ACTIVE, INACTIVE ) )
138142
{
139143
terminateIdleConnections();
140144
}
141145
}
142146

143147
public boolean isActive()
144148
{
145-
return !isDeactivated.get();
149+
return state.get() == ACTIVE;
146150
}
147151

148152
/**
@@ -153,7 +157,7 @@ public boolean isActive()
153157
*/
154158
public void terminate()
155159
{
156-
if ( isTerminating.compareAndSet( false, true ) )
160+
if ( state.getAndSet( TERMINATED ) != TERMINATED )
157161
{
158162
terminateIdleConnections();
159163
terminateAcquiredConnections();

driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import static org.hamcrest.MatcherAssert.assertThat;
3333
import static org.junit.Assert.assertEquals;
3434
import static org.junit.Assert.assertFalse;
35+
import static org.junit.Assert.assertNotNull;
3536
import static org.junit.Assert.assertSame;
3637
import static org.junit.Assert.assertTrue;
3738
import static org.junit.Assert.fail;
@@ -333,11 +334,104 @@ public void shouldTerminateOfferedConnectionWhenDeactivated()
333334
}
334335

335336
@Test
336-
public void shouldReportWhenActive()
337+
public void shouldBeActiveWhenNotDeactivatedAndNotTerminated()
337338
{
338339
BlockingPooledConnectionQueue queue = newConnectionQueue( 1 );
339340
assertTrue( queue.isActive() );
341+
}
342+
343+
@Test
344+
public void shouldNotBeActiveWhenDeactivated()
345+
{
346+
BlockingPooledConnectionQueue queue = newConnectionQueue( 1 );
347+
assertTrue( queue.isActive() );
348+
queue.deactivate();
349+
assertFalse( queue.isActive() );
350+
}
351+
352+
@Test
353+
public void shouldNotBeActiveWhenTerminated()
354+
{
355+
BlockingPooledConnectionQueue queue = newConnectionQueue( 1 );
356+
assertTrue( queue.isActive() );
357+
queue.terminate();
358+
assertFalse( queue.isActive() );
359+
}
360+
361+
@Test
362+
public void shouldBeActiveAfterDeactivationAndActivation()
363+
{
364+
BlockingPooledConnectionQueue queue = newConnectionQueue( 1 );
365+
assertTrue( queue.isActive() );
366+
queue.deactivate();
367+
assertFalse( queue.isActive() );
368+
queue.activate();
369+
assertTrue( queue.isActive() );
370+
}
371+
372+
@Test
373+
public void shouldNotBeActiveAfterTerminationAndActivation()
374+
{
375+
BlockingPooledConnectionQueue queue = newConnectionQueue( 1 );
376+
assertTrue( queue.isActive() );
377+
queue.terminate();
378+
assertFalse( queue.isActive() );
379+
queue.activate();
380+
assertFalse( queue.isActive() );
381+
}
382+
383+
@Test
384+
public void shouldBePossibleToAcquireFromActivatedQueue()
385+
{
386+
Supplier<PooledConnection> connectionSupplier = connectionSupplierMock();
387+
when( connectionSupplier.get() ).thenReturn( mock( PooledConnection.class ) );
388+
BlockingPooledConnectionQueue queue = newConnectionQueue( 3 );
340389
queue.deactivate();
390+
391+
try
392+
{
393+
queue.acquire( connectionSupplier );
394+
fail( "Exception expected" );
395+
}
396+
catch ( IllegalStateException e )
397+
{
398+
assertThat( e.getMessage(), startsWith( "Pool is deactivated" ) );
399+
}
400+
401+
queue.activate();
402+
403+
assertNotNull( queue.acquire( connectionSupplier ) );
404+
}
405+
406+
@Test
407+
public void shouldNotBePossibleToActivateTerminatedQueue()
408+
{
409+
Supplier<PooledConnection> connectionSupplier = connectionSupplierMock();
410+
when( connectionSupplier.get() ).thenReturn( mock( PooledConnection.class ) );
411+
BlockingPooledConnectionQueue queue = newConnectionQueue( 3 );
412+
queue.terminate();
413+
414+
try
415+
{
416+
queue.acquire( connectionSupplier );
417+
fail( "Exception expected" );
418+
}
419+
catch ( IllegalStateException e )
420+
{
421+
assertThat( e.getMessage(), startsWith( "Pool is terminated" ) );
422+
}
423+
424+
queue.activate();
425+
426+
try
427+
{
428+
queue.acquire( connectionSupplier );
429+
fail( "Exception expected" );
430+
}
431+
catch ( IllegalStateException e )
432+
{
433+
assertThat( e.getMessage(), startsWith( "Pool is terminated" ) );
434+
}
341435
assertFalse( queue.isActive() );
342436
}
343437

0 commit comments

Comments
 (0)