Skip to content

Commit b768d7a

Browse files
committed
Add missing file Help Unsafe object
1 parent dfe6987 commit b768d7a

File tree

1 file changed

+86
-0
lines changed

1 file changed

+86
-0
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package io.reactivesocket.util;
2+
3+
import io.reactivesocket.ReactiveSocket;
4+
import io.reactivesocket.rx.Completable;
5+
import org.reactivestreams.Publisher;
6+
import org.reactivestreams.Subscriber;
7+
import org.reactivestreams.Subscription;
8+
9+
import java.util.ArrayList;
10+
import java.util.List;
11+
import java.util.concurrent.*;
12+
13+
public class Unsafe {
14+
public static ReactiveSocket startAndWait(ReactiveSocket rsc) throws InterruptedException {
15+
CountDownLatch latch = new CountDownLatch(1);
16+
Completable completable = new Completable() {
17+
@Override
18+
public void success() {
19+
latch.countDown();
20+
}
21+
22+
@Override
23+
public void error(Throwable e) {
24+
latch.countDown();
25+
}
26+
};
27+
rsc.start(completable);
28+
latch.await();
29+
// awaitAvailability(rsc);
30+
31+
return rsc;
32+
}
33+
34+
public static ReactiveSocket awaitAvailability(ReactiveSocket rsc) throws InterruptedException {
35+
long waiting = 1L;
36+
while (rsc.availability() == 0.0) {
37+
Thread.sleep(waiting);
38+
waiting = Math.max(waiting * 2, 1000L);
39+
}
40+
return rsc;
41+
}
42+
43+
public static <T> T blockingSingleWait(Publisher<T> publisher, long timeout, TimeUnit unit)
44+
throws InterruptedException, ExecutionException, TimeoutException {
45+
return toSingleFuture(publisher).get(timeout, unit);
46+
}
47+
48+
public static <T> List<T> blockingWait(Publisher<T> publisher, long timeout, TimeUnit unit)
49+
throws InterruptedException, ExecutionException, TimeoutException {
50+
return toFuture(publisher).get(timeout, unit);
51+
}
52+
53+
public static <T> CompletableFuture<T> toSingleFuture(Publisher<T> publisher) {
54+
return toFuture(publisher).thenApply(list -> list.get(0));
55+
}
56+
57+
public static <T> CompletableFuture<List<T>> toFuture(Publisher<T> publisher) {
58+
CompletableFuture<List<T>> future = new CompletableFuture<>();
59+
60+
publisher.subscribe(new Subscriber<T>() {
61+
private List<T> buffer = new ArrayList<T>();
62+
63+
@Override
64+
public void onSubscribe(Subscription s) {
65+
s.request(Long.MAX_VALUE);
66+
}
67+
68+
@Override
69+
public void onNext(T t) {
70+
buffer.add(t);
71+
}
72+
73+
@Override
74+
public void onError(Throwable t) {
75+
future.completeExceptionally(t);
76+
}
77+
78+
@Override
79+
public void onComplete() {
80+
future.complete(buffer);
81+
}
82+
});
83+
84+
return future;
85+
}
86+
}

0 commit comments

Comments
 (0)