Skip to content

Commit e23f1f6

Browse files
committed
Made CursorResourceManager abstract
Shared usage in sync and async Part of a wider Command Cursor refactoring JAVA-5159
1 parent 985b9ec commit e23f1f6

File tree

7 files changed

+439
-448
lines changed

7 files changed

+439
-448
lines changed

driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResult
188188
try {
189189
List<T> convertedResults;
190190
try {
191-
convertedResults = convertAndProduceLastId(result, changeStreamOperation.getDecoder(),
191+
convertedResults = convertAndProduceLastId(assertNotNull(result), changeStreamOperation.getDecoder(),
192192
lastId -> resumeToken = lastId);
193193
} finally {
194194
cachePostBatchResumeToken(wrappedCursor);

driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java

Lines changed: 202 additions & 272 deletions
Large diffs are not rendered by default.

driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ interface AsyncCallableWithConnection {
6161
void call(@Nullable AsyncConnection connection, @Nullable Throwable t);
6262
}
6363

64+
interface AsyncCallableConnectionWithCallback<T> {
65+
void call(@Nullable AsyncConnection connection, SingleResultCallback<T> callback);
66+
}
67+
6468
interface AsyncCallableWithSource {
6569
void call(@Nullable AsyncConnectionSource source, @Nullable Throwable t);
6670
}

driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,20 +165,20 @@ private void cachePostBatchResumeToken(final AggregateResponseBatchCursor<RawBso
165165
* @param lastIdConsumer Is {@linkplain Consumer#accept(Object) called} iff {@code rawDocuments} is successfully converted
166166
* and the returned {@link List} is neither {@code null} nor {@linkplain List#isEmpty() empty}.
167167
*/
168-
@Nullable
169168
static <T> List<T> convertAndProduceLastId(@Nullable final List<RawBsonDocument> rawDocuments,
170169
final Decoder<T> decoder,
171170
final Consumer<BsonDocument> lastIdConsumer) {
172-
List<T> results = null;
171+
List<T> results = new ArrayList<>();
173172
if (rawDocuments != null) {
174-
results = new ArrayList<>();
175173
for (RawBsonDocument rawDocument : rawDocuments) {
176174
if (!rawDocument.containsKey("_id")) {
177175
throw new MongoChangeStreamException("Cannot provide resume functionality when the resume token is missing.");
178176
}
179177
results.add(rawDocument.decode(decoder));
180178
}
181-
lastIdConsumer.accept(rawDocuments.get(rawDocuments.size() - 1).getDocument("_id"));
179+
if (!rawDocuments.isEmpty()) {
180+
lastIdConsumer.accept(rawDocuments.get(rawDocuments.size() - 1).getDocument("_id"));
181+
}
182182
}
183183
return results;
184184
}

driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java

Lines changed: 134 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,41 +17,47 @@
1717
package com.mongodb.internal.operation;
1818

1919
import com.mongodb.MongoCommandException;
20+
import com.mongodb.MongoException;
2021
import com.mongodb.MongoNamespace;
22+
import com.mongodb.MongoSocketException;
2123
import com.mongodb.ReadPreference;
2224
import com.mongodb.ServerAddress;
2325
import com.mongodb.ServerCursor;
26+
import com.mongodb.annotations.ThreadSafe;
27+
import com.mongodb.connection.ConnectionDescription;
2428
import com.mongodb.connection.ServerType;
2529
import com.mongodb.internal.VisibleForTesting;
2630
import com.mongodb.internal.binding.ConnectionSource;
2731
import com.mongodb.internal.connection.Connection;
28-
import com.mongodb.internal.diagnostics.logging.Logger;
29-
import com.mongodb.internal.diagnostics.logging.Loggers;
30-
import com.mongodb.internal.validator.NoOpFieldNameValidator;
32+
import com.mongodb.internal.connection.OperationContext;
3133
import com.mongodb.lang.Nullable;
3234
import org.bson.BsonDocument;
3335
import org.bson.BsonTimestamp;
3436
import org.bson.BsonValue;
35-
import org.bson.FieldNameValidator;
37+
import org.bson.codecs.BsonDocumentCodec;
3638
import org.bson.codecs.Decoder;
3739

3840
import java.util.List;
3941
import java.util.NoSuchElementException;
42+
import java.util.concurrent.locks.StampedLock;
43+
import java.util.function.Consumer;
44+
import java.util.function.Supplier;
4045

4146
import static com.mongodb.assertions.Assertions.assertNotNull;
47+
import static com.mongodb.assertions.Assertions.assertTrue;
4248
import static com.mongodb.assertions.Assertions.notNull;
4349
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
4450
import static com.mongodb.internal.operation.CommandBatchCursorHelper.FIRST_BATCH;
4551
import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR;
4652
import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_ITERATOR;
4753
import static com.mongodb.internal.operation.CommandBatchCursorHelper.NEXT_BATCH;
54+
import static com.mongodb.internal.operation.CommandBatchCursorHelper.NO_OP_FIELD_NAME_VALIDATOR;
55+
import static com.mongodb.internal.operation.CommandBatchCursorHelper.getKillCursorsCommand;
56+
import static com.mongodb.internal.operation.CommandBatchCursorHelper.getCommandCursorResult;
4857
import static com.mongodb.internal.operation.CommandBatchCursorHelper.getMoreCommandDocument;
49-
import static com.mongodb.internal.operation.QueryHelper.translateCommandException;
50-
import static java.lang.String.format;
58+
import static com.mongodb.internal.operation.CommandBatchCursorHelper.translateCommandException;
5159

5260
class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
53-
private static final Logger LOGGER = Loggers.getLogger("operation");
54-
private static final FieldNameValidator NO_OP_FIELD_NAME_VALIDATOR = new NoOpFieldNameValidator();
5561

5662
private final MongoNamespace namespace;
5763
private final int limit;
@@ -61,11 +67,11 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
6167
private final BsonValue comment;
6268
private final int maxWireVersion;
6369
private final boolean firstBatchEmpty;
64-
private final CursorResourceManager resourceManager;
70+
private final ResourceManager resourceManager;
6571

6672
private int batchSize;
6773
private CommandCursorResult<T> commandCursorResult;
68-
private int count;
74+
private int count = 0;
6975
@Nullable
7076
private List<T> nextBatch;
7177

@@ -76,26 +82,26 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
7682
@Nullable final BsonValue comment,
7783
final ConnectionSource connectionSource,
7884
final Connection connection) {
79-
this.commandCursorResult = initFromCommandCursorDocument(connection.getDescription().getServerAddress(),
80-
FIRST_BATCH, commandCursorDocument);
85+
ConnectionDescription connectionDescription = connection.getDescription();
86+
this.commandCursorResult = toCommandCursorResult(connectionDescription.getServerAddress(), FIRST_BATCH, commandCursorDocument);
8187
this.namespace = commandCursorResult.getNamespace();
8288
this.limit = limit;
8389
this.batchSize = batchSize;
8490
this.maxTimeMS = maxTimeMS;
8591
this.decoder = notNull("decoder", decoder);
8692
this.comment = comment;
87-
this.maxWireVersion = connection.getDescription().getMaxWireVersion();
93+
this.maxWireVersion = connectionDescription.getMaxWireVersion();
8894
this.firstBatchEmpty = commandCursorResult.getResults().isEmpty();
8995

9096
Connection connectionToPin = null;
9197
boolean releaseServerAndResources = false;
9298
if (limitReached()) {
9399
releaseServerAndResources = true;
94-
} else if (connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER) {
100+
} else if (connectionDescription.getServerType() == ServerType.LOAD_BALANCER) {
95101
connectionToPin = connection;
96102
}
97103

98-
resourceManager = new CursorResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
104+
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
99105
if (releaseServerAndResources) {
100106
resourceManager.releaseServerAndClientResources(connection);
101107
}
@@ -115,7 +121,7 @@ private boolean doHasNext() {
115121
return false;
116122
}
117123

118-
while (resourceManager.serverCursor() != null) {
124+
while (resourceManager.getServerCursor() != null) {
119125
getMore();
120126
if (!resourceManager.operable()) {
121127
throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_CURSOR);
@@ -194,7 +200,7 @@ private boolean tryHasNext() {
194200
return false;
195201
}
196202

197-
if (resourceManager.serverCursor() != null) {
203+
if (resourceManager.getServerCursor() != null) {
198204
getMore();
199205
}
200206

@@ -207,8 +213,7 @@ public ServerCursor getServerCursor() {
207213
if (!resourceManager.operable()) {
208214
throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_ITERATOR);
209215
}
210-
211-
return resourceManager.serverCursor();
216+
return resourceManager.getServerCursor();
212217
}
213218

214219
@Override
@@ -241,11 +246,11 @@ public int getMaxWireVersion() {
241246
}
242247

243248
private void getMore() {
244-
ServerCursor serverCursor = assertNotNull(resourceManager.serverCursor());
249+
ServerCursor serverCursor = assertNotNull(resourceManager.getServerCursor());
245250
resourceManager.executeWithConnection(connection -> {
246251
ServerCursor nextServerCursor;
247252
try {
248-
initFromCommandCursorDocument(connection.getDescription().getServerAddress(), NEXT_BATCH,
253+
this.commandCursorResult = toCommandCursorResult(connection.getDescription().getServerAddress(), NEXT_BATCH,
249254
assertNotNull(
250255
connection.command(namespace.getDatabaseName(),
251256
getMoreCommandDocument(serverCursor.getId(), connection.getDescription(), namespace,
@@ -265,21 +270,123 @@ private void getMore() {
265270
});
266271
}
267272

268-
private CommandCursorResult<T> initFromCommandCursorDocument(final ServerAddress serverAddress, final String fieldNameContainingBatch,
273+
private CommandCursorResult<T> toCommandCursorResult(final ServerAddress serverAddress, final String fieldNameContainingBatch,
269274
final BsonDocument commandCursorDocument) {
270-
this.commandCursorResult = new CommandCursorResult<>(serverAddress, fieldNameContainingBatch, commandCursorDocument);
275+
CommandCursorResult<T> commandCursorResult = getCommandCursorResult(serverAddress, fieldNameContainingBatch, commandCursorDocument);
271276
this.nextBatch = commandCursorResult.getResults().isEmpty() ? null : commandCursorResult.getResults();
272277
this.count += commandCursorResult.getResults().size();
273-
if (LOGGER.isDebugEnabled()) {
274-
LOGGER.debug(format("Received batch of %d documents with cursorId %d from server %s", commandCursorResult.getResults().size(),
275-
commandCursorResult.getCursorId(), commandCursorResult.getServerAddress()));
276-
}
277278
return commandCursorResult;
278279
}
279280

280281
private boolean limitReached() {
281282
return Math.abs(limit) != 0 && count >= Math.abs(limit);
282283
}
283284

285+
@ThreadSafe
286+
private static final class ResourceManager extends CursorResourceManager<ConnectionSource, Connection> {
287+
288+
ResourceManager(
289+
final MongoNamespace namespace,
290+
final ConnectionSource connectionSource,
291+
@Nullable final Connection connectionToPin,
292+
@Nullable final ServerCursor serverCursor) {
293+
super(new StampedLock().asWriteLock(), namespace, connectionSource, connectionToPin, serverCursor);
294+
}
295+
296+
/**
297+
* Thread-safe.
298+
* Executes {@code operation} within the {@link #tryStartOperation()}/{@link #endOperation()} bounds.
299+
*
300+
* @throws IllegalStateException If {@linkplain CommandBatchCursor#close() closed}.
301+
*/
302+
@Nullable
303+
<R> R execute(final String exceptionMessageIfClosed, final Supplier<R> operation) throws IllegalStateException {
304+
if (!tryStartOperation()) {
305+
throw new IllegalStateException(exceptionMessageIfClosed);
306+
}
307+
try {
308+
return operation.get();
309+
} finally {
310+
endOperation();
311+
}
312+
}
284313

314+
@Override
315+
void markAsPinned(final Connection connectionToPin, final Connection.PinningMode pinningMode) {
316+
connectionToPin.markAsPinned(pinningMode);
317+
}
318+
319+
@Override
320+
void doClose() {
321+
if (isSkipReleasingServerResourcesOnClose()) {
322+
unsetServerCursor();
323+
}
324+
try {
325+
if (getServerCursor() != null) {
326+
// Don't handle corrupted connections
327+
Connection connection = getConnection();
328+
try {
329+
releaseServerResources(connection);
330+
} finally {
331+
connection.release();
332+
}
333+
}
334+
} catch (MongoException e) {
335+
// ignore exceptions when releasing server resources
336+
} finally {
337+
// guarantee that regardless of exceptions, `serverCursor` is null and client resources are released
338+
unsetServerCursor();
339+
releaseClientResources();
340+
}
341+
}
342+
343+
void executeWithConnection(final Consumer<Connection> action) {
344+
Connection connection = getConnection();
345+
try {
346+
action.accept(connection);
347+
} catch (MongoSocketException e) {
348+
onCorruptedConnection(connection, e);
349+
throw e;
350+
} finally {
351+
connection.release();
352+
}
353+
}
354+
355+
private Connection getConnection() {
356+
assertTrue(getState() != State.IDLE);
357+
Connection pinnedConnection = getPinnedConnection();
358+
if (pinnedConnection == null) {
359+
return assertNotNull(getConnectionSource()).getConnection();
360+
} else {
361+
return pinnedConnection.retain();
362+
}
363+
}
364+
365+
private void releaseServerAndClientResources(final Connection connection) {
366+
try {
367+
releaseServerResources(assertNotNull(connection));
368+
} finally {
369+
releaseClientResources();
370+
}
371+
}
372+
373+
private void releaseServerResources(final Connection connection) {
374+
try {
375+
ServerCursor localServerCursor = getServerCursor();
376+
if (localServerCursor != null) {
377+
killServerCursor(getNamespace(), localServerCursor, connection);
378+
}
379+
} finally {
380+
unsetServerCursor();
381+
}
382+
}
383+
384+
private void killServerCursor(final MongoNamespace namespace, final ServerCursor localServerCursor,
385+
final Connection localConnection) {
386+
OperationContext operationContext = assertNotNull(getConnectionSource()).getOperationContext();
387+
localConnection.command(namespace.getDatabaseName(), getKillCursorsCommand(namespace, localServerCursor),
388+
NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), new BsonDocumentCodec(),
389+
operationContext);
390+
}
391+
}
285392
}

driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursorHelper.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
import com.mongodb.MongoCursorNotFoundException;
2121
import com.mongodb.MongoNamespace;
2222
import com.mongodb.MongoQueryException;
23+
import com.mongodb.ServerAddress;
2324
import com.mongodb.ServerCursor;
2425
import com.mongodb.connection.ConnectionDescription;
2526
import com.mongodb.internal.validator.NoOpFieldNameValidator;
2627
import com.mongodb.lang.Nullable;
28+
import org.bson.BsonArray;
2729
import org.bson.BsonDocument;
2830
import org.bson.BsonInt32;
2931
import org.bson.BsonInt64;
@@ -33,7 +35,10 @@
3335

3436
import static com.mongodb.internal.operation.CursorHelper.getNumberToReturn;
3537
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
38+
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
3639
import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionFourDotFour;
40+
import static java.lang.String.format;
41+
import static java.util.Collections.singletonList;
3742

3843
final class CommandBatchCursorHelper {
3944

@@ -65,6 +70,23 @@ static BsonDocument getMoreCommandDocument(
6570
return document;
6671
}
6772

73+
static <T> CommandCursorResult<T> getCommandCursorResult(final ServerAddress serverAddress, final String fieldNameContainingBatch,
74+
final BsonDocument commandCursorDocument) {
75+
CommandCursorResult<T> commandCursorResult = new CommandCursorResult<>(serverAddress, fieldNameContainingBatch,
76+
commandCursorDocument);
77+
if (LOGGER.isDebugEnabled()) {
78+
LOGGER.debug(format("Received batch of %d documents with cursorId %d from server %s", commandCursorResult.getResults().size(),
79+
commandCursorResult.getCursorId(), commandCursorResult.getServerAddress()));
80+
}
81+
return commandCursorResult;
82+
}
83+
84+
static BsonDocument getKillCursorsCommand(final MongoNamespace namespace, final ServerCursor serverCursor) {
85+
return new BsonDocument("killCursors", new BsonString(namespace.getCollectionName()))
86+
.append("cursors", new BsonArray(singletonList(new BsonInt64(serverCursor.getId()))));
87+
}
88+
89+
6890
static MongoQueryException translateCommandException(final MongoCommandException commandException, final ServerCursor cursor) {
6991
if (commandException.getErrorCode() == 43) {
7092
return new MongoCursorNotFoundException(cursor.getId(), commandException.getResponse(), cursor.getAddress());

0 commit comments

Comments
 (0)