Skip to content

Commit 30f3de9

Browse files
committed
Ensure Sink.contextView is propagated
When creating a Mono via Mono.create any nested Monos should have the higher level sinks context view propogated down. JAVA-5345
1 parent 099ec1e commit 30f3de9

File tree

2 files changed

+8
-0
lines changed

2 files changed

+8
-0
lines changed

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public Publisher<T> first() {
106106
.flatMap(batchCursor -> Mono.create(sink -> {
107107
batchCursor.setBatchSize(1);
108108
Mono.from(batchCursor.next())
109+
.contextWrite(sink.contextView())
109110
.doOnTerminate(batchCursor::close)
110111
.doOnError(sink::error)
111112
.doOnSuccess(results -> {

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,21 +103,25 @@ public void subscribe(final Subscriber<? super Void> s) {
103103
sink.onCancel(() -> createCancellationMono(terminated).subscribe());
104104

105105
Consumer<Throwable> errorHandler = e -> createCancellationMono(terminated)
106+
.contextWrite(sink.contextView())
106107
.doOnError(i -> sink.error(e))
107108
.doOnSuccess(i -> sink.error(e))
108109
.subscribe();
109110

110111
Consumer<Long> saveFileDataMono = l -> createSaveFileDataMono(terminated, l)
112+
.contextWrite(sink.contextView())
111113
.doOnError(errorHandler)
112114
.doOnSuccess(i -> sink.success())
113115
.subscribe();
114116

115117
Consumer<Void> saveChunksMono = i -> createSaveChunksMono(terminated)
118+
.contextWrite(sink.contextView())
116119
.doOnError(errorHandler)
117120
.doOnSuccess(saveFileDataMono)
118121
.subscribe();
119122

120123
createCheckAndCreateIndexesMono()
124+
.contextWrite(sink.contextView())
121125
.doOnError(errorHandler)
122126
.doOnSuccess(saveChunksMono)
123127
.subscribe();
@@ -159,6 +163,7 @@ private Mono<Void> createCheckAndCreateIndexesMono() {
159163
AtomicBoolean collectionExists = new AtomicBoolean(false);
160164

161165
return Mono.create(sink -> Mono.from(findPublisher.projection(PROJECTION).first())
166+
.contextWrite(sink.contextView())
162167
.subscribe(
163168
d -> collectionExists.set(true),
164169
sink::error,
@@ -167,6 +172,7 @@ private Mono<Void> createCheckAndCreateIndexesMono() {
167172
sink.success();
168173
} else {
169174
checkAndCreateIndex(filesCollection.withReadPreference(primary()), FILES_INDEX)
175+
.contextWrite(sink.contextView())
170176
.doOnError(sink::error)
171177
.doOnSuccess(i -> {
172178
checkAndCreateIndex(chunksCollection.withReadPreference(primary()), CHUNKS_INDEX)
@@ -227,6 +233,7 @@ private Mono<Long> createSaveChunksMono(final AtomicBoolean terminated) {
227233
AtomicLong lengthInBytes = new AtomicLong(0);
228234
AtomicInteger chunkIndex = new AtomicInteger(0);
229235
new ResizingByteBufferFlux(source, chunkSizeBytes)
236+
.contextWrite(sink.contextView())
230237
.flatMap((Function<ByteBuffer, Publisher<InsertOneResult>>) byteBuffer -> {
231238
if (terminated.get()) {
232239
return Mono.empty();

0 commit comments

Comments
 (0)