|
33 | 33 | import java.util.function.Consumer;
|
34 | 34 | import java.util.function.Function;
|
35 | 35 |
|
| 36 | +import static com.mongodb.assertions.Assertions.assertNotNull; |
36 | 37 | import static com.mongodb.internal.operation.ChangeStreamBatchCursorHelper.isResumableError;
|
37 | 38 | import static com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource;
|
38 | 39 |
|
@@ -94,8 +95,9 @@ public int available() {
|
94 | 95 | public List<T> tryNext() {
|
95 | 96 | return resumeableOperation(commandBatchCursor -> {
|
96 | 97 | try {
|
97 |
| - return convertAndProduceLastId(commandBatchCursor.tryNext(), changeStreamOperation.getDecoder(), |
98 |
| - lastId -> resumeToken = lastId); |
| 98 | + List<RawBsonDocument> tryNext = commandBatchCursor.tryNext(); |
| 99 | + return tryNext == null ? null : |
| 100 | + convertAndProduceLastId(tryNext, changeStreamOperation.getDecoder(), lastId -> resumeToken = lastId); |
99 | 101 | } finally {
|
100 | 102 | cachePostBatchResumeToken(commandBatchCursor);
|
101 | 103 | }
|
@@ -165,20 +167,18 @@ private void cachePostBatchResumeToken(final AggregateResponseBatchCursor<RawBso
|
165 | 167 | * @param lastIdConsumer Is {@linkplain Consumer#accept(Object) called} iff {@code rawDocuments} is successfully converted
|
166 | 168 | * and the returned {@link List} is neither {@code null} nor {@linkplain List#isEmpty() empty}.
|
167 | 169 | */
|
168 |
| - static <T> List<T> convertAndProduceLastId(@Nullable final List<RawBsonDocument> rawDocuments, |
| 170 | + static <T> List<T> convertAndProduceLastId(final List<RawBsonDocument> rawDocuments, |
169 | 171 | final Decoder<T> decoder,
|
170 | 172 | final Consumer<BsonDocument> lastIdConsumer) {
|
171 | 173 | List<T> results = new ArrayList<>();
|
172 |
| - if (rawDocuments != null) { |
173 |
| - for (RawBsonDocument rawDocument : rawDocuments) { |
174 |
| - if (!rawDocument.containsKey("_id")) { |
175 |
| - throw new MongoChangeStreamException("Cannot provide resume functionality when the resume token is missing."); |
176 |
| - } |
177 |
| - results.add(rawDocument.decode(decoder)); |
178 |
| - } |
179 |
| - if (!rawDocuments.isEmpty()) { |
180 |
| - lastIdConsumer.accept(rawDocuments.get(rawDocuments.size() - 1).getDocument("_id")); |
| 174 | + for (RawBsonDocument rawDocument : assertNotNull(rawDocuments)) { |
| 175 | + if (!rawDocument.containsKey("_id")) { |
| 176 | + throw new MongoChangeStreamException("Cannot provide resume functionality when the resume token is missing."); |
181 | 177 | }
|
| 178 | + results.add(rawDocument.decode(decoder)); |
| 179 | + } |
| 180 | + if (!rawDocuments.isEmpty()) { |
| 181 | + lastIdConsumer.accept(rawDocuments.get(rawDocuments.size() - 1).getDocument("_id")); |
182 | 182 | }
|
183 | 183 | return results;
|
184 | 184 | }
|
|
0 commit comments