Skip to content

Commit 87a1a2e

Browse files
vbabaninjyemin
authored andcommitted
Add BatchCursor interceptor.
1 parent ed8f923 commit 87a1a2e

File tree

2 files changed

+71
-7
lines changed

2 files changed

+71
-7
lines changed

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,24 @@
2121
import com.mongodb.ServerCursor;
2222
import com.mongodb.client.MongoCursor;
2323
import com.mongodb.lang.Nullable;
24+
import com.mongodb.reactivestreams.client.internal.BatchCursor;
2425
import org.reactivestreams.Publisher;
2526
import org.reactivestreams.Subscriber;
2627
import org.reactivestreams.Subscription;
28+
import reactor.core.CoreSubscriber;
2729
import reactor.core.publisher.Flux;
30+
import reactor.core.publisher.Hooks;
31+
import reactor.core.publisher.Operators;
32+
import reactor.util.context.Context;
2833

2934
import java.util.NoSuchElementException;
3035
import java.util.concurrent.BlockingDeque;
36+
import java.util.concurrent.CompletableFuture;
3137
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.ExecutionException;
3239
import java.util.concurrent.LinkedBlockingDeque;
3340
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.TimeoutException;
3442

3543
import static com.mongodb.ClusterFixture.TIMEOUT;
3644
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
@@ -41,6 +49,7 @@
4149
class SyncMongoCursor<T> implements MongoCursor<T> {
4250
private static final Object COMPLETED = new Object();
4351
private final BlockingDeque<Object> results = new LinkedBlockingDeque<>();
52+
private final CompletableFuture<BatchCursor> batchCursorCompletableFuture = new CompletableFuture<>();
4453
private final Integer batchSize;
4554
private int countToBatchSize;
4655
private Subscription subscription;
@@ -50,7 +59,11 @@ class SyncMongoCursor<T> implements MongoCursor<T> {
5059

5160
SyncMongoCursor(final Publisher<T> publisher, @Nullable final Integer batchSize) {
5261
this.batchSize = batchSize;
53-
CountDownLatch latch = new CountDownLatch(1);
62+
CountDownLatch subscriptionLatch = new CountDownLatch(1);
63+
Hooks.onEachOperator(Operators.lift((sc, sub) ->
64+
new BatchCursorInterceptSubscriber(sub, batchCursorCompletableFuture
65+
)));
66+
5467
//noinspection ReactiveStreamsSubscriberImplementation
5568
Flux.from(publisher).contextWrite(CONTEXT).subscribe(new Subscriber<T>() {
5669
@Override
@@ -61,7 +74,7 @@ public void onSubscribe(final Subscription s) {
6174
} else {
6275
subscription.request(batchSize);
6376
}
64-
latch.countDown();
77+
subscriptionLatch.countDown();
6578
}
6679

6780
@Override
@@ -80,12 +93,20 @@ public void onComplete() {
8093
}
8194
});
8295
try {
83-
if (!latch.await(TIMEOUT, TimeUnit.SECONDS)) {
96+
if (!subscriptionLatch.await(TIMEOUT, TimeUnit.SECONDS)) {
8497
throw new MongoTimeoutException("Timeout waiting for subscription");
8598
}
99+
batchCursorCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS);
100+
Hooks.resetOnEachOperator();
86101
sleep(getSleepAfterCursorOpen());
87102
} catch (InterruptedException e) {
88103
throw interruptAndCreateMongoInterruptedException("Interrupted waiting for asynchronous cursor establishment", e);
104+
} catch (ExecutionException | TimeoutException e) {
105+
Throwable cause = e.getCause();
106+
if (cause instanceof RuntimeException) {
107+
throw (RuntimeException) cause;
108+
}
109+
throw new RuntimeException(e);
89110
}
90111
}
91112

@@ -181,4 +202,50 @@ private RuntimeException translateError(final Throwable throwable) {
181202
}
182203
return new RuntimeException(throwable);
183204
}
205+
206+
207+
static class BatchCursorInterceptSubscriber implements CoreSubscriber {
208+
209+
private final CoreSubscriber sub;
210+
private final CompletableFuture<BatchCursor> batchCursorCompletableFuture;
211+
212+
213+
BatchCursorInterceptSubscriber(final CoreSubscriber sub,
214+
final CompletableFuture<BatchCursor> batchCursorCompletableFuture) {
215+
this.sub = sub;
216+
this.batchCursorCompletableFuture = batchCursorCompletableFuture;
217+
}
218+
219+
@Override
220+
public Context currentContext() {
221+
return sub.currentContext();
222+
}
223+
224+
@Override
225+
public void onSubscribe(final Subscription s) {
226+
sub.onSubscribe(s);
227+
}
228+
229+
@Override
230+
public void onNext(final Object o) {
231+
if (o instanceof BatchCursor) {
232+
// Interception of a cursor means that it has been created at this point.
233+
batchCursorCompletableFuture.complete((BatchCursor) o);
234+
}
235+
sub.onNext(o);
236+
}
237+
238+
@Override
239+
public void onError(final Throwable t) {
240+
if (!batchCursorCompletableFuture.isDone()) { // Cursor has not been created yet but an error occurred.
241+
batchCursorCompletableFuture.completeExceptionally(t);
242+
}
243+
sub.onError(t);
244+
}
245+
246+
@Override
247+
public void onComplete() {
248+
sub.onComplete();
249+
}
250+
}
184251
}

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ChangeStreamsTest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,7 @@ public final class ChangeStreamsTest extends UnifiedReactiveStreamsTest {
3636

3737
private static final List<String> ERROR_REQUIRED_FROM_CHANGE_STREAM_INITIALIZATION_TESTS =
3838
Arrays.asList(
39-
"Test with document comment - pre 4.4",
40-
"Change Stream should error when an invalid aggregation stage is passed in",
41-
"The watch helper must not throw a custom exception when executed against a single server topology, "
42-
+ "but instead depend on a server error"
39+
"Test with document comment - pre 4.4"
4340
);
4441

4542
private static final List<String> EVENT_SENSITIVE_TESTS =

0 commit comments

Comments
 (0)