Skip to content

Causal clustering stress test improvements #321

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 17, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.AuthToken;
Expand Down Expand Up @@ -89,7 +89,7 @@ public void setUp() throws Exception
{
URI clusterUri = clusterRule.getClusterUri();
AuthToken authToken = clusterRule.getAuthToken();
Config config = Config.build().withLogging( DEV_NULL_LOGGING ).toConfig();
Config config = Config.build().withLogging( DEV_NULL_LOGGING ).withMaxIdleSessions( THREAD_COUNT ).toConfig();
driver = GraphDatabase.driver( clusterUri, authToken, config );

ThreadFactory threadFactory = new DaemonThreadFactory( getClass().getSimpleName() + "-worker-" );
Expand All @@ -109,11 +109,11 @@ public void tearDown() throws Exception
@Test
public void basicStressTest() throws Throwable
{
AtomicBoolean stop = new AtomicBoolean();
List<Future<?>> resultFutures = launchWorkerThreads( stop );
Context context = new Context();
List<Future<?>> resultFutures = launchWorkerThreads( context );

long openFileDescriptors = sleepAndGetOpenFileDescriptorCount();
stop.set( true );
context.stop();

Throwable firstError = null;
for ( Future<?> future : resultFutures )
Expand All @@ -134,16 +134,17 @@ public void basicStressTest() throws Throwable
}

assertNoFileDescriptorLeak( openFileDescriptors );
assertExpectedNumberOfNodesCreated( context.getCreatedNodesCount() );
}

private List<Future<?>> launchWorkerThreads( AtomicBoolean stop )
private List<Future<?>> launchWorkerThreads( Context context )
{
List<Command> commands = createCommands();
List<Future<?>> futures = new ArrayList<>();

for ( int i = 0; i < THREAD_COUNT; i++ )
{
Future<Void> future = launchWorkerThread( executor, commands, stop );
Future<Void> future = launchWorkerThread( executor, commands, context );
futures.add( future );
}

Expand All @@ -155,9 +156,11 @@ private List<Command> createCommands()
List<Command> commands = new ArrayList<>();

commands.add( new ReadQuery( driver ) );
commands.add( new ReadQueryInTx( driver ) );
commands.add( new ReadQueryInTx( driver, false ) );
commands.add( new ReadQueryInTx( driver, true ) );
commands.add( new WriteQuery( driver ) );
commands.add( new WriteQueryInTx( driver ) );
commands.add( new WriteQueryInTx( driver, false ) );
commands.add( new WriteQueryInTx( driver, true ) );
commands.add( new WriteQueryUsingReadSession( driver ) );
commands.add( new WriteQueryUsingReadSessionInTx( driver ) );
commands.add( new FailedAuth( clusterRule.getClusterUri() ) );
Expand All @@ -166,18 +169,20 @@ private List<Command> createCommands()
}

private static Future<Void> launchWorkerThread( final ExecutorService executor, final List<Command> commands,
final AtomicBoolean stop )
final Context context )
{
return executor.submit( new Callable<Void>()
{
final ThreadLocalRandom random = ThreadLocalRandom.current();

@Override
public Void call() throws Exception
{
while ( !stop.get() )
while ( !context.isStopped() )
{
int randomCommandIdx = ThreadLocalRandom.current().nextInt( commands.size() );
int randomCommandIdx = random.nextInt( commands.size() );
Command command = commands.get( randomCommandIdx );
command.execute();
command.execute( context );
}
return null;
}
Expand All @@ -202,6 +207,18 @@ private void assertNoFileDescriptorLeak( long previousOpenFileDescriptors )
currentOpenFileDescriptorCount, lessThanOrEqualTo( maxOpenFileDescriptors ) );
}

private void assertExpectedNumberOfNodesCreated( long expectedCount )
{
try ( Session session = driver.session() )
{
List<Record> records = session.run( "MATCH (n) RETURN count(n) AS nodesCount" ).list();
assertEquals( 1, records.size() );
Record record = records.get( 0 );
long actualCount = record.get( "nodesCount" ).asLong();
assertEquals( "Unexpected number of nodes in the database", expectedCount, actualCount );
}
}

private static long getOpenFileDescriptorCount()
{
try
Expand All @@ -227,9 +244,46 @@ private static Throwable withSuppressed( Throwable firstError, Throwable newErro
return firstError;
}

private static class Context
{
volatile boolean stopped;
volatile String bookmark;
final AtomicLong createdNodesCount = new AtomicLong();

boolean isStopped()
{
return stopped;
}

void stop()
{
this.stopped = true;
}

String getBookmark()
{
return bookmark;
}

void setBookmark( String bookmark )
{
this.bookmark = bookmark;
}

void nodeCreated()
{
createdNodesCount.incrementAndGet();
}

long getCreatedNodesCount()
{
return createdNodesCount.get();
}
}

private interface Command
{
void execute();
void execute( Context context );
}

private static abstract class BaseQuery implements Command
Expand All @@ -240,6 +294,19 @@ private static abstract class BaseQuery implements Command
{
this.driver = driver;
}

Transaction beginTx( Session session, Context context, boolean useBookmark )
{
if ( useBookmark )
{
String bookmark = context.getBookmark();
if ( bookmark != null )
{
return session.beginTransaction( bookmark );
}
}
return session.beginTransaction();
}
}

private static class ReadQuery extends BaseQuery
Expand All @@ -250,7 +317,7 @@ private static class ReadQuery extends BaseQuery
}

@Override
public void execute()
public void execute( Context context )
{
try ( Session session = driver.session( AccessMode.READ ) )
{
Expand All @@ -268,16 +335,19 @@ public void execute()

private static class ReadQueryInTx extends BaseQuery
{
ReadQueryInTx( Driver driver )
final boolean useBookmark;

ReadQueryInTx( Driver driver, boolean useBookmark )
{
super( driver );
this.useBookmark = useBookmark;
}

@Override
public void execute()
public void execute( Context context )
{
try ( Session session = driver.session( AccessMode.READ );
Transaction tx = session.beginTransaction() )
Transaction tx = beginTx( session, context, useBookmark ) )
{
StatementResult result = tx.run( "MATCH (n) RETURN n LIMIT 1" );
List<Record> records = result.list();
Expand All @@ -287,6 +357,7 @@ public void execute()
Node node = record.get( 0 ).asNode();
assertNotNull( node );
}
tx.success();
}
}
}
Expand All @@ -299,34 +370,44 @@ private static class WriteQuery extends BaseQuery
}

@Override
public void execute()
public void execute( Context context )
{
StatementResult result;
try ( Session session = driver.session( AccessMode.WRITE ) )
{
result = session.run( "CREATE ()" );
}
assertEquals( 1, result.summary().counters().nodesCreated() );
context.nodeCreated();
}
}

private static class WriteQueryInTx extends BaseQuery
{
WriteQueryInTx( Driver driver )
final boolean useBookmark;

WriteQueryInTx( Driver driver, boolean useBookmark )
{
super( driver );
this.useBookmark = useBookmark;
}

@Override
public void execute()
public void execute( Context context )
{
StatementResult result;
try ( Session session = driver.session( AccessMode.WRITE );
Transaction tx = session.beginTransaction() )
try ( Session session = driver.session( AccessMode.WRITE ) )
{
result = tx.run( "CREATE ()" );
try ( Transaction tx = beginTx( session, context, useBookmark ) )
{
result = tx.run( "CREATE ()" );
tx.success();
}

context.setBookmark( session.lastBookmark() );
}
assertEquals( 1, result.summary().counters().nodesCreated() );
context.nodeCreated();
}
}

Expand All @@ -338,7 +419,7 @@ private static class WriteQueryUsingReadSession extends BaseQuery
}

@Override
public void execute()
public void execute( Context context )
{
StatementResult result = null;
try
Expand Down Expand Up @@ -366,7 +447,7 @@ private static class WriteQueryUsingReadSessionInTx extends BaseQuery
}

@Override
public void execute()
public void execute( Context context )
{
StatementResult result = null;
try
Expand All @@ -375,6 +456,7 @@ public void execute()
Transaction tx = session.beginTransaction() )
{
result = tx.run( "CREATE ()" );
tx.success();
}
fail( "Exception expected" );
}
Expand All @@ -397,7 +479,7 @@ private static class FailedAuth implements Command
}

@Override
public void execute()
public void execute( Context context )
{
Logger logger = mock( Logger.class );
Logging logging = mock( Logging.class );
Expand Down