Skip to content

Commit 96da0b8

Browse files
committed
Replace use of Mono.create in GridFSUploadPublisherImpl
1 parent 905202f commit 96da0b8

File tree

1 file changed

+38
-71
lines changed

1 file changed

+38
-71
lines changed

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java

Lines changed: 38 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,6 @@
4040
import java.util.Date;
4141
import java.util.Map;
4242
import java.util.concurrent.atomic.AtomicBoolean;
43-
import java.util.concurrent.atomic.AtomicInteger;
44-
import java.util.concurrent.atomic.AtomicLong;
45-
import java.util.function.Function;
4643

4744
import static com.mongodb.ReadPreference.primary;
4845
import static com.mongodb.assertions.Assertions.notNull;
@@ -149,39 +146,15 @@ public void subscribe(final Subscriber<? super ObjectId> subscriber) {
149146
}
150147

151148
private Mono<Void> createCheckAndCreateIndexesMono(@Nullable final Timeout timeout) {
152-
AtomicBoolean collectionExists = new AtomicBoolean(false);
153-
return Mono.create(sink -> findAllInCollection(filesCollection, timeout).subscribe(
154-
d -> collectionExists.set(true),
155-
sink::error,
156-
() -> {
157-
if (collectionExists.get()) {
158-
sink.success();
159-
} else {
160-
checkAndCreateIndex(filesCollection.withReadPreference(primary()), FILES_INDEX, timeout)
161-
.contextWrite(sink.contextView())
162-
.doOnSuccess(i -> checkAndCreateIndex(chunksCollection.withReadPreference(primary()), CHUNKS_INDEX, timeout)
163-
.subscribe(unused -> {}, sink::error, sink::success))
164-
.subscribe(unused -> {}, sink::error);
165-
}
166-
})
167-
);
168-
}
169-
170-
private Mono<Document> findAllInCollection(final MongoCollection<GridFSFile> collection, @Nullable final Timeout timeout) {
171-
return collectionWithTimeoutDeferred(collection
172-
.withDocumentClass(Document.class)
173-
.withReadPreference(primary()), timeout)
174-
.flatMap(wrappedCollection -> {
175-
if (clientSession != null) {
176-
return Mono.from(wrappedCollection.find(clientSession)
177-
.projection(PROJECTION)
178-
.first());
179-
} else {
180-
return Mono.from(wrappedCollection.find()
181-
.projection(PROJECTION)
182-
.first());
183-
}
184-
});
149+
return collectionWithTimeoutDeferred(filesCollection.withDocumentClass(Document.class).withReadPreference(primary()), timeout)
150+
.map(collection -> clientSession != null ? collection.find(clientSession) : collection.find())
151+
.flatMap(findPublisher -> Mono.from(findPublisher.projection(PROJECTION).first()))
152+
.switchIfEmpty(Mono.defer(() ->
153+
checkAndCreateIndex(filesCollection.withReadPreference(primary()), FILES_INDEX, timeout)
154+
.then(checkAndCreateIndex(chunksCollection.withReadPreference(primary()), CHUNKS_INDEX, timeout))
155+
.then(Mono.fromCallable(Document::new))
156+
))
157+
.then();
185158
}
186159

187160
private <T> Mono<Boolean> hasIndex(final MongoCollection<T> collection, final Document index, @Nullable final Timeout timeout) {
@@ -229,43 +202,37 @@ private <T> Mono<String> createIndexMono(final MongoCollection<T> collection, fi
229202
}
230203

231204
private Mono<Long> createSaveChunksMono(final AtomicBoolean terminated, @Nullable final Timeout timeout) {
232-
return Mono.create(sink -> {
233-
AtomicLong lengthInBytes = new AtomicLong(0);
234-
AtomicInteger chunkIndex = new AtomicInteger(0);
235-
new ResizingByteBufferFlux(source, chunkSizeBytes)
236-
.contextWrite(sink.contextView())
237-
.takeUntilOther(createMonoTimer(timeout))
238-
.flatMap((Function<ByteBuffer, Publisher<InsertOneResult>>) byteBuffer -> {
239-
if (terminated.get()) {
240-
return Mono.empty();
241-
}
242-
byte[] byteArray = new byte[byteBuffer.remaining()];
243-
if (byteBuffer.hasArray()) {
244-
System.arraycopy(byteBuffer.array(), byteBuffer.position(), byteArray, 0, byteBuffer.remaining());
245-
} else {
246-
byteBuffer.mark();
247-
byteBuffer.get(byteArray);
248-
byteBuffer.reset();
249-
}
250-
Binary data = new Binary(byteArray);
251-
lengthInBytes.addAndGet(data.length());
205+
return new ResizingByteBufferFlux(source, chunkSizeBytes)
206+
.takeUntilOther(createMonoTimer(timeout))
207+
.index()
208+
.flatMap(indexAndBuffer -> {
209+
if (terminated.get()) {
210+
return Mono.empty();
211+
}
212+
Long index = indexAndBuffer.getT1();
213+
ByteBuffer byteBuffer = indexAndBuffer.getT2();
214+
byte[] byteArray = new byte[byteBuffer.remaining()];
215+
if (byteBuffer.hasArray()) {
216+
System.arraycopy(byteBuffer.array(), byteBuffer.position(), byteArray, 0, byteBuffer.remaining());
217+
} else {
218+
byteBuffer.mark();
219+
byteBuffer.get(byteArray);
220+
byteBuffer.reset();
221+
}
222+
Binary data = new Binary(byteArray);
252223

253-
Document chunkDocument = new Document("files_id", fileId)
254-
.append("n", chunkIndex.getAndIncrement())
255-
.append("data", data);
224+
Document chunkDocument = new Document("files_id", fileId)
225+
.append("n", index.intValue())
226+
.append("data", data);
256227

257-
if (clientSession == null) {
258-
return collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE)
259-
.insertOne(chunkDocument)
260-
.contextWrite(sink.contextView());
261-
} else {
262-
return collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE)
263-
.insertOne(clientSession, chunkDocument)
264-
.contextWrite(sink.contextView());
265-
}
266-
})
267-
.subscribe(null, sink::error, () -> sink.success(lengthInBytes.get()));
268-
});
228+
Publisher<InsertOneResult> insertOnePublisher = clientSession == null
229+
? collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(chunkDocument)
230+
: collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE)
231+
.insertOne(clientSession, chunkDocument);
232+
233+
return Mono.from(insertOnePublisher).thenReturn(data.length());
234+
})
235+
.reduce(0L, Long::sum);
269236
}
270237

271238
/**

0 commit comments

Comments
 (0)