Skip to content

Commit 01f5458

Browse files
Oleh Dokukarstoyanchev
authored andcommitted
introduces .connect method
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Co-authored-by: Rossen Stoyanchev <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent ac96b8e commit 01f5458

File tree

7 files changed

+188
-0
lines changed

7 files changed

+188
-0
lines changed

rsocket-core/src/jcstress/java/io/rsocket/core/ReconnectMonoStressTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,11 @@ void subscribe() {
543543
reconnectMono.subscribe(stressSubscriber);
544544
}
545545

546+
@Actor
547+
void connect() {
548+
reconnectMono.resolvingInner.connect();
549+
}
550+
546551
@Arbiter
547552
public void arbiter(IIIIII_Result r) {
548553
r.r1 = stressSubscription.subscribes;

rsocket-core/src/main/java/io/rsocket/core/RSocketClient.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,17 @@
7777
*/
7878
public interface RSocketClient extends Closeable {
7979

80+
/**
81+
* Connect to the remote rsocket endpoint, if not yet connected. This method is a shortcut for
82+
* {@code RSocketClient#source().subscribe()}.
83+
*
84+
* @return {@code true} if an attempt to connect was triggered or if already connected, or {@code
85+
* false} if the client is terminated.
86+
*/
87+
default boolean connect() {
88+
throw new NotImplementedException();
89+
}
90+
8091
default Mono<Void> onClose() {
8192
return Mono.error(new NotImplementedException());
8293
}

rsocket-core/src/main/java/io/rsocket/core/RSocketClientAdapter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ public RSocket rsocket() {
4141
return rsocket;
4242
}
4343

44+
@Override
45+
public boolean connect() {
46+
throw new UnsupportedOperationException("Connect does not apply to a server side RSocket");
47+
}
48+
4449
@Override
4550
public Mono<RSocket> source() {
4651
return Mono.just(rsocket);

rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,30 @@ protected void doOnDispose() {
331331
// no ops
332332
}
333333

334+
public final boolean connect() {
335+
for (; ; ) {
336+
final BiConsumer<T, Throwable>[] a = this.subscribers;
337+
338+
if (a == TERMINATED) {
339+
return false;
340+
}
341+
342+
if (a == READY) {
343+
return true;
344+
}
345+
346+
if (a != EMPTY_UNSUBSCRIBED) {
347+
// do nothing if already started
348+
return true;
349+
}
350+
351+
if (SUBSCRIBERS.compareAndSet(this, a, EMPTY_SUBSCRIBED)) {
352+
this.doSubscribe();
353+
return true;
354+
}
355+
}
356+
}
357+
334358
final int add(BiConsumer<T, Throwable> ps) {
335359
for (; ; ) {
336360
BiConsumer<T, Throwable>[] a = this.subscribers;

rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ public Mono<Void> onClose() {
4545
return rSocketPool.onClose();
4646
}
4747

48+
@Override
49+
public boolean connect() {
50+
return rSocketPool.connect();
51+
}
52+
4853
/** Return {@code Mono} that selects an RSocket from the underlying pool. */
4954
@Override
5055
public Mono<RSocket> source() {

rsocket-core/src/main/java/io/rsocket/loadbalance/ResolvingOperator.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,30 @@ protected void doOnDispose() {
327327
// no ops
328328
}
329329

330+
public final boolean connect() {
331+
for (; ; ) {
332+
final BiConsumer<T, Throwable>[] a = this.subscribers;
333+
334+
if (a == TERMINATED) {
335+
return false;
336+
}
337+
338+
if (a == READY) {
339+
return true;
340+
}
341+
342+
if (a != EMPTY_UNSUBSCRIBED) {
343+
// do nothing if already started
344+
return true;
345+
}
346+
347+
if (SUBSCRIBERS.compareAndSet(this, a, EMPTY_SUBSCRIBED)) {
348+
this.doSubscribe();
349+
return true;
350+
}
351+
}
352+
}
353+
330354
final int add(BiConsumer<T, Throwable> ps) {
331355
for (; ; ) {
332356
BiConsumer<T, Throwable>[] a = this.subscribers;

rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.rsocket.frame.PayloadFrameCodec;
2929
import io.rsocket.frame.decoder.PayloadDecoder;
3030
import io.rsocket.internal.subscriber.AssertSubscriber;
31+
import io.rsocket.test.util.TestDuplexConnection;
3132
import io.rsocket.util.ByteBufPayload;
3233
import io.rsocket.util.RSocketProxy;
3334
import java.time.Duration;
@@ -495,6 +496,88 @@ public Mono<Void> onClose() {
495496
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
496497
}
497498

499+
@Test
500+
public void shouldResolveOnStartSource() {
501+
AssertSubscriber<RSocket> assertSubscriber = AssertSubscriber.create();
502+
Assertions.assertThat(rule.client.connect()).isTrue();
503+
rule.client.source().subscribe(assertSubscriber);
504+
rule.delayer.run();
505+
assertSubscriber.assertTerminated().assertValueCount(1);
506+
507+
rule.client.dispose();
508+
509+
Assertions.assertThat(rule.client.isDisposed()).isTrue();
510+
511+
AssertSubscriber<Void> assertSubscriber1 = AssertSubscriber.create();
512+
513+
rule.client.onClose().subscribe(assertSubscriber1);
514+
515+
assertSubscriber1.assertTerminated().assertComplete();
516+
517+
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
518+
}
519+
520+
@Test
521+
public void shouldNotStartIfAlreadyDisposed() {
522+
Assertions.assertThat(rule.client.connect()).isTrue();
523+
Assertions.assertThat(rule.client.connect()).isTrue();
524+
rule.delayer.run();
525+
526+
rule.client.dispose();
527+
528+
Assertions.assertThat(rule.client.connect()).isFalse();
529+
530+
Assertions.assertThat(rule.client.isDisposed()).isTrue();
531+
532+
AssertSubscriber<Void> assertSubscriber1 = AssertSubscriber.create();
533+
534+
rule.client.onClose().subscribe(assertSubscriber1);
535+
536+
assertSubscriber1.assertTerminated().assertComplete();
537+
538+
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
539+
}
540+
541+
@Test
542+
public void shouldBeRestartedIfSourceWasClosed() {
543+
AssertSubscriber<RSocket> assertSubscriber = AssertSubscriber.create();
544+
AssertSubscriber<Void> terminateSubscriber = AssertSubscriber.create();
545+
546+
Assertions.assertThat(rule.client.connect()).isTrue();
547+
rule.client.source().subscribe(assertSubscriber);
548+
rule.client.onClose().subscribe(terminateSubscriber);
549+
550+
rule.delayer.run();
551+
552+
assertSubscriber.assertTerminated().assertValueCount(1);
553+
554+
rule.socket.dispose();
555+
556+
terminateSubscriber.assertNotTerminated();
557+
Assertions.assertThat(rule.client.isDisposed()).isFalse();
558+
559+
rule.connection = new TestDuplexConnection(rule.allocator);
560+
rule.socket = rule.newRSocket();
561+
rule.producer = Sinks.one();
562+
563+
AssertSubscriber<RSocket> assertSubscriber2 = AssertSubscriber.create();
564+
565+
Assertions.assertThat(rule.client.connect()).isTrue();
566+
rule.client.source().subscribe(assertSubscriber2);
567+
568+
rule.delayer.run();
569+
570+
assertSubscriber2.assertTerminated().assertValueCount(1);
571+
572+
rule.client.dispose();
573+
574+
terminateSubscriber.assertTerminated().assertComplete();
575+
576+
Assertions.assertThat(rule.client.connect()).isFalse();
577+
578+
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
579+
}
580+
498581
@Test
499582
public void shouldDisposeOriginalSourceIfRacing() {
500583
for (int i = 0; i < RaceTestConstants.REPEATS; i++) {
@@ -523,6 +606,37 @@ public void shouldDisposeOriginalSourceIfRacing() {
523606
}
524607
}
525608

609+
@Test
610+
public void shouldStartOriginalSourceOnceIfRacing() {
611+
for (int i = 0; i < RaceTestConstants.REPEATS; i++) {
612+
ClientSocketRule rule = new ClientSocketRule();
613+
614+
rule.init();
615+
616+
AssertSubscriber<RSocket> assertSubscriber = AssertSubscriber.create();
617+
618+
RaceTestUtils.race(
619+
() -> rule.client.source().subscribe(assertSubscriber), () -> rule.client.connect());
620+
621+
Assertions.assertThat(rule.producer.currentSubscriberCount()).isOne();
622+
623+
rule.delayer.run();
624+
625+
assertSubscriber.assertTerminated();
626+
627+
rule.client.dispose();
628+
629+
Assertions.assertThat(rule.client.isDisposed()).isTrue();
630+
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
631+
632+
AssertSubscriber<Void> assertSubscriber1 = AssertSubscriber.create();
633+
634+
rule.client.onClose().subscribe(assertSubscriber1);
635+
636+
assertSubscriber1.assertTerminated().assertComplete();
637+
}
638+
}
639+
526640
public static class ClientSocketRule extends AbstractSocketRule<RSocket> {
527641

528642
protected RSocketClient client;

0 commit comments

Comments
 (0)