Skip to content

Commit 57dbf6a

Browse files
committed
Pass context to cancellation mono
1 parent 55cdbf7 commit 57dbf6a

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,14 @@
3535
import org.reactivestreams.Subscriber;
3636
import reactor.core.publisher.Flux;
3737
import reactor.core.publisher.Mono;
38+
import reactor.core.scheduler.Schedulers;
39+
import reactor.util.context.ContextView;
3840

3941
import java.nio.ByteBuffer;
4042
import java.util.Date;
4143
import java.util.Map;
4244
import java.util.concurrent.atomic.AtomicBoolean;
45+
import java.util.function.Function;
4346

4447
import static com.mongodb.ReadPreference.primary;
4548
import static com.mongodb.assertions.Assertions.notNull;
@@ -103,7 +106,7 @@ public BsonValue getId() {
103106

104107
@Override
105108
public void subscribe(final Subscriber<? super Void> s) {
106-
Mono.defer(() -> {
109+
Mono.deferContextual(ctx -> {
107110
AtomicBoolean terminated = new AtomicBoolean(false);
108111
Timeout timeout = TimeoutContext.startTimeout(timeoutMs);
109112
return createCheckAndCreateIndexesMono(timeout)
@@ -117,7 +120,7 @@ public void subscribe(final Subscriber<? super Void> s) {
117120
return originalError;
118121
})
119122
.then(Mono.error(originalError)))
120-
.doOnCancel(() -> createCancellationMono(terminated, timeout).subscribe())
123+
.doOnCancel(() -> createCancellationMono(terminated, timeout).contextWrite(ctx).subscribe())
121124
.then();
122125
}).subscribe(s);
123126
}

0 commit comments

Comments
 (0)