|
55 | 55 | import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CONCURRENT_OPERATION;
|
56 | 56 | import static com.mongodb.internal.operation.CommandBatchCursorHelper.NEXT_BATCH;
|
57 | 57 | import static com.mongodb.internal.operation.CommandBatchCursorHelper.NO_OP_FIELD_NAME_VALIDATOR;
|
58 |
| -import static com.mongodb.internal.operation.CommandBatchCursorHelper.getKillCursorsCommand; |
59 | 58 | import static com.mongodb.internal.operation.CommandBatchCursorHelper.getCommandCursorResult;
|
| 59 | +import static com.mongodb.internal.operation.CommandBatchCursorHelper.getKillCursorsCommand; |
60 | 60 | import static com.mongodb.internal.operation.CommandBatchCursorHelper.getMoreCommandDocument;
|
61 | 61 | import static com.mongodb.internal.operation.CommandBatchCursorHelper.translateCommandException;
|
62 | 62 | import static java.util.Collections.emptyList;
|
@@ -116,8 +116,6 @@ public void next(final SingleResultCallback<List<T>> callback) {
|
116 | 116 | return;
|
117 | 117 | }
|
118 | 118 |
|
119 |
| - |
120 |
| - |
121 | 119 | resourceManager.execute((AsyncCallbackSupplier<List<T>>) funcCallback -> {
|
122 | 120 | ServerCursor localServerCursor = resourceManager.getServerCursor();
|
123 | 121 | boolean serverCursorIsNull = localServerCursor == null;
|
@@ -193,39 +191,44 @@ public int getMaxWireVersion() {
|
193 | 191 |
|
194 | 192 | private void getMore(final ServerCursor cursor, final SingleResultCallback<List<T>> callback) {
|
195 | 193 | resourceManager.executeWithConnection((connection, wrappedCallback) ->
|
196 |
| - assertNotNull(connection).commandAsync(namespace.getDatabaseName(), |
197 |
| - getMoreCommandDocument(cursor.getId(), connection.getDescription(), namespace, |
198 |
| - limit, batchSize, count.get(), maxTimeMS, comment), |
199 |
| - NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), |
200 |
| - CommandResultDocumentCodec.create(decoder, NEXT_BATCH), |
201 |
| - assertNotNull(resourceManager.getConnectionSource()).getOperationContext(), |
202 |
| - (commandResult, t) -> { |
203 |
| - if (t != null) { |
204 |
| - Throwable translatedException = |
205 |
| - t instanceof MongoCommandException |
206 |
| - ? translateCommandException((MongoCommandException) t, cursor) |
207 |
| - : t; |
208 |
| - wrappedCallback.onResult(null, translatedException); |
209 |
| - return; |
210 |
| - } |
211 |
| - CommandCursorResult<T> commandCursorResult = toCommandCursorResult( |
212 |
| - connection.getDescription().getServerAddress(), NEXT_BATCH, assertNotNull(commandResult)); |
213 |
| - resourceManager.setServerCursor(commandCursorResult.getServerCursor()); |
214 |
| - |
215 |
| - if (!resourceManager.operable()) { |
216 |
| - // The cursor was closed |
217 |
| - resourceManager.releaseServerAndClientResources(connection); |
218 |
| - wrappedCallback.onResult(emptyList(), null); |
219 |
| - return; |
220 |
| - } |
221 |
| - |
222 |
| - List<T> nextBatch = commandCursorResult.getResults(); |
223 |
| - if (nextBatch.isEmpty() && commandCursorResult.getServerCursor() != null) { |
224 |
| - getMore(commandCursorResult.getServerCursor(), wrappedCallback); |
225 |
| - } else { |
226 |
| - wrappedCallback.onResult(nextBatch, null); |
227 |
| - } |
228 |
| - }), callback); |
| 194 | + getMoreLoop(assertNotNull(connection), cursor, wrappedCallback), callback); |
| 195 | + } |
| 196 | + |
| 197 | + private void getMoreLoop(final AsyncConnection connection, final ServerCursor serverCursor, |
| 198 | + final SingleResultCallback<List<T>> callback) { |
| 199 | + connection.commandAsync(namespace.getDatabaseName(), |
| 200 | + getMoreCommandDocument(serverCursor.getId(), connection.getDescription(), namespace, |
| 201 | + limit, batchSize, count.get(), maxTimeMS, comment), |
| 202 | + NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), |
| 203 | + CommandResultDocumentCodec.create(decoder, NEXT_BATCH), |
| 204 | + assertNotNull(resourceManager.getConnectionSource()).getOperationContext(), |
| 205 | + (commandResult, t) -> { |
| 206 | + if (t != null) { |
| 207 | + Throwable translatedException = |
| 208 | + t instanceof MongoCommandException |
| 209 | + ? translateCommandException((MongoCommandException) t, serverCursor) |
| 210 | + : t; |
| 211 | + callback.onResult(null, translatedException); |
| 212 | + return; |
| 213 | + } |
| 214 | + CommandCursorResult<T> commandCursorResult = toCommandCursorResult( |
| 215 | + connection.getDescription().getServerAddress(), NEXT_BATCH, assertNotNull(commandResult)); |
| 216 | + resourceManager.setServerCursor(commandCursorResult.getServerCursor()); |
| 217 | + |
| 218 | + if (!resourceManager.operable()) { |
| 219 | + // The cursor was closed |
| 220 | + resourceManager.releaseServerAndClientResources(connection); |
| 221 | + callback.onResult(emptyList(), null); |
| 222 | + return; |
| 223 | + } |
| 224 | + |
| 225 | + List<T> nextBatch = commandCursorResult.getResults(); |
| 226 | + if (nextBatch.isEmpty() && commandCursorResult.getServerCursor() != null) { |
| 227 | + getMoreLoop(connection, commandCursorResult.getServerCursor(), callback); |
| 228 | + } else { |
| 229 | + callback.onResult(nextBatch, null); |
| 230 | + } |
| 231 | + }); |
229 | 232 | }
|
230 | 233 |
|
231 | 234 | private CommandCursorResult<T> toCommandCursorResult(final ServerAddress serverAddress, final String fieldNameContainingBatch,
|
|
0 commit comments