File tree Expand file tree Collapse file tree 1 file changed +7
-11
lines changed
driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal Expand file tree Collapse file tree 1 file changed +7
-11
lines changed Original file line number Diff line number Diff line change @@ -123,21 +123,17 @@ public TimeoutMode getTimeoutMode() {
123
123
124
124
public Publisher <T > first () {
125
125
return batchCursor (this ::asAsyncFirstReadOperation )
126
- .flatMap (batchCursor -> Mono . create ( sink -> {
126
+ .flatMap (batchCursor -> {
127
127
batchCursor .setBatchSize (1 );
128
- Mono .from (batchCursor .next ())
128
+ return Mono .from (batchCursor .next ())
129
129
.doOnTerminate (batchCursor ::close )
130
- .doOnError (sink ::error )
131
- .doOnSuccess (results -> {
130
+ .flatMap (results -> {
132
131
if (results == null || results .isEmpty ()) {
133
- sink .success ();
134
- } else {
135
- sink .success (results .get (0 ));
132
+ return Mono .empty ();
136
133
}
137
- })
138
- .contextWrite (sink .contextView ())
139
- .subscribe ();
140
- }));
134
+ return Mono .fromCallable (() -> results .get (0 ));
135
+ });
136
+ });
141
137
}
142
138
143
139
@ Override
You can’t perform that action at this time.
0 commit comments