Skip to content

Commit bce9f74

Browse files
committed
Code review updates
1 parent 30f3de9 commit bce9f74

File tree

3 files changed

+6
-3
lines changed

3 files changed

+6
-3
lines changed

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ public Publisher<T> first() {
106106
.flatMap(batchCursor -> Mono.create(sink -> {
107107
batchCursor.setBatchSize(1);
108108
Mono.from(batchCursor.next())
109-
.contextWrite(sink.contextView())
110109
.doOnTerminate(batchCursor::close)
111110
.doOnError(sink::error)
112111
.doOnSuccess(results -> {

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/Crypt.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,7 @@ private void collInfo(final MongoCryptContext cryptContext,
305305
sink.error(new IllegalStateException("Missing database name"));
306306
} else {
307307
collectionInfoRetriever.filter(databaseName, cryptContext.getMongoOperation())
308+
.contextWrite(sink.contextView())
308309
.doOnSuccess(result -> {
309310
if (result != null) {
310311
cryptContext.addMongoOperationResult(result);
@@ -326,6 +327,7 @@ private void mark(final MongoCryptContext cryptContext,
326327
sink.error(wrapInClientException(new IllegalStateException("Missing database name")));
327328
} else {
328329
commandMarker.mark(databaseName, cryptContext.getMongoOperation())
330+
.contextWrite(sink.contextView())
329331
.doOnSuccess(result -> {
330332
cryptContext.addMongoOperationResult(result);
331333
cryptContext.completeMongoOperation();
@@ -340,6 +342,7 @@ private void fetchKeys(final MongoCryptContext cryptContext,
340342
@Nullable final String databaseName,
341343
final MonoSink<RawBsonDocument> sink) {
342344
keyRetriever.find(cryptContext.getMongoOperation())
345+
.contextWrite(sink.contextView())
343346
.doOnSuccess(results -> {
344347
for (BsonDocument result : results) {
345348
cryptContext.addMongoOperationResult(result);
@@ -357,11 +360,13 @@ private void decryptKeys(final MongoCryptContext cryptContext,
357360
MongoKeyDecryptor keyDecryptor = cryptContext.nextKeyDecryptor();
358361
if (keyDecryptor != null) {
359362
keyManagementService.decryptKey(keyDecryptor)
363+
.contextWrite(sink.contextView())
360364
.doOnSuccess(r -> decryptKeys(cryptContext, databaseName, sink))
361365
.doOnError(e -> sink.error(wrapInClientException(e)))
362366
.subscribe();
363367
} else {
364368
Mono.fromRunnable(cryptContext::completeKeyDecryptors)
369+
.contextWrite(sink.contextView())
365370
.doOnSuccess(r -> executeStateMachineWithSink(cryptContext, databaseName, sink))
366371
.doOnError(e -> sink.error(wrapInClientException(e)))
367372
.subscribe();

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public BsonValue getId() {
100100
public void subscribe(final Subscriber<? super Void> s) {
101101
Mono.<Void>create(sink -> {
102102
AtomicBoolean terminated = new AtomicBoolean(false);
103-
sink.onCancel(() -> createCancellationMono(terminated).subscribe());
103+
sink.onCancel(() -> createCancellationMono(terminated).contextWrite(sink.contextView()).subscribe());
104104

105105
Consumer<Throwable> errorHandler = e -> createCancellationMono(terminated)
106106
.contextWrite(sink.contextView())
@@ -163,7 +163,6 @@ private Mono<Void> createCheckAndCreateIndexesMono() {
163163
AtomicBoolean collectionExists = new AtomicBoolean(false);
164164

165165
return Mono.create(sink -> Mono.from(findPublisher.projection(PROJECTION).first())
166-
.contextWrite(sink.contextView())
167166
.subscribe(
168167
d -> collectionExists.set(true),
169168
sink::error,

0 commit comments

Comments
 (0)