20
20
import org .reactivestreams .Subscriber ;
21
21
import org .reactivestreams .Subscription ;
22
22
23
- import java .util .concurrent .CountDownLatch ;
23
+ import java .util .NoSuchElementException ;
24
+ import java .util .concurrent .CompletableFuture ;
24
25
import java .util .concurrent .ScheduledExecutorService ;
25
26
import java .util .concurrent .TimeUnit ;
26
27
import java .util .concurrent .TimeoutException ;
27
28
import java .util .concurrent .atomic .AtomicBoolean ;
28
- import java .util .concurrent .atomic .AtomicReference ;
29
29
30
30
@ FunctionalInterface
31
31
public interface ReactiveSocketFactory <T , R extends ReactiveSocket > {
@@ -38,9 +38,7 @@ public interface ReactiveSocketFactory<T, R extends ReactiveSocket> {
38
38
* @return blocks on create the socket
39
39
*/
40
40
default R callAndWait (T t ) {
41
- AtomicReference <R > reference = new AtomicReference <>();
42
- AtomicReference <Throwable > error = new AtomicReference <>();
43
- CountDownLatch latch = new CountDownLatch (1 );
41
+ CompletableFuture <R > future = new CompletableFuture <>();
44
42
45
43
call (t )
46
44
.subscribe (new Subscriber <R >() {
@@ -51,26 +49,21 @@ public void onSubscribe(Subscription s) {
51
49
52
50
@ Override
53
51
public void onNext (R reactiveSocket ) {
54
- reference . set (reactiveSocket );
52
+ future . complete (reactiveSocket );
55
53
}
56
54
57
55
@ Override
58
56
public void onError (Throwable t ) {
59
- error .set (t );
60
- latch .countDown ();
57
+ future .completeExceptionally (t );
61
58
}
62
59
63
60
@ Override
64
61
public void onComplete () {
65
- latch . countDown ( );
62
+ future . completeExceptionally ( new NoSuchElementException ( "Sequence contains no elements" ) );
66
63
}
67
64
});
68
65
69
- if (error .get () != null ) {
70
- throw new RuntimeException (error .get ());
71
- } else {
72
- return reference .get ();
73
- }
66
+ return future .join ();
74
67
}
75
68
76
69
/**
0 commit comments