Skip to content

Abstract Cursor Resource Manager #404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
985b9ec
Autorefactor out CursorResourceManager
rozza Oct 27, 2023
e23f1f6
Made CursorResourceManager abstract
rozza Oct 11, 2023
3083b64
Reuse the async connection in the getmore loop
rozza Oct 30, 2023
026cec7
PR updates
rozza Oct 30, 2023
7db464c
Just use ReentrantLock
rozza Oct 31, 2023
76bf15e
convertAndProduceLastId now is fully non null
rozza Oct 31, 2023
b6ee18f
Remove limit and count from CommandBatchCursors and clarify initial c…
rozza Oct 31, 2023
dd3b55c
Fix ChangeStreamBatchCursorSpecification.groovy
rozza Nov 1, 2023
8d9f3b1
Codenarc / checkstyle fixes and cleanups
rozza Nov 1, 2023
bfdf0d5
Ternary ifs ftw
rozza Nov 1, 2023
6c2686e
Update driver-core/src/main/com/mongodb/internal/operation/CursorReso…
rozza Nov 2, 2023
86330d7
Update driver-core/src/main/com/mongodb/internal/operation/CursorReso…
rozza Nov 2, 2023
0e81675
Update driver-core/src/main/com/mongodb/internal/operation/CursorReso…
rozza Nov 2, 2023
44d9b0f
Update driver-core/src/main/com/mongodb/internal/operation/CursorReso…
rozza Nov 2, 2023
d90a54a
Update driver-core/src/main/com/mongodb/internal/operation/CursorReso…
rozza Nov 2, 2023
7e67a4f
Remove notNull checks for API items that aren't marked nullable
rozza Nov 2, 2023
29d6f46
Update driver-core/src/main/com/mongodb/internal/operation/AsyncComma…
rozza Nov 3, 2023
238cd21
Update driver-core/src/main/com/mongodb/internal/operation/CursorReso…
rozza Nov 3, 2023
57412bb
Updated / simplified AsyncCommandBatchCursor.ResourceManager.execute
stIncMale Nov 3, 2023
569e281
Revert "Update driver-core/src/main/com/mongodb/internal/operation/Cu…
rozza Nov 3, 2023
b217463
Update driver-core/src/main/com/mongodb/assertions/Assertions.java
rozza Nov 6, 2023
2ba861f
doesNotThrow fix
rozza Nov 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions driver-core/src/main/com/mongodb/assertions/Assertions.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.mongodb.lang.Nullable;

import java.util.Collection;
import java.util.function.Supplier;

/**
* <p>Design by contract assertions.</p> <p>This class is not part of the public API and may be removed or changed at any time.</p>
Expand Down Expand Up @@ -226,6 +227,19 @@ public static AssertionError fail(final String msg) throws AssertionError {
throw new AssertionError(assertNotNull(msg));
}

/**
* @param supplier the supplier to check
* @return {@code supplier.get()}
* @throws AssertionError If {@code supplier.get()} throws an exception
*/
public static <T> T doesNotThrow(final Supplier<T> supplier) throws AssertionError {
try {
return supplier.get();
} catch (Exception e) {
throw new AssertionError(e.getMessage(), e);
}
}

private Assertions() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,16 +224,16 @@ private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer(
return (result, source, connection) -> {
// TODO (CSOT) JAVA-4058
long maxAwaitTimeMS = timeoutSettings.getMaxAwaitTimeMS();
return new CommandBatchCursor<>(result, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
comment, source, connection);
return new CommandBatchCursor<>(result, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
comment, source, connection);
};
}

private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {
return (result, source, connection) -> {
// TODO (CSOT) JAVA-4058
long maxAwaitTimeMS = timeoutSettings.getMaxAwaitTimeMS();
return new AsyncCommandBatchCursor<>(result, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
return new AsyncCommandBatchCursor<>(result, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
comment, source, connection);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResult
try {
List<T> convertedResults;
try {
convertedResults = convertAndProduceLastId(result, changeStreamOperation.getDecoder(),
convertedResults = convertAndProduceLastId(assertNotNull(result), changeStreamOperation.getDecoder(),
lastId -> resumeToken = lastId);
} finally {
cachePostBatchResumeToken(wrappedCursor);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ interface AsyncCallableWithConnection {
void call(@Nullable AsyncConnection connection, @Nullable Throwable t);
}

interface AsyncCallableConnectionWithCallback<T> {
void call(AsyncConnection connection, SingleResultCallback<T> callback);
}

interface AsyncCallableWithSource {
void call(@Nullable AsyncConnectionSource source, @Nullable Throwable t);
}
Expand Down Expand Up @@ -335,8 +339,7 @@ static <T> CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncS

static <T> AsyncBatchCursor<T> cursorDocumentToAsyncBatchCursor(final BsonDocument cursorDocument, final Decoder<T> decoder,
final BsonValue comment, final AsyncConnectionSource source, final AsyncConnection connection, final int batchSize) {
return new AsyncCommandBatchCursor<>(cursorDocument, 0, batchSize, 0, decoder,
comment, source, connection);
return new AsyncCommandBatchCursor<>(cursorDocument, batchSize, 0, decoder, comment, source, connection);
}

static <T> SingleResultCallback<T> releasingCallback(final SingleResultCallback<T> wrapped, final AsyncConnection connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.function.Consumer;
import java.util.function.Function;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.internal.operation.ChangeStreamBatchCursorHelper.isResumableError;
import static com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource;

Expand Down Expand Up @@ -94,8 +95,9 @@ public int available() {
public List<T> tryNext() {
return resumeableOperation(commandBatchCursor -> {
try {
return convertAndProduceLastId(commandBatchCursor.tryNext(), changeStreamOperation.getDecoder(),
lastId -> resumeToken = lastId);
List<RawBsonDocument> tryNext = commandBatchCursor.tryNext();
return tryNext == null ? null
: convertAndProduceLastId(tryNext, changeStreamOperation.getDecoder(), lastId -> resumeToken = lastId);
} finally {
cachePostBatchResumeToken(commandBatchCursor);
}
Expand Down Expand Up @@ -165,19 +167,17 @@ private void cachePostBatchResumeToken(final AggregateResponseBatchCursor<RawBso
* @param lastIdConsumer Is {@linkplain Consumer#accept(Object) called} iff {@code rawDocuments} is successfully converted
* and the returned {@link List} is neither {@code null} nor {@linkplain List#isEmpty() empty}.
*/
@Nullable
static <T> List<T> convertAndProduceLastId(@Nullable final List<RawBsonDocument> rawDocuments,
static <T> List<T> convertAndProduceLastId(final List<RawBsonDocument> rawDocuments,
final Decoder<T> decoder,
final Consumer<BsonDocument> lastIdConsumer) {
List<T> results = null;
if (rawDocuments != null) {
results = new ArrayList<>();
for (RawBsonDocument rawDocument : rawDocuments) {
if (!rawDocument.containsKey("_id")) {
throw new MongoChangeStreamException("Cannot provide resume functionality when the resume token is missing.");
}
results.add(rawDocument.decode(decoder));
List<T> results = new ArrayList<>();
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This follows on from the previous AsyncSingleBatchCursor change where next() no longer returns a null value.

for (RawBsonDocument rawDocument : assertNotNull(rawDocuments)) {
if (!rawDocument.containsKey("_id")) {
throw new MongoChangeStreamException("Cannot provide resume functionality when the resume token is missing.");
}
results.add(rawDocument.decode(decoder));
}
if (!rawDocuments.isEmpty()) {
lastIdConsumer.accept(rawDocuments.get(rawDocuments.size() - 1).getDocument("_id"));
}
return results;
Expand Down
Loading