Skip to content

Commit 985b9ec

Browse files
committed
Autorefactor out CursorResourceManager
1 parent a01e1dc commit 985b9ec

File tree

2 files changed

+338
-296
lines changed

2 files changed

+338
-296
lines changed

driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java

Lines changed: 3 additions & 296 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,10 @@
1717
package com.mongodb.internal.operation;
1818

1919
import com.mongodb.MongoCommandException;
20-
import com.mongodb.MongoException;
2120
import com.mongodb.MongoNamespace;
22-
import com.mongodb.MongoSocketException;
2321
import com.mongodb.ReadPreference;
2422
import com.mongodb.ServerAddress;
2523
import com.mongodb.ServerCursor;
26-
import com.mongodb.annotations.ThreadSafe;
2724
import com.mongodb.connection.ServerType;
2825
import com.mongodb.internal.VisibleForTesting;
2926
import com.mongodb.internal.binding.ConnectionSource;
@@ -32,29 +29,17 @@
3229
import com.mongodb.internal.diagnostics.logging.Loggers;
3330
import com.mongodb.internal.validator.NoOpFieldNameValidator;
3431
import com.mongodb.lang.Nullable;
35-
import org.bson.BsonArray;
3632
import org.bson.BsonDocument;
37-
import org.bson.BsonInt64;
38-
import org.bson.BsonString;
3933
import org.bson.BsonTimestamp;
4034
import org.bson.BsonValue;
4135
import org.bson.FieldNameValidator;
42-
import org.bson.codecs.BsonDocumentCodec;
4336
import org.bson.codecs.Decoder;
4437

4538
import java.util.List;
4639
import java.util.NoSuchElementException;
47-
import java.util.concurrent.locks.Lock;
48-
import java.util.concurrent.locks.StampedLock;
49-
import java.util.function.Consumer;
50-
import java.util.function.Supplier;
5140

5241
import static com.mongodb.assertions.Assertions.assertNotNull;
53-
import static com.mongodb.assertions.Assertions.assertNull;
54-
import static com.mongodb.assertions.Assertions.assertTrue;
55-
import static com.mongodb.assertions.Assertions.fail;
5642
import static com.mongodb.assertions.Assertions.notNull;
57-
import static com.mongodb.internal.Locks.withLock;
5843
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
5944
import static com.mongodb.internal.operation.CommandBatchCursorHelper.FIRST_BATCH;
6045
import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR;
@@ -63,7 +48,6 @@
6348
import static com.mongodb.internal.operation.CommandBatchCursorHelper.getMoreCommandDocument;
6449
import static com.mongodb.internal.operation.QueryHelper.translateCommandException;
6550
import static java.lang.String.format;
66-
import static java.util.Collections.singletonList;
6751

6852
class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
6953
private static final Logger LOGGER = Loggers.getLogger("operation");
@@ -77,7 +61,7 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
7761
private final BsonValue comment;
7862
private final int maxWireVersion;
7963
private final boolean firstBatchEmpty;
80-
private final ResourceManager resourceManager;
64+
private final CursorResourceManager resourceManager;
8165

8266
private int batchSize;
8367
private CommandCursorResult<T> commandCursorResult;
@@ -111,7 +95,7 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
11195
connectionToPin = connection;
11296
}
11397

114-
resourceManager = new ResourceManager(connectionSource, connectionToPin, commandCursorResult.getServerCursor());
98+
resourceManager = new CursorResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
11599
if (releaseServerAndResources) {
116100
resourceManager.releaseServerAndClientResources(connection);
117101
}
@@ -269,7 +253,7 @@ private void getMore() {
269253
NO_OP_FIELD_NAME_VALIDATOR,
270254
ReadPreference.primary(),
271255
CommandResultDocumentCodec.create(decoder, NEXT_BATCH),
272-
assertNotNull(resourceManager.connectionSource).getOperationContext())));
256+
assertNotNull(resourceManager.getConnectionSource()).getOperationContext())));
273257
nextServerCursor = commandCursorResult.getServerCursor();
274258
} catch (MongoCommandException e) {
275259
throw translateCommandException(e, serverCursor);
@@ -297,282 +281,5 @@ private boolean limitReached() {
297281
return Math.abs(limit) != 0 && count >= Math.abs(limit);
298282
}
299283

300-
/**
301-
* This class maintains all resources that must be released in {@link CommandBatchCursor#close()}.
302-
* It also implements a {@linkplain #doClose() deferred close action} such that it is totally ordered with other operations of
303-
* {@link CommandBatchCursor} (methods {@link #tryStartOperation()}/{@link #endOperation()} must be used properly to enforce the order)
304-
* despite the method {@link CommandBatchCursor#close()} being called concurrently with those operations.
305-
* This total order induces the happens-before order.
306-
* <p>
307-
* The deferred close action does not violate externally observable idempotence of {@link CommandBatchCursor#close()},
308-
* because {@link CommandBatchCursor#close()} is allowed to release resources "eventually".
309-
* <p>
310-
* Only methods explicitly documented as thread-safe are thread-safe,
311-
* others are not and rely on the total order mentioned above.
312-
*/
313-
@ThreadSafe
314-
private final class ResourceManager {
315-
private final Lock lock;
316-
private volatile State state;
317-
@Nullable
318-
private volatile ConnectionSource connectionSource;
319-
@Nullable
320-
private volatile Connection pinnedConnection;
321-
@Nullable
322-
private volatile ServerCursor serverCursor;
323-
private volatile boolean skipReleasingServerResourcesOnClose;
324-
325-
ResourceManager(@Nullable final ConnectionSource connectionSource,
326-
@Nullable final Connection connectionToPin, @Nullable final ServerCursor serverCursor) {
327-
lock = new StampedLock().asWriteLock();
328-
state = State.IDLE;
329-
if (serverCursor != null) {
330-
this.connectionSource = (assertNotNull(connectionSource)).retain();
331-
if (connectionToPin != null) {
332-
this.pinnedConnection = connectionToPin.retain();
333-
connectionToPin.markAsPinned(Connection.PinningMode.CURSOR);
334-
}
335-
}
336-
skipReleasingServerResourcesOnClose = false;
337-
this.serverCursor = serverCursor;
338-
}
339-
340-
/**
341-
* Thread-safe.
342-
*/
343-
boolean operable() {
344-
return state.operable();
345-
}
346-
347-
/**
348-
* Thread-safe.
349-
* Executes {@code operation} within the {@link #tryStartOperation()}/{@link #endOperation()} bounds.
350-
*
351-
* @throws IllegalStateException If {@linkplain CommandBatchCursor#close() closed}.
352-
*/
353-
@Nullable
354-
<R> R execute(final String exceptionMessageIfClosed, final Supplier<R> operation) throws IllegalStateException {
355-
if (!tryStartOperation()) {
356-
throw new IllegalStateException(exceptionMessageIfClosed);
357-
}
358-
try {
359-
return operation.get();
360-
} finally {
361-
endOperation();
362-
}
363-
}
364-
365-
/**
366-
* Thread-safe.
367-
* Returns {@code true} iff started an operation.
368-
* If {@linkplain #operable() closed}, then returns false, otherwise completes abruptly.
369-
* @throws IllegalStateException Iff another operation is in progress.
370-
*/
371-
private boolean tryStartOperation() throws IllegalStateException {
372-
return withLock(lock, () -> {
373-
State localState = state;
374-
if (!localState.operable()) {
375-
return false;
376-
} else if (localState == State.IDLE) {
377-
state = State.OPERATION_IN_PROGRESS;
378-
return true;
379-
} else if (localState == State.OPERATION_IN_PROGRESS) {
380-
throw new IllegalStateException("Another operation is currently in progress, concurrent operations are not supported");
381-
} else {
382-
throw fail(state.toString());
383-
}
384-
});
385-
}
386-
387-
/**
388-
* Thread-safe.
389-
*/
390-
private void endOperation() {
391-
boolean doClose = withLock(lock, () -> {
392-
State localState = state;
393-
if (localState == State.OPERATION_IN_PROGRESS) {
394-
state = State.IDLE;
395-
return false;
396-
} else if (localState == State.CLOSE_PENDING) {
397-
state = State.CLOSED;
398-
return true;
399-
} else {
400-
throw fail(localState.toString());
401-
}
402-
});
403-
if (doClose) {
404-
doClose();
405-
}
406-
}
407-
408-
/**
409-
* Thread-safe.
410-
*/
411-
void close() {
412-
boolean doClose = withLock(lock, () -> {
413-
State localState = state;
414-
if (localState == State.OPERATION_IN_PROGRESS) {
415-
state = State.CLOSE_PENDING;
416-
return false;
417-
} else if (localState != State.CLOSED) {
418-
state = State.CLOSED;
419-
return true;
420-
}
421-
return false;
422-
});
423-
if (doClose) {
424-
doClose();
425-
}
426-
}
427-
428-
/**
429-
* This method is never executed concurrently with either itself or other operations
430-
* demarcated by {@link #tryStartOperation()}/{@link #endOperation()}.
431-
*/
432-
private void doClose() {
433-
try {
434-
if (skipReleasingServerResourcesOnClose) {
435-
serverCursor = null;
436-
} else if (serverCursor != null) {
437-
Connection connection = connection();
438-
try {
439-
releaseServerResources(connection);
440-
} finally {
441-
connection.release();
442-
}
443-
}
444-
} catch (MongoException e) {
445-
// ignore exceptions when releasing server resources
446-
} finally {
447-
// guarantee that regardless of exceptions, `serverCursor` is null and client resources are released
448-
serverCursor = null;
449-
releaseClientResources();
450-
}
451-
}
452-
453-
void onCorruptedConnection(final Connection corruptedConnection) {
454-
assertTrue(state.inProgress());
455-
// if `pinnedConnection` is corrupted, then we cannot kill `serverCursor` via such a connection
456-
Connection localPinnedConnection = pinnedConnection;
457-
if (localPinnedConnection != null) {
458-
assertTrue(corruptedConnection == localPinnedConnection);
459-
skipReleasingServerResourcesOnClose = true;
460-
}
461-
}
462-
463-
void executeWithConnection(final Consumer<Connection> action) {
464-
Connection connection = connection();
465-
try {
466-
action.accept(connection);
467-
} catch (MongoSocketException e) {
468-
try {
469-
onCorruptedConnection(connection);
470-
} catch (Exception suppressed) {
471-
e.addSuppressed(suppressed);
472-
}
473-
throw e;
474-
} finally {
475-
connection.release();
476-
}
477-
}
478-
479-
private Connection connection() {
480-
assertTrue(state != State.IDLE);
481-
if (pinnedConnection == null) {
482-
return assertNotNull(connectionSource).getConnection();
483-
} else {
484-
return assertNotNull(pinnedConnection).retain();
485-
}
486-
}
487-
488-
/**
489-
* Thread-safe.
490-
*/
491-
@Nullable
492-
ServerCursor serverCursor() {
493-
return serverCursor;
494-
}
495-
496-
void setServerCursor(@Nullable final ServerCursor serverCursor) {
497-
assertTrue(state.inProgress());
498-
assertNotNull(this.serverCursor);
499-
// without `connectionSource` we will not be able to kill `serverCursor` later
500-
assertNotNull(connectionSource);
501-
this.serverCursor = serverCursor;
502-
if (serverCursor == null) {
503-
releaseClientResources();
504-
}
505-
}
506-
507284

508-
void releaseServerAndClientResources(final Connection connection) {
509-
try {
510-
releaseServerResources(assertNotNull(connection));
511-
} finally {
512-
releaseClientResources();
513-
}
514-
}
515-
516-
private void releaseServerResources(final Connection connection) {
517-
try {
518-
ServerCursor localServerCursor = serverCursor;
519-
if (localServerCursor != null) {
520-
killServerCursor(namespace, localServerCursor, assertNotNull(connection));
521-
}
522-
} finally {
523-
serverCursor = null;
524-
}
525-
}
526-
527-
private void killServerCursor(final MongoNamespace namespace, final ServerCursor serverCursor, final Connection connection) {
528-
connection.command(namespace.getDatabaseName(), asKillCursorsCommandDocument(namespace, serverCursor),
529-
NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), new BsonDocumentCodec(),
530-
assertNotNull(connectionSource).getOperationContext());
531-
}
532-
533-
private BsonDocument asKillCursorsCommandDocument(final MongoNamespace namespace, final ServerCursor serverCursor) {
534-
return new BsonDocument("killCursors", new BsonString(namespace.getCollectionName()))
535-
.append("cursors", new BsonArray(singletonList(new BsonInt64(serverCursor.getId()))));
536-
}
537-
538-
private void releaseClientResources() {
539-
assertNull(serverCursor);
540-
ConnectionSource localConnectionSource = connectionSource;
541-
if (localConnectionSource != null) {
542-
localConnectionSource.release();
543-
connectionSource = null;
544-
}
545-
Connection localPinnedConnection = pinnedConnection;
546-
if (localPinnedConnection != null) {
547-
localPinnedConnection.release();
548-
pinnedConnection = null;
549-
}
550-
}
551-
}
552-
553-
private enum State {
554-
IDLE(true, false),
555-
OPERATION_IN_PROGRESS(true, true),
556-
/**
557-
* Implies {@link #OPERATION_IN_PROGRESS}.
558-
*/
559-
CLOSE_PENDING(false, true),
560-
CLOSED(false, false);
561-
562-
private final boolean operable;
563-
private final boolean inProgress;
564-
565-
State(final boolean operable, final boolean inProgress) {
566-
this.operable = operable;
567-
this.inProgress = inProgress;
568-
}
569-
570-
boolean operable() {
571-
return operable;
572-
}
573-
574-
boolean inProgress() {
575-
return inProgress;
576-
}
577-
}
578285
}

0 commit comments

Comments
 (0)