Skip to content

Commit d60da4c

Browse files
authored
Merge pull request #814 from injectives/feature/cherry-picking-4.3-stress-tests-improvements
cherry picking 4.3 stress tests improvements
2 parents 140e938 + 331ca64 commit d60da4c

15 files changed

+673
-156
lines changed

driver/src/test/java/org/neo4j/driver/stress/AbstractStressTestBase.java

Lines changed: 33 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import java.lang.reflect.Method;
3232
import java.net.URI;
3333
import java.util.ArrayList;
34-
import java.util.Arrays;
34+
import java.util.Collections;
3535
import java.util.HashMap;
3636
import java.util.HashSet;
3737
import java.util.List;
@@ -159,7 +159,7 @@ void asyncApiBigDataTest() throws Throwable
159159
}
160160

161161
@Test
162-
void rxApiBigDataTest() throws Throwable
162+
void rxApiBigDataTest()
163163
{
164164
assertRxIsAvailable();
165165
Bookmark bookmark = createNodesRx( bigDataTestBatchCount(), BIG_DATA_TEST_BATCH_SIZE, driver );
@@ -221,7 +221,17 @@ Config config()
221221

222222
abstract C createContext();
223223

224-
abstract List<BlockingCommand<C>> createTestSpecificBlockingCommands();
224+
List<BlockingCommand<C>> createTestSpecificBlockingCommands() {
225+
return Collections.emptyList();
226+
}
227+
228+
List<AsyncCommand<C>> createTestSpecificAsyncCommands() {
229+
return Collections.emptyList();
230+
}
231+
232+
List<RxCommand<C>> createTestSpecificRxCommands() {
233+
return Collections.emptyList();
234+
}
225235

226236
abstract boolean handleWriteFailure( Throwable error, C context );
227237

@@ -245,23 +255,15 @@ private List<BlockingCommand<C>> createBlockingCommands()
245255
{
246256
List<BlockingCommand<C>> commands = new ArrayList<>();
247257

248-
commands.add( new BlockingReadQuery<>( driver, false ) );
249-
commands.add( new BlockingReadQuery<>( driver, true ) );
250-
251-
commands.add( new BlockingReadQueryInTx<>( driver, false ) );
252-
commands.add( new BlockingReadQueryInTx<>( driver, true ) );
258+
commands.add( new BlockingReadQueryWithRetries<>( driver, false ) );
259+
commands.add( new BlockingReadQueryWithRetries<>( driver, true ) );
253260

254-
commands.add( new BlockingWriteQuery<>( this, driver, false ) );
255-
commands.add( new BlockingWriteQuery<>( this, driver, true ) );
261+
commands.add( new BlockingWriteQueryWithRetries<>( this, driver, false ) );
262+
commands.add( new BlockingWriteQueryWithRetries<>( this, driver, true ) );
256263

257-
commands.add( new BlockingWriteQueryInTx<>( this, driver, false ) );
258-
commands.add( new BlockingWriteQueryInTx<>( this, driver, true ) );
264+
commands.add( new BlockingWrongQueryWithRetries<>( driver ) );
259265

260-
commands.add( new BlockingWrongQuery<>( driver ) );
261-
commands.add( new BlockingWrongQueryInTx<>( driver ) );
262-
263-
commands.add( new BlockingFailingQuery<>( driver ) );
264-
commands.add( new BlockingFailingQueryInTx<>( driver ) );
266+
commands.add( new BlockingFailingQueryWithRetries<>( driver ) );
265267

266268
commands.add( new FailedAuth<>( databaseUri(), config() ) );
267269

@@ -299,29 +301,19 @@ private List<Future<?>> launchRxWorkerThreads( C context )
299301

300302
private List<RxCommand<C>> createRxCommands()
301303
{
302-
return Arrays.asList(
303-
new RxReadQuery<>( driver, false ),
304-
new RxReadQuery<>( driver, true ),
305-
306-
new RxWriteQuery<>( this, driver, false ),
307-
new RxWriteQuery<>( this, driver, true ),
304+
List<RxCommand<C>> commands = new ArrayList<>();
308305

309-
new RxReadQueryInTx<>( driver, false ),
310-
new RxReadQueryInTx<>( driver, true ),
306+
commands.add( new RxReadQueryWithRetries<>( driver, false ) );
307+
commands.add( new RxReadQueryWithRetries<>( driver, true ) );
311308

312-
new RxWriteQueryInTx<>( this, driver, false ),
313-
new RxWriteQueryInTx<>( this, driver, true ),
309+
commands.add( new RxWriteQueryWithRetries<>( this, driver, false ) );
310+
commands.add( new RxWriteQueryWithRetries<>( this, driver, true ) );
314311

315-
new RxReadQueryWithRetries<>( driver, false ),
316-
new RxReadQueryWithRetries<>( driver, false ),
312+
commands.add( new RxFailingQueryWithRetries<>( driver ) );
317313

318-
new RxWriteQueryWithRetries<>( this, driver, false ),
319-
new RxWriteQueryWithRetries<>( this, driver, true ),
314+
commands.addAll( createTestSpecificRxCommands() );
320315

321-
new RxFailingQuery<>( driver ),
322-
new RxFailingQueryInTx<>( driver ),
323-
new RxFailingQueryWithRetries<>( driver )
324-
);
316+
return commands;
325317
}
326318

327319
private Future<Void> launchRxWorkerThread( ExecutorService executor, List<RxCommand<C>> commands, C context )
@@ -367,23 +359,15 @@ private List<AsyncCommand<C>> createAsyncCommands()
367359
{
368360
List<AsyncCommand<C>> commands = new ArrayList<>();
369361

370-
commands.add( new AsyncReadQuery<>( driver, false ) );
371-
commands.add( new AsyncReadQuery<>( driver, true ) );
372-
373-
commands.add( new AsyncReadQueryInTx<>( driver, false ) );
374-
commands.add( new AsyncReadQueryInTx<>( driver, true ) );
375-
376-
commands.add( new AsyncWriteQuery<>( this, driver, false ) );
377-
commands.add( new AsyncWriteQuery<>( this, driver, true ) );
362+
commands.add( new AsyncReadQueryWithRetries<>( driver, false ) );
363+
commands.add( new AsyncReadQueryWithRetries<>( driver, true ) );
378364

379-
commands.add( new AsyncWriteQueryInTx<>( this, driver, false ) );
380-
commands.add( new AsyncWriteQueryInTx<>( this, driver, true ) );
365+
commands.add( new AsyncWriteQueryWithRetries<>( this, driver, false ) );
366+
commands.add( new AsyncWriteQueryWithRetries<>( this, driver, true ) );
381367

382-
commands.add( new AsyncWrongQuery<>( driver ) );
383-
commands.add( new AsyncWrongQueryInTx<>( driver ) );
368+
commands.add( new AsyncWrongQueryWithRetries<>( driver ) );
384369

385-
commands.add( new AsyncFailingQuery<>( driver ) );
386-
commands.add( new AsyncFailingQueryInTx<>( driver ) );
370+
commands.add( new AsyncFailingQueryWithRetries<>( driver ) );
387371

388372
return commands;
389373
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.stress;
20+
21+
import java.util.List;
22+
import java.util.concurrent.CompletionStage;
23+
24+
import org.neo4j.driver.AccessMode;
25+
import org.neo4j.driver.Driver;
26+
import org.neo4j.driver.Record;
27+
import org.neo4j.driver.async.AsyncSession;
28+
import org.neo4j.driver.async.ResultCursor;
29+
import org.neo4j.driver.internal.util.Futures;
30+
31+
import static org.hamcrest.Matchers.is;
32+
import static org.hamcrest.junit.MatcherAssert.assertThat;
33+
import static org.junit.jupiter.api.Assertions.assertNull;
34+
import static org.neo4j.driver.internal.util.Matchers.arithmeticError;
35+
36+
public class AsyncFailingQueryWithRetries<C extends AbstractContext> extends AbstractAsyncQuery<C>
37+
{
38+
public AsyncFailingQueryWithRetries( Driver driver )
39+
{
40+
super( driver, false );
41+
}
42+
43+
@Override
44+
public CompletionStage<Void> execute( C context )
45+
{
46+
AsyncSession session = newSession( AccessMode.READ, context );
47+
48+
CompletionStage<List<Record>> txStage = session.readTransactionAsync( tx -> tx.runAsync( "UNWIND [10, 5, 0] AS x RETURN 10 / x" )
49+
.thenCompose( ResultCursor::listAsync ) );
50+
51+
CompletionStage<Void> resultsProcessingStage = txStage
52+
.handle( ( records, error ) ->
53+
{
54+
assertNull( records );
55+
Throwable cause = Futures.completionExceptionCause( error );
56+
assertThat( cause, is( arithmeticError() ) );
57+
58+
return null;
59+
} );
60+
61+
return resultsProcessingStage.whenComplete( ( nothing, throwable ) -> session.closeAsync() );
62+
}
63+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.stress;
20+
21+
import java.util.concurrent.CompletionStage;
22+
23+
import org.neo4j.driver.AccessMode;
24+
import org.neo4j.driver.Driver;
25+
import org.neo4j.driver.Record;
26+
import org.neo4j.driver.async.AsyncSession;
27+
import org.neo4j.driver.async.ResultCursor;
28+
import org.neo4j.driver.summary.ResultSummary;
29+
import org.neo4j.driver.types.Node;
30+
31+
import static org.junit.jupiter.api.Assertions.assertNotNull;
32+
33+
public class AsyncReadQueryWithRetries<C extends AbstractContext> extends AbstractAsyncQuery<C>
34+
{
35+
public AsyncReadQueryWithRetries( Driver driver, boolean useBookmark )
36+
{
37+
super( driver, useBookmark );
38+
}
39+
40+
@Override
41+
public CompletionStage<Void> execute( C context )
42+
{
43+
AsyncSession session = newSession( AccessMode.READ, context );
44+
45+
CompletionStage<ResultSummary> txStage = session.readTransactionAsync(
46+
tx -> tx.runAsync( "MATCH (n) RETURN n LIMIT 1" )
47+
.thenCompose(
48+
cursor -> cursor.nextAsync()
49+
.thenCompose(
50+
record -> processRecordAndGetSummary( record, cursor ) ) ) );
51+
52+
CompletionStage<Void> resultsProcessingStage = txStage.thenApply( resultSummary -> processResultSummary( resultSummary, context ) );
53+
54+
return resultsProcessingStage.whenComplete( ( nothing, throwable ) -> session.closeAsync() );
55+
}
56+
57+
private CompletionStage<ResultSummary> processRecordAndGetSummary( Record record, ResultCursor cursor )
58+
{
59+
if ( record != null )
60+
{
61+
Node node = record.get( 0 ).asNode();
62+
assertNotNull( node );
63+
}
64+
return cursor.consumeAsync();
65+
}
66+
67+
private Void processResultSummary( ResultSummary resultSummary, C context )
68+
{
69+
if ( resultSummary != null )
70+
{
71+
context.readCompleted( resultSummary );
72+
}
73+
return null;
74+
}
75+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.stress;
20+
21+
import java.util.concurrent.CompletionStage;
22+
23+
import org.neo4j.driver.AccessMode;
24+
import org.neo4j.driver.Driver;
25+
import org.neo4j.driver.async.AsyncSession;
26+
import org.neo4j.driver.async.ResultCursor;
27+
import org.neo4j.driver.summary.ResultSummary;
28+
29+
import static org.junit.jupiter.api.Assertions.assertEquals;
30+
31+
public class AsyncWriteQueryWithRetries<C extends AbstractContext> extends AbstractAsyncQuery<C>
32+
{
33+
private final AbstractStressTestBase<C> stressTest;
34+
35+
public AsyncWriteQueryWithRetries( AbstractStressTestBase<C> stressTest, Driver driver, boolean useBookmark )
36+
{
37+
super( driver, useBookmark );
38+
this.stressTest = stressTest;
39+
}
40+
41+
@Override
42+
public CompletionStage<Void> execute( C context )
43+
{
44+
AsyncSession session = newSession( AccessMode.WRITE, context );
45+
46+
CompletionStage<ResultSummary> txStage = session.writeTransactionAsync(
47+
tx -> tx.runAsync( "CREATE ()" ).thenCompose( ResultCursor::consumeAsync ) );
48+
49+
return txStage.thenApply( resultSummary -> processResultSummary( resultSummary, context ) )
50+
.handle( ( nothing, throwable ) -> recordAndRethrowThrowable( throwable, context ) )
51+
.whenComplete( ( nothing, throwable ) -> finalizeSession( session, context ) );
52+
}
53+
54+
private Void processResultSummary( ResultSummary resultSummary, C context )
55+
{
56+
assertEquals( 1, resultSummary.counters().nodesCreated() );
57+
context.nodeCreated();
58+
return null;
59+
}
60+
61+
private Void recordAndRethrowThrowable( Throwable throwable, C context )
62+
{
63+
if ( throwable != null )
64+
{
65+
stressTest.handleWriteFailure( throwable, context );
66+
throw new RuntimeException( throwable );
67+
}
68+
return null;
69+
}
70+
71+
private void finalizeSession( AsyncSession session, C context )
72+
{
73+
context.setBookmark( session.lastBookmark() );
74+
session.closeAsync();
75+
}
76+
}

0 commit comments

Comments
 (0)