Skip to content

Commit 40e02a5

Browse files
Oleh DokukaOlegDokuka
Oleh Dokuka
authored andcommitted
introduces .connect method
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 26abe4b commit 40e02a5

File tree

7 files changed

+208
-0
lines changed

7 files changed

+208
-0
lines changed

rsocket-core/src/jcstress/java/io/rsocket/core/ReconnectMonoStressTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,11 @@ void subscribe() {
543543
reconnectMono.subscribe(stressSubscriber);
544544
}
545545

546+
@Actor
547+
void connect() {
548+
reconnectMono.resolvingInner.connect();
549+
}
550+
546551
@Arbiter
547552
public void arbiter(IIIIII_Result r) {
548553
r.r1 = stressSubscription.subscribes;

rsocket-core/src/main/java/io/rsocket/core/RSocketClient.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,16 @@
7777
*/
7878
public interface RSocketClient extends Closeable {
7979

80+
/**
81+
* Tries to connect to remote rsocket endpoint if not yet connected. This method is a
82+
* shortcut to {@code RSocketClient#source().subscribe()}
83+
*
84+
* @return {@code true} if successfully activated connection process
85+
*/
86+
default boolean connect() {
87+
throw new NotImplementedException();
88+
}
89+
8090
default Mono<Void> onClose() {
8191
return Mono.error(new NotImplementedException());
8292
}

rsocket-core/src/main/java/io/rsocket/core/RSocketClientAdapter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ public RSocket rsocket() {
4141
return rsocket;
4242
}
4343

44+
@Override
45+
public boolean connect() {
46+
throw new UnsupportedOperationException(
47+
"Serverside RSocketClientAdapter can not proactively open a connection");
48+
}
49+
4450
@Override
4551
public Mono<RSocket> source() {
4652
return Mono.just(rsocket);

rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,31 @@ protected void doOnDispose() {
331331
// no ops
332332
}
333333

334+
public final boolean connect() {
335+
for (; ; ) {
336+
final BiConsumer<T, Throwable>[] a = this.subscribers;
337+
338+
if (a == TERMINATED) {
339+
340+
return false;
341+
}
342+
343+
if (a == READY) {
344+
return false;
345+
}
346+
347+
if (a != EMPTY_UNSUBSCRIBED) {
348+
// do nothing if already started
349+
return false;
350+
}
351+
352+
if (SUBSCRIBERS.compareAndSet(this, a, EMPTY_SUBSCRIBED)) {
353+
this.doSubscribe();
354+
return true;
355+
}
356+
}
357+
}
358+
334359
final int add(BiConsumer<T, Throwable> ps) {
335360
for (; ; ) {
336361
BiConsumer<T, Throwable>[] a = this.subscribers;

rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ public Mono<Void> onClose() {
4545
return rSocketPool.onClose();
4646
}
4747

48+
@Override
49+
public boolean connect() {
50+
return rSocketPool.connect();
51+
}
52+
4853
/** Return {@code Mono} that selects an RSocket from the underlying pool. */
4954
@Override
5055
public Mono<RSocket> source() {

rsocket-core/src/main/java/io/rsocket/loadbalance/ResolvingOperator.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,30 @@ protected void doOnDispose() {
327327
// no ops
328328
}
329329

330+
public final boolean connect() {
331+
for (; ; ) {
332+
final BiConsumer<T, Throwable>[] a = this.subscribers;
333+
334+
if (a == TERMINATED) {
335+
return false;
336+
}
337+
338+
if (a == READY) {
339+
return false;
340+
}
341+
342+
if (a != EMPTY_UNSUBSCRIBED) {
343+
// do nothing if already started
344+
return false;
345+
}
346+
347+
if (SUBSCRIBERS.compareAndSet(this, a, EMPTY_SUBSCRIBED)) {
348+
this.doSubscribe();
349+
return true;
350+
}
351+
}
352+
}
353+
330354
final int add(BiConsumer<T, Throwable> ps) {
331355
for (; ; ) {
332356
BiConsumer<T, Throwable>[] a = this.subscribers;

rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.rsocket.frame.PayloadFrameCodec;
2929
import io.rsocket.frame.decoder.PayloadDecoder;
3030
import io.rsocket.internal.subscriber.AssertSubscriber;
31+
import io.rsocket.test.util.TestDuplexConnection;
3132
import io.rsocket.util.ByteBufPayload;
3233
import io.rsocket.util.RSocketProxy;
3334
import java.time.Duration;
@@ -495,6 +496,107 @@ public Mono<Void> onClose() {
495496
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
496497
}
497498

499+
@Test
500+
public void shouldResolveOnStartSource() {
501+
AssertSubscriber<RSocket> assertSubscriber = AssertSubscriber.create();
502+
Assertions.assertThat(rule.client.connect()).isTrue();
503+
rule.client.source().subscribe(assertSubscriber);
504+
rule.delayer.run();
505+
assertSubscriber.assertTerminated().assertValueCount(1);
506+
507+
rule.client.dispose();
508+
509+
Assertions.assertThat(rule.client.isDisposed()).isTrue();
510+
511+
AssertSubscriber<Void> assertSubscriber1 = AssertSubscriber.create();
512+
513+
rule.client.onClose().subscribe(assertSubscriber1);
514+
515+
assertSubscriber1.assertTerminated().assertComplete();
516+
517+
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
518+
}
519+
520+
@Test
521+
public void shouldNotStartTwiceSubscriptionToSource() {
522+
Assertions.assertThat(rule.client.connect()).isTrue();
523+
Assertions.assertThat(rule.client.connect()).isFalse();
524+
rule.delayer.run();
525+
526+
rule.client.dispose();
527+
528+
Assertions.assertThat(rule.client.isDisposed()).isTrue();
529+
530+
AssertSubscriber<Void> assertSubscriber1 = AssertSubscriber.create();
531+
532+
rule.client.onClose().subscribe(assertSubscriber1);
533+
534+
assertSubscriber1.assertTerminated().assertComplete();
535+
536+
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
537+
}
538+
539+
@Test
540+
public void shouldNotStartSubscriptionToSourceIfSourceWasSubscribedDifferently() {
541+
AssertSubscriber<RSocket> assertSubscriber = AssertSubscriber.create();
542+
rule.client.source().subscribe(assertSubscriber);
543+
Assertions.assertThat(rule.client.connect()).isFalse();
544+
rule.delayer.run();
545+
assertSubscriber.assertTerminated().assertValueCount(1);
546+
547+
rule.client.dispose();
548+
549+
Assertions.assertThat(rule.client.isDisposed()).isTrue();
550+
551+
AssertSubscriber<Void> assertSubscriber1 = AssertSubscriber.create();
552+
553+
rule.client.onClose().subscribe(assertSubscriber1);
554+
555+
assertSubscriber1.assertTerminated().assertComplete();
556+
557+
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
558+
}
559+
560+
@Test
561+
public void shouldBeRestartedIfSourceWasClosed() {
562+
AssertSubscriber<RSocket> assertSubscriber = AssertSubscriber.create();
563+
AssertSubscriber<Void> terminateSubscriber = AssertSubscriber.create();
564+
565+
Assertions.assertThat(rule.client.connect()).isTrue();
566+
rule.client.source().subscribe(assertSubscriber);
567+
rule.client.onClose().subscribe(terminateSubscriber);
568+
569+
rule.delayer.run();
570+
571+
assertSubscriber.assertTerminated().assertValueCount(1);
572+
573+
rule.socket.dispose();
574+
575+
terminateSubscriber.assertNotTerminated();
576+
Assertions.assertThat(rule.client.isDisposed()).isFalse();
577+
578+
rule.connection = new TestDuplexConnection(rule.allocator);
579+
rule.socket = rule.newRSocket();
580+
rule.producer = Sinks.one();
581+
582+
AssertSubscriber<RSocket> assertSubscriber2 = AssertSubscriber.create();
583+
584+
Assertions.assertThat(rule.client.connect()).isTrue();
585+
rule.client.source().subscribe(assertSubscriber2);
586+
587+
rule.delayer.run();
588+
589+
assertSubscriber2.assertTerminated().assertValueCount(1);
590+
591+
rule.client.dispose();
592+
593+
terminateSubscriber.assertTerminated().assertComplete();
594+
595+
Assertions.assertThat(rule.client.connect()).isFalse();
596+
597+
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
598+
}
599+
498600
@Test
499601
public void shouldDisposeOriginalSourceIfRacing() {
500602
for (int i = 0; i < RaceTestConstants.REPEATS; i++) {
@@ -523,6 +625,37 @@ public void shouldDisposeOriginalSourceIfRacing() {
523625
}
524626
}
525627

628+
@Test
629+
public void shouldStartOriginalSourceOnceIfRacing() {
630+
for (int i = 0; i < RaceTestConstants.REPEATS; i++) {
631+
ClientSocketRule rule = new ClientSocketRule();
632+
633+
rule.init();
634+
635+
AssertSubscriber<RSocket> assertSubscriber = AssertSubscriber.create();
636+
637+
RaceTestUtils.race(
638+
() -> rule.client.source().subscribe(assertSubscriber), () -> rule.client.connect());
639+
640+
Assertions.assertThat(rule.producer.currentSubscriberCount()).isOne();
641+
642+
rule.delayer.run();
643+
644+
assertSubscriber.assertTerminated();
645+
646+
rule.client.dispose();
647+
648+
Assertions.assertThat(rule.client.isDisposed()).isTrue();
649+
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
650+
651+
AssertSubscriber<Void> assertSubscriber1 = AssertSubscriber.create();
652+
653+
rule.client.onClose().subscribe(assertSubscriber1);
654+
655+
assertSubscriber1.assertTerminated().assertComplete();
656+
}
657+
}
658+
526659
public static class ClientSocketRule extends AbstractSocketRule<RSocket> {
527660

528661
protected RSocketClient client;

0 commit comments

Comments
 (0)