Skip to content

Commit a01e1dc

Browse files
committed
Unify Async & Sync CommandBatchCursor testing
Part of a wider Command Cursor refactoring JAVA-5159
1 parent 87af5d3 commit a01e1dc

17 files changed

+1675
-1480
lines changed

config/codenarc/codenarc.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,6 @@
3434
<exclude name="ComparisonWithSelf"/>
3535
</ruleset-ref>
3636
<ruleset-ref path='rulesets/braces.xml'/>
37-
<ruleset-ref path='rulesets/concurrency.xml'>
38-
<rule-config name='BusyWait'>
39-
<property name='doNotApplyToFileNames' value='AsyncCommandBatchCursorFunctionalSpecification.groovy'/>
40-
</rule-config>
41-
42-
</ruleset-ref>
4337
<ruleset-ref path='rulesets/convention.xml'>
4438
<rule-config name='NoDef'>
4539
<property name='doNotApplyToFilesMatching' value='.*Specification.groovy'/>

driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.mongodb.ServerCursor;
2626
import com.mongodb.annotations.ThreadSafe;
2727
import com.mongodb.connection.ServerType;
28+
import com.mongodb.internal.VisibleForTesting;
2829
import com.mongodb.internal.binding.ConnectionSource;
2930
import com.mongodb.internal.connection.Connection;
3031
import com.mongodb.internal.diagnostics.logging.Logger;
@@ -54,6 +55,7 @@
5455
import static com.mongodb.assertions.Assertions.fail;
5556
import static com.mongodb.assertions.Assertions.notNull;
5657
import static com.mongodb.internal.Locks.withLock;
58+
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
5759
import static com.mongodb.internal.operation.CommandBatchCursorHelper.FIRST_BATCH;
5860
import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR;
5961
import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_ITERATOR;
@@ -69,18 +71,19 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
6971

7072
private final MongoNamespace namespace;
7173
private final int limit;
72-
private final Decoder<T> decoder;
7374
private final long maxTimeMS;
74-
private int batchSize;
75+
private final Decoder<T> decoder;
7576
@Nullable
7677
private final BsonValue comment;
78+
private final int maxWireVersion;
79+
private final boolean firstBatchEmpty;
80+
private final ResourceManager resourceManager;
81+
82+
private int batchSize;
7783
private CommandCursorResult<T> commandCursorResult;
84+
private int count;
7885
@Nullable
7986
private List<T> nextBatch;
80-
private int count;
81-
private final boolean firstBatchEmpty;
82-
private final int maxWireVersion;
83-
private final ResourceManager resourceManager;
8487

8588
CommandBatchCursor(
8689
final BsonDocument commandCursorDocument,
@@ -162,6 +165,11 @@ private List<T> doNext() {
162165
return retVal;
163166
}
164167

168+
@VisibleForTesting(otherwise = PRIVATE)
169+
boolean isClosed() {
170+
return !resourceManager.operable();
171+
}
172+
165173
@Override
166174
public void setBatchSize(final int batchSize) {
167175
this.batchSize = batchSize;

driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,6 @@ class OperationFunctionalSpecification extends Specification {
207207
}
208208
}
209209

210-
def consumeAsyncResults(cursor) {
211-
def batch = next(cursor, true)
212-
while (batch != null) {
213-
batch = next(cursor, true)
214-
}
215-
}
216-
217210
void testOperation(Map params) {
218211
params.async = params.async != null ? params.async : false
219212
params.result = params.result != null ? params.result : null

driver-core/src/test/functional/com/mongodb/client/model/OperationTest.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818

1919
import com.mongodb.ClusterFixture;
2020
import com.mongodb.MongoNamespace;
21+
import com.mongodb.async.FutureResultCallback;
2122
import com.mongodb.client.test.CollectionHelper;
2223
import com.mongodb.internal.connection.ServerHelper;
24+
import com.mongodb.internal.validator.NoOpFieldNameValidator;
2325
import com.mongodb.lang.Nullable;
2426
import org.bson.BsonArray;
2527
import org.bson.BsonDocument;
2628
import org.bson.BsonDouble;
2729
import org.bson.BsonValue;
2830
import org.bson.Document;
31+
import org.bson.FieldNameValidator;
2932
import org.bson.codecs.BsonDocumentCodec;
3033
import org.bson.codecs.DecoderContext;
3134
import org.bson.codecs.DocumentCodec;
@@ -39,8 +42,11 @@
3942
import java.util.Collections;
4043
import java.util.List;
4144
import java.util.Map;
45+
import java.util.concurrent.TimeUnit;
46+
import java.util.function.Consumer;
4247
import java.util.stream.Collectors;
4348

49+
import static com.mongodb.ClusterFixture.TIMEOUT;
4450
import static com.mongodb.ClusterFixture.checkReferenceCountReachesTarget;
4551
import static com.mongodb.ClusterFixture.getAsyncBinding;
4652
import static com.mongodb.ClusterFixture.getBinding;
@@ -50,14 +56,17 @@
5056
import static com.mongodb.client.model.Aggregates.sort;
5157
import static java.util.stream.Collectors.toList;
5258
import static org.junit.jupiter.api.Assertions.assertEquals;
59+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
5360

5461
public abstract class OperationTest {
5562

5663
protected static final DocumentCodec DOCUMENT_DECODER = new DocumentCodec();
64+
protected static final FieldNameValidator NO_OP_FIELD_NAME_VALIDATOR = new NoOpFieldNameValidator();
5765

5866
@BeforeEach
5967
public void beforeEach() {
60-
ServerHelper.checkPool(getPrimary());
68+
assumeTrue(ServerHelper.checkPoolCount(getPrimary()) == 0, "Sync Pool count not zero");
69+
assumeTrue(ServerHelper.checkAsyncPoolCount(getPrimary()) == 0, "Async Pool count not zero");
6170
CollectionHelper.drop(getNamespace());
6271
}
6372

@@ -77,15 +86,15 @@ private CollectionHelper<BsonDocument> getCollectionHelper(final MongoNamespace
7786
return new CollectionHelper<>(new BsonDocumentCodec(), namespace);
7887
}
7988

80-
private String getDatabaseName() {
89+
protected String getDatabaseName() {
8190
return ClusterFixture.getDefaultDatabaseName();
8291
}
8392

84-
private String getCollectionName() {
93+
protected String getCollectionName() {
8594
return "test";
8695
}
8796

88-
MongoNamespace getNamespace() {
97+
protected MongoNamespace getNamespace() {
8998
return new MongoNamespace(getDatabaseName(), getCollectionName());
9099
}
91100

@@ -97,7 +106,6 @@ public static BsonDocument toBsonDocument(final BsonDocument bsonDocument) {
97106
return getDefaultCodecRegistry().get(BsonDocument.class).decode(bsonDocument.asBsonReader(), DecoderContext.builder().build());
98107
}
99108

100-
101109
protected List<Bson> assertPipeline(final String stageAsString, final Bson stage) {
102110
List<Bson> pipeline = Collections.singletonList(stage);
103111
return assertPipeline(stageAsString, pipeline);
@@ -159,4 +167,25 @@ protected List<Object> aggregateWithWindowFields(@Nullable final Object partitio
159167
.map(doc -> doc.get("result"))
160168
.collect(toList());
161169
}
170+
171+
protected <T> void ifNotNull(@Nullable final T maybeNull, final Consumer<T> consumer) {
172+
if (maybeNull != null) {
173+
consumer.accept(maybeNull);
174+
}
175+
}
176+
177+
protected void sleep(final long ms) {
178+
try {
179+
Thread.sleep(ms);
180+
} catch (InterruptedException e) {
181+
Thread.currentThread().interrupt();
182+
throw new RuntimeException(e);
183+
}
184+
}
185+
186+
protected <T> T block(final Consumer<FutureResultCallback<T>> consumer) {
187+
FutureResultCallback<T> cb = new FutureResultCallback<>();
188+
consumer.accept(cb);
189+
return cb.get(TIMEOUT, TimeUnit.SECONDS);
190+
}
162191
}

driver-core/src/test/functional/com/mongodb/internal/connection/ServerHelper.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@ public static void checkPool(final ServerAddress address) {
3636
checkPool(address, getAsyncCluster());
3737
}
3838

39+
public static int checkPoolCount(final ServerAddress address) {
40+
return getConnectionPool(address, getCluster()).getInUseCount();
41+
}
42+
43+
public static int checkAsyncPoolCount(final ServerAddress address) {
44+
return getConnectionPool(address, getAsyncCluster()).getInUseCount();
45+
}
46+
3947
public static void waitForLastRelease(final Cluster cluster) {
4048
for (ServerDescription cur : cluster.getCurrentDescription().getServerDescriptions()) {
4149
if (cur.isOk()) {
@@ -45,12 +53,11 @@ public static void waitForLastRelease(final Cluster cluster) {
4553
}
4654

4755
public static void waitForLastRelease(final ServerAddress address, final Cluster cluster) {
48-
ConcurrentPool<UsageTrackingInternalConnection> pool = connectionPool(
49-
cluster.selectServer(new ServerAddressSelector(address), OPERATION_CONTEXT).getServer());
56+
ConcurrentPool<UsageTrackingInternalConnection> pool = getConnectionPool(address, cluster);
5057
long startTime = System.currentTimeMillis();
5158
while (pool.getInUseCount() > 0) {
5259
try {
53-
sleep(10);
60+
sleep(100);
5461
if (System.currentTimeMillis() > startTime + ClusterFixture.TIMEOUT * 1000) {
5562
throw new MongoTimeoutException("Timed out waiting for pool in use count to drop to 0. Now at: "
5663
+ pool.getInUseCount());
@@ -61,11 +68,15 @@ public static void waitForLastRelease(final ServerAddress address, final Cluster
6168
}
6269
}
6370

71+
private static ConcurrentPool<UsageTrackingInternalConnection> getConnectionPool(final ServerAddress address, final Cluster cluster) {
72+
return connectionPool(cluster.selectServer(new ServerAddressSelector(address), OPERATION_CONTEXT).getServer());
73+
}
74+
6475
private static void checkPool(final ServerAddress address, final Cluster cluster) {
65-
ConcurrentPool<UsageTrackingInternalConnection> pool = connectionPool(
66-
cluster.selectServer(new ServerAddressSelector(address), OPERATION_CONTEXT).getServer());
67-
if (pool.getInUseCount() > 0) {
68-
throw new IllegalStateException("Connection pool in use count is " + pool.getInUseCount());
76+
try {
77+
waitForLastRelease(address, cluster);
78+
} catch (MongoTimeoutException e) {
79+
throw new IllegalStateException(e.getMessage());
6980
}
7081
}
7182

driver-core/src/test/functional/com/mongodb/internal/operation/AggregateOperationSpecification.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ import org.bson.codecs.BsonDocumentCodec
5050
import org.bson.codecs.DocumentCodec
5151
import spock.lang.IgnoreIf
5252

53-
import static QueryOperationHelper.getKeyPattern
53+
import static TestOperationHelper.getKeyPattern
5454
import static com.mongodb.ClusterFixture.OPERATION_CONTEXT
5555
import static com.mongodb.ClusterFixture.TIMEOUT_SETTINGS
5656
import static com.mongodb.ClusterFixture.TIMEOUT_SETTINGS_WITH_MAX_TIME

0 commit comments

Comments
 (0)