Skip to content

Commit d1875da

Browse files
Oleh Dokukarstoyanchev
authored andcommitted
introduces .connect method
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Co-authored-by: Rossen Stoyanchev <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 26abe4b commit d1875da

File tree

7 files changed

+207
-0
lines changed

7 files changed

+207
-0
lines changed

rsocket-core/src/jcstress/java/io/rsocket/core/ReconnectMonoStressTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,11 @@ void subscribe() {
543543
reconnectMono.subscribe(stressSubscriber);
544544
}
545545

546+
@Actor
547+
void connect() {
548+
reconnectMono.resolvingInner.connect();
549+
}
550+
546551
@Arbiter
547552
public void arbiter(IIIIII_Result r) {
548553
r.r1 = stressSubscription.subscribes;

rsocket-core/src/main/java/io/rsocket/core/RSocketClient.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,17 @@
7777
*/
7878
public interface RSocketClient extends Closeable {
7979

80+
/**
81+
* Connect to the remote rsocket endpoint, if not yet connected. This method is a shortcut for
82+
* {@code RSocketClient#source().subscribe()}.
83+
*
84+
* @return {@code true} if an attempt to connect was triggered or if already connected, or {@code
85+
* false} if the client is terminated.
86+
*/
87+
default boolean connect() {
88+
throw new NotImplementedException();
89+
}
90+
8091
default Mono<Void> onClose() {
8192
return Mono.error(new NotImplementedException());
8293
}

rsocket-core/src/main/java/io/rsocket/core/RSocketClientAdapter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ public RSocket rsocket() {
4141
return rsocket;
4242
}
4343

44+
@Override
45+
public boolean connect() {
46+
throw new UnsupportedOperationException("Connect does not apply to a server side RSocket");
47+
}
48+
4449
@Override
4550
public Mono<RSocket> source() {
4651
return Mono.just(rsocket);

rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,30 @@ protected void doOnDispose() {
331331
// no ops
332332
}
333333

334+
public final boolean connect() {
335+
for (; ; ) {
336+
final BiConsumer<T, Throwable>[] a = this.subscribers;
337+
338+
if (a == TERMINATED) {
339+
return false;
340+
}
341+
342+
if (a == READY) {
343+
return true;
344+
}
345+
346+
if (a != EMPTY_UNSUBSCRIBED) {
347+
// do nothing if already started
348+
return true;
349+
}
350+
351+
if (SUBSCRIBERS.compareAndSet(this, a, EMPTY_SUBSCRIBED)) {
352+
this.doSubscribe();
353+
return true;
354+
}
355+
}
356+
}
357+
334358
final int add(BiConsumer<T, Throwable> ps) {
335359
for (; ; ) {
336360
BiConsumer<T, Throwable>[] a = this.subscribers;

rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ public Mono<Void> onClose() {
4545
return rSocketPool.onClose();
4646
}
4747

48+
@Override
49+
public boolean connect() {
50+
return rSocketPool.connect();
51+
}
52+
4853
/** Return {@code Mono} that selects an RSocket from the underlying pool. */
4954
@Override
5055
public Mono<RSocket> source() {

rsocket-core/src/main/java/io/rsocket/loadbalance/ResolvingOperator.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,30 @@ protected void doOnDispose() {
327327
// no ops
328328
}
329329

330+
public final boolean connect() {
331+
for (; ; ) {
332+
final BiConsumer<T, Throwable>[] a = this.subscribers;
333+
334+
if (a == TERMINATED) {
335+
return false;
336+
}
337+
338+
if (a == READY) {
339+
return true;
340+
}
341+
342+
if (a != EMPTY_UNSUBSCRIBED) {
343+
// do nothing if already started
344+
return true;
345+
}
346+
347+
if (SUBSCRIBERS.compareAndSet(this, a, EMPTY_SUBSCRIBED)) {
348+
this.doSubscribe();
349+
return true;
350+
}
351+
}
352+
}
353+
330354
final int add(BiConsumer<T, Throwable> ps) {
331355
for (; ; ) {
332356
BiConsumer<T, Throwable>[] a = this.subscribers;

rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.rsocket.frame.PayloadFrameCodec;
2929
import io.rsocket.frame.decoder.PayloadDecoder;
3030
import io.rsocket.internal.subscriber.AssertSubscriber;
31+
import io.rsocket.test.util.TestDuplexConnection;
3132
import io.rsocket.util.ByteBufPayload;
3233
import io.rsocket.util.RSocketProxy;
3334
import java.time.Duration;
@@ -495,6 +496,107 @@ public Mono<Void> onClose() {
495496
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
496497
}
497498

499+
@Test
500+
public void shouldResolveOnStartSource() {
501+
AssertSubscriber<RSocket> assertSubscriber = AssertSubscriber.create();
502+
Assertions.assertThat(rule.client.connect()).isTrue();
503+
rule.client.source().subscribe(assertSubscriber);
504+
rule.delayer.run();
505+
assertSubscriber.assertTerminated().assertValueCount(1);
506+
507+
rule.client.dispose();
508+
509+
Assertions.assertThat(rule.client.isDisposed()).isTrue();
510+
511+
AssertSubscriber<Void> assertSubscriber1 = AssertSubscriber.create();
512+
513+
rule.client.onClose().subscribe(assertSubscriber1);
514+
515+
assertSubscriber1.assertTerminated().assertComplete();
516+
517+
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
518+
}
519+
520+
@Test
521+
public void shouldNotStartTwiceSubscriptionToSource() {
522+
Assertions.assertThat(rule.client.connect()).isTrue();
523+
Assertions.assertThat(rule.client.connect()).isFalse();
524+
rule.delayer.run();
525+
526+
rule.client.dispose();
527+
528+
Assertions.assertThat(rule.client.isDisposed()).isTrue();
529+
530+
AssertSubscriber<Void> assertSubscriber1 = AssertSubscriber.create();
531+
532+
rule.client.onClose().subscribe(assertSubscriber1);
533+
534+
assertSubscriber1.assertTerminated().assertComplete();
535+
536+
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
537+
}
538+
539+
@Test
540+
public void shouldNotStartSubscriptionToSourceIfSourceWasSubscribedDifferently() {
541+
AssertSubscriber<RSocket> assertSubscriber = AssertSubscriber.create();
542+
rule.client.source().subscribe(assertSubscriber);
543+
Assertions.assertThat(rule.client.connect()).isFalse();
544+
rule.delayer.run();
545+
assertSubscriber.assertTerminated().assertValueCount(1);
546+
547+
rule.client.dispose();
548+
549+
Assertions.assertThat(rule.client.isDisposed()).isTrue();
550+
551+
AssertSubscriber<Void> assertSubscriber1 = AssertSubscriber.create();
552+
553+
rule.client.onClose().subscribe(assertSubscriber1);
554+
555+
assertSubscriber1.assertTerminated().assertComplete();
556+
557+
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
558+
}
559+
560+
@Test
561+
public void shouldBeRestartedIfSourceWasClosed() {
562+
AssertSubscriber<RSocket> assertSubscriber = AssertSubscriber.create();
563+
AssertSubscriber<Void> terminateSubscriber = AssertSubscriber.create();
564+
565+
Assertions.assertThat(rule.client.connect()).isTrue();
566+
rule.client.source().subscribe(assertSubscriber);
567+
rule.client.onClose().subscribe(terminateSubscriber);
568+
569+
rule.delayer.run();
570+
571+
assertSubscriber.assertTerminated().assertValueCount(1);
572+
573+
rule.socket.dispose();
574+
575+
terminateSubscriber.assertNotTerminated();
576+
Assertions.assertThat(rule.client.isDisposed()).isFalse();
577+
578+
rule.connection = new TestDuplexConnection(rule.allocator);
579+
rule.socket = rule.newRSocket();
580+
rule.producer = Sinks.one();
581+
582+
AssertSubscriber<RSocket> assertSubscriber2 = AssertSubscriber.create();
583+
584+
Assertions.assertThat(rule.client.connect()).isTrue();
585+
rule.client.source().subscribe(assertSubscriber2);
586+
587+
rule.delayer.run();
588+
589+
assertSubscriber2.assertTerminated().assertValueCount(1);
590+
591+
rule.client.dispose();
592+
593+
terminateSubscriber.assertTerminated().assertComplete();
594+
595+
Assertions.assertThat(rule.client.connect()).isFalse();
596+
597+
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
598+
}
599+
498600
@Test
499601
public void shouldDisposeOriginalSourceIfRacing() {
500602
for (int i = 0; i < RaceTestConstants.REPEATS; i++) {
@@ -523,6 +625,37 @@ public void shouldDisposeOriginalSourceIfRacing() {
523625
}
524626
}
525627

628+
@Test
629+
public void shouldStartOriginalSourceOnceIfRacing() {
630+
for (int i = 0; i < RaceTestConstants.REPEATS; i++) {
631+
ClientSocketRule rule = new ClientSocketRule();
632+
633+
rule.init();
634+
635+
AssertSubscriber<RSocket> assertSubscriber = AssertSubscriber.create();
636+
637+
RaceTestUtils.race(
638+
() -> rule.client.source().subscribe(assertSubscriber), () -> rule.client.connect());
639+
640+
Assertions.assertThat(rule.producer.currentSubscriberCount()).isOne();
641+
642+
rule.delayer.run();
643+
644+
assertSubscriber.assertTerminated();
645+
646+
rule.client.dispose();
647+
648+
Assertions.assertThat(rule.client.isDisposed()).isTrue();
649+
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
650+
651+
AssertSubscriber<Void> assertSubscriber1 = AssertSubscriber.create();
652+
653+
rule.client.onClose().subscribe(assertSubscriber1);
654+
655+
assertSubscriber1.assertTerminated().assertComplete();
656+
}
657+
}
658+
526659
public static class ClientSocketRule extends AbstractSocketRule<RSocket> {
527660

528661
protected RSocketClient client;

0 commit comments

Comments
 (0)