Skip to content

Commit b6ee18f

Browse files
committed
Remove limit and count from CommandBatchCursors and clarify initial connection in tests
1 parent 76bf15e commit b6ee18f

12 files changed

+265
-512
lines changed

driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,16 +224,16 @@ private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer(
224224
return (result, source, connection) -> {
225225
// TODO (CSOT) JAVA-4058
226226
long maxAwaitTimeMS = timeoutSettings.getMaxAwaitTimeMS();
227-
return new CommandBatchCursor<>(result, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
228-
comment, source, connection);
227+
return new CommandBatchCursor<>(result, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
228+
comment, source, connection);
229229
};
230230
}
231231

232232
private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {
233233
return (result, source, connection) -> {
234234
// TODO (CSOT) JAVA-4058
235235
long maxAwaitTimeMS = timeoutSettings.getMaxAwaitTimeMS();
236-
return new AsyncCommandBatchCursor<>(result, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
236+
return new AsyncCommandBatchCursor<>(result, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
237237
comment, source, connection);
238238
};
239239
}

driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java

Lines changed: 14 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444

4545
import java.util.List;
4646
import java.util.concurrent.atomic.AtomicBoolean;
47-
import java.util.concurrent.atomic.AtomicInteger;
4847

4948
import static com.mongodb.assertions.Assertions.assertNotNull;
5049
import static com.mongodb.assertions.Assertions.assertTrue;
@@ -63,7 +62,6 @@
6362
class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
6463

6564
private final MongoNamespace namespace;
66-
private final int limit;
6765
private final long maxTimeMS;
6866
private final Decoder<T> decoder;
6967
@Nullable
@@ -72,21 +70,19 @@ class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T>
7270
private final boolean firstBatchEmpty;
7371
private final ResourceManager resourceManager;
7472
private final CommandCursorResult<T> commandCursorResult;
75-
private final AtomicInteger count = new AtomicInteger();
7673
private final AtomicBoolean processedInitial = new AtomicBoolean();
7774
private int batchSize;
7875

7976
AsyncCommandBatchCursor(
8077
final BsonDocument commandCursorDocument,
81-
final int limit, final int batchSize, final long maxTimeMS,
78+
final int batchSize, final long maxTimeMS,
8279
final Decoder<T> decoder,
8380
@Nullable final BsonValue comment,
8481
final AsyncConnectionSource connectionSource,
8582
final AsyncConnection connection) {
8683
ConnectionDescription connectionDescription = connection.getDescription();
8784
this.commandCursorResult = toCommandCursorResult(connectionDescription.getServerAddress(), FIRST_BATCH, commandCursorDocument);
8885
this.namespace = commandCursorResult.getNamespace();
89-
this.limit = limit;
9086
this.batchSize = batchSize;
9187
this.maxTimeMS = maxTimeMS;
9288
this.decoder = notNull("decoder", decoder);
@@ -95,17 +91,11 @@ class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T>
9591
this.firstBatchEmpty = commandCursorResult.getResults().isEmpty();
9692

9793
AsyncConnection connectionToPin = null;
98-
boolean releaseServerAndResources = false;
99-
if (limitReached()) {
100-
releaseServerAndResources = true;
101-
} else if (connectionDescription.getServerType() == ServerType.LOAD_BALANCER) {
94+
if (connectionDescription.getServerType() == ServerType.LOAD_BALANCER) {
10295
connectionToPin = connection;
10396
}
10497

10598
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
106-
if (releaseServerAndResources) {
107-
resourceManager.releaseServerAndClientResources(connection);
108-
}
10999
}
110100

111101
@Override
@@ -124,15 +114,12 @@ public void next(final SingleResultCallback<List<T>> callback) {
124114
}
125115

126116
if (serverCursorIsNull || !batchResults.isEmpty()) {
127-
if (serverCursorIsNull) {
128-
close();
129-
}
130117
funcCallback.onResult(batchResults, null);
131118
} else {
132119
getMore(localServerCursor, funcCallback);
133120
}
134121
}).get((r, t) -> {
135-
if (limitReached()) {
122+
if (resourceManager.getServerCursor() == null) {
136123
close();
137124
}
138125
callback.onResult(r, t);
@@ -196,8 +183,7 @@ private void getMore(final ServerCursor cursor, final SingleResultCallback<List<
196183
private void getMoreLoop(final AsyncConnection connection, final ServerCursor serverCursor,
197184
final SingleResultCallback<List<T>> callback) {
198185
connection.commandAsync(namespace.getDatabaseName(),
199-
getMoreCommandDocument(serverCursor.getId(), connection.getDescription(), namespace,
200-
limit, batchSize, count.get(), maxTimeMS, comment),
186+
getMoreCommandDocument(serverCursor.getId(), connection.getDescription(), namespace, batchSize, maxTimeMS, comment),
201187
NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(),
202188
CommandResultDocumentCodec.create(decoder, NEXT_BATCH),
203189
assertNotNull(resourceManager.getConnectionSource()).getOperationContext(),
@@ -214,35 +200,29 @@ private void getMoreLoop(final AsyncConnection connection, final ServerCursor se
214200
connection.getDescription().getServerAddress(), NEXT_BATCH, assertNotNull(commandResult));
215201
resourceManager.setServerCursor(commandCursorResult.getServerCursor());
216202

203+
List<T> nextBatch = commandCursorResult.getResults();
204+
if (commandCursorResult.getServerCursor() == null || !nextBatch.isEmpty()) {
205+
callback.onResult(nextBatch, null);
206+
return;
207+
}
208+
217209
if (!resourceManager.operable()) {
218-
// The cursor was closed
219-
resourceManager.releaseServerAndClientResources(connection);
220210
callback.onResult(emptyList(), null);
221211
return;
222212
}
223213

224-
List<T> nextBatch = commandCursorResult.getResults();
225-
if (nextBatch.isEmpty() && commandCursorResult.getServerCursor() != null) {
226-
getMoreLoop(connection, commandCursorResult.getServerCursor(), callback);
227-
} else {
228-
callback.onResult(nextBatch, null);
229-
}
230-
});
214+
getMoreLoop(connection, commandCursorResult.getServerCursor(), callback);
215+
});
231216
}
232217

233218
private CommandCursorResult<T> toCommandCursorResult(final ServerAddress serverAddress, final String fieldNameContainingBatch,
234219
final BsonDocument commandCursorDocument) {
235220
CommandCursorResult<T> commandCursorResult = new CommandCursorResult<>(serverAddress, fieldNameContainingBatch,
236221
commandCursorDocument);
237222
logCommandCursorResult(commandCursorResult);
238-
this.count.addAndGet(commandCursorResult.getResults().size());
239223
return commandCursorResult;
240224
}
241225

242-
private boolean limitReached() {
243-
return Math.abs(limit) != 0 && count.get() >= Math.abs(limit);
244-
}
245-
246226
@ThreadSafe
247227
private static final class ResourceManager extends CursorResourceManager<AsyncConnectionSource, AsyncConnection> {
248228

@@ -282,7 +262,7 @@ void doClose() {
282262
if (getServerCursor() != null) {
283263
getConnection((connection, t) -> {
284264
if (connection != null) {
285-
releaseServerAndClientResources(connection, (r, t1) -> connection.release());
265+
releaseServerAndClientResources(connection);
286266
} else {
287267
unsetServerCursor();
288268
releaseClientResources();
@@ -320,10 +300,6 @@ private void getConnection(final SingleResultCallback<AsyncConnection> callback)
320300
}
321301

322302
private void releaseServerAndClientResources(final AsyncConnection connection) {
323-
releaseServerAndClientResources(connection, (r, t) -> { /* Do nothing */ });
324-
}
325-
326-
private void releaseServerAndClientResources(final AsyncConnection connection, final SingleResultCallback<Void> callback) {
327303
AsyncCallbackSupplier<Void> callbackSupplier = funcCallback -> {
328304
ServerCursor localServerCursor = getServerCursor();
329305
if (localServerCursor != null) {
@@ -333,7 +309,7 @@ private void releaseServerAndClientResources(final AsyncConnection connection, f
333309
callbackSupplier.whenComplete(() -> {
334310
unsetServerCursor();
335311
releaseClientResources();
336-
}).get(callback);
312+
}).whenComplete(connection::release).get((r,t) -> { /* do nothing */ });
337313
}
338314

339315
private void killServerCursor(final MongoNamespace namespace, final ServerCursor localServerCursor,

driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,7 @@ static <T> CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncS
339339

340340
static <T> AsyncBatchCursor<T> cursorDocumentToAsyncBatchCursor(final BsonDocument cursorDocument, final Decoder<T> decoder,
341341
final BsonValue comment, final AsyncConnectionSource source, final AsyncConnection connection, final int batchSize) {
342-
return new AsyncCommandBatchCursor<>(cursorDocument, 0, batchSize, 0, decoder,
343-
comment, source, connection);
342+
return new AsyncCommandBatchCursor<>(cursorDocument, batchSize, 0, decoder, comment, source, connection);
344343
}
345344

346345
static <T> SingleResultCallback<T> releasingCallback(final SingleResultCallback<T> wrapped, final AsyncConnection connection) {

driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java

Lines changed: 4 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
6060

6161
private final MongoNamespace namespace;
62-
private final int limit;
6362
private final long maxTimeMS;
6463
private final Decoder<T> decoder;
6564
@Nullable
@@ -70,21 +69,19 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
7069

7170
private int batchSize;
7271
private CommandCursorResult<T> commandCursorResult;
73-
private int count = 0;
7472
@Nullable
7573
private List<T> nextBatch;
7674

7775
CommandBatchCursor(
7876
final BsonDocument commandCursorDocument,
79-
final int limit, final int batchSize, final long maxTimeMS,
77+
final int batchSize, final long maxTimeMS,
8078
final Decoder<T> decoder,
8179
@Nullable final BsonValue comment,
8280
final ConnectionSource connectionSource,
8381
final Connection connection) {
8482
ConnectionDescription connectionDescription = connection.getDescription();
8583
this.commandCursorResult = toCommandCursorResult(connectionDescription.getServerAddress(), FIRST_BATCH, commandCursorDocument);
8684
this.namespace = commandCursorResult.getNamespace();
87-
this.limit = limit;
8885
this.batchSize = batchSize;
8986
this.maxTimeMS = maxTimeMS;
9087
this.decoder = notNull("decoder", decoder);
@@ -93,17 +90,11 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
9390
this.firstBatchEmpty = commandCursorResult.getResults().isEmpty();
9491

9592
Connection connectionToPin = null;
96-
boolean releaseServerAndResources = false;
97-
if (limitReached()) {
98-
releaseServerAndResources = true;
99-
} else if (connectionDescription.getServerType() == ServerType.LOAD_BALANCER) {
93+
if (connectionDescription.getServerType() == ServerType.LOAD_BALANCER) {
10094
connectionToPin = connection;
10195
}
10296

10397
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
104-
if (releaseServerAndResources) {
105-
resourceManager.releaseServerAndClientResources(connection);
106-
}
10798
}
10899

109100
@Override
@@ -116,10 +107,6 @@ private boolean doHasNext() {
116107
return true;
117108
}
118109

119-
if (limitReached()) {
120-
return false;
121-
}
122-
123110
while (resourceManager.getServerCursor() != null) {
124111
getMore();
125112
if (!resourceManager.operable()) {
@@ -195,10 +182,6 @@ private boolean tryHasNext() {
195182
return true;
196183
}
197184

198-
if (limitReached()) {
199-
return false;
200-
}
201-
202185
if (resourceManager.getServerCursor() != null) {
203186
getMore();
204187
}
@@ -252,8 +235,8 @@ private void getMore() {
252235
this.commandCursorResult = toCommandCursorResult(connection.getDescription().getServerAddress(), NEXT_BATCH,
253236
assertNotNull(
254237
connection.command(namespace.getDatabaseName(),
255-
getMoreCommandDocument(serverCursor.getId(), connection.getDescription(), namespace,
256-
limit, batchSize, count, maxTimeMS, comment),
238+
getMoreCommandDocument(serverCursor.getId(), connection.getDescription(), namespace, batchSize,
239+
maxTimeMS, comment),
257240
NO_OP_FIELD_NAME_VALIDATOR,
258241
ReadPreference.primary(),
259242
CommandResultDocumentCodec.create(decoder, NEXT_BATCH),
@@ -263,9 +246,6 @@ private void getMore() {
263246
throw translateCommandException(e, serverCursor);
264247
}
265248
resourceManager.setServerCursor(nextServerCursor);
266-
if (limitReached() || !resourceManager.operable()) {
267-
resourceManager.releaseServerAndClientResources(connection);
268-
}
269249
});
270250
}
271251

@@ -275,14 +255,9 @@ private CommandCursorResult<T> toCommandCursorResult(final ServerAddress serverA
275255
commandCursorDocument);
276256
logCommandCursorResult(commandCursorResult);
277257
this.nextBatch = commandCursorResult.getResults().isEmpty() ? null : commandCursorResult.getResults();
278-
this.count += commandCursorResult.getResults().size();
279258
return commandCursorResult;
280259
}
281260

282-
private boolean limitReached() {
283-
return Math.abs(limit) != 0 && count >= Math.abs(limit);
284-
}
285-
286261
@ThreadSafe
287262
private static final class ResourceManager extends CursorResourceManager<ConnectionSource, Connection> {
288263

@@ -362,14 +337,6 @@ private Connection getConnection() {
362337
}
363338
}
364339

365-
private void releaseServerAndClientResources(final Connection connection) {
366-
try {
367-
releaseServerResources(assertNotNull(connection));
368-
} finally {
369-
releaseClientResources();
370-
}
371-
}
372-
373340
private void releaseServerResources(final Connection connection) {
374341
try {
375342
ServerCursor localServerCursor = getServerCursor();

driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursorHelper.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,13 @@ final class CommandBatchCursorHelper {
5151
+ "supported";
5252

5353
static BsonDocument getMoreCommandDocument(
54-
final long cursorId, final ConnectionDescription connectionDescription, final MongoNamespace namespace, final int limit,
55-
final int batchSize, final int count, final long maxTimeMS, @Nullable final BsonValue comment) {
54+
final long cursorId, final ConnectionDescription connectionDescription, final MongoNamespace namespace, final int batchSize,
55+
final long maxTimeMS, @Nullable final BsonValue comment) {
5656
BsonDocument document = new BsonDocument("getMore", new BsonInt64(cursorId))
5757
.append("collection", new BsonString(namespace.getCollectionName()));
5858

59-
int batchSizeForGetMoreCommand = Math.abs(getNumberToReturn(limit, batchSize, count));
60-
if (batchSizeForGetMoreCommand != 0) {
61-
document.append("batchSize", new BsonInt32(batchSizeForGetMoreCommand));
59+
if (batchSize != 0) {
60+
document.append("batchSize", new BsonInt32(batchSize));
6261
}
6362
if (maxTimeMS != 0) {
6463
document.append("maxTimeMS", new BsonInt64(maxTimeMS));

driver-core/src/main/com/mongodb/internal/operation/FindOperation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ private boolean isAwaitData() {
456456

457457
private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer() {
458458
return (result, source, connection) ->
459-
new CommandBatchCursor<>(result, limit, batchSize, getMaxTimeForCursor(), decoder, comment, source, connection);
459+
new CommandBatchCursor<>(result, batchSize, getMaxTimeForCursor(), decoder, comment, source, connection);
460460
}
461461

462462
// TODO - CSOT JAVA-4058
@@ -466,6 +466,6 @@ private long getMaxTimeForCursor() {
466466

467467
private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {
468468
return (result, source, connection) ->
469-
new AsyncCommandBatchCursor<>(result, limit, batchSize, getMaxTimeForCursor(), decoder, comment, source, connection);
469+
new AsyncCommandBatchCursor<>(result, batchSize, getMaxTimeForCursor(), decoder, comment, source, connection);
470470
}
471471
}

driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ static <T> CommandReadTransformer<BsonDocument, BatchCursor<T>> singleBatchCurso
331331

332332
static <T> BatchCursor<T> cursorDocumentToBatchCursor(final BsonDocument cursorDocument, final Decoder<T> decoder,
333333
final BsonValue comment, final ConnectionSource source, final Connection connection, final int batchSize) {
334-
return new CommandBatchCursor<>(cursorDocument, 0, batchSize, 0, decoder, comment, source, connection);
334+
return new CommandBatchCursor<>(cursorDocument, batchSize, 0, decoder, comment, source, connection);
335335
}
336336

337337
private SyncOperationHelper() {

driver-core/src/test/functional/com/mongodb/ClusterFixture.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -775,7 +775,7 @@ public static int getReferenceCountAfterTimeout(final ReferenceCounted reference
775775
int count = referenceCounted.getCount();
776776
while (count > target) {
777777
try {
778-
if (System.currentTimeMillis() > startTime + 5000) {
778+
if (System.currentTimeMillis() > startTime + TIMEOUT_DURATION.toMillis()) {
779779
return count;
780780
}
781781
Thread.sleep(10);

0 commit comments

Comments
 (0)