Skip to content

Commit 289b1e7

Browse files
committed
fixes interface naming
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent f6c05e7 commit 289b1e7

File tree

4 files changed

+14
-13
lines changed

4 files changed

+14
-13
lines changed

rsocket-core/src/main/java/io/rsocket/Invalidate.java renamed to rsocket-core/src/main/java/io/rsocket/Invalidatable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package io.rsocket;
1717

18-
interface Invalidate {
18+
interface Invalidatable {
1919

20-
void expire();
20+
void invalidate();
2121
}

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import io.rsocket.util.MultiSubscriberRSocket;
4545
import java.time.Duration;
4646
import java.util.Objects;
47+
import java.util.function.BiConsumer;
4748
import java.util.function.Consumer;
4849
import java.util.function.Function;
4950
import java.util.function.Supplier;
@@ -98,6 +99,8 @@ default <T extends Closeable> Start<T> transport(ServerTransport<T> transport) {
9899
public static class ClientRSocketFactory implements ClientTransportAcceptor {
99100
private static final String CLIENT_TAG = "client";
100101

102+
private static final BiConsumer<RSocket, Invalidatable> INVALIDATE_FUNCTION =
103+
(r, i) -> r.onClose().subscribe(null, null, i::invalidate);
101104
private static final Function<Flux<Throwable>, ? extends Publisher<?>> FAIL_WHEN_FACTORY =
102105
f -> f.concatMap(Mono::error);
103106

@@ -528,9 +531,7 @@ public Mono<RSocket> start() {
528531
source -> {
529532
if (reconnectEnabled) {
530533
return new ReconnectMono<>(
531-
source.retryWhen(whenFactory),
532-
Disposable::dispose,
533-
(r, i) -> r.onClose().subscribe(null, null, i::expire));
534+
source.retryWhen(whenFactory), Disposable::dispose, INVALIDATE_FUNCTION);
534535
} else {
535536
return source;
536537
}

rsocket-core/src/main/java/io/rsocket/ReconnectMono.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@
3333
import reactor.util.annotation.Nullable;
3434
import reactor.util.context.Context;
3535

36-
final class ReconnectMono<T> extends Mono<T> implements Invalidate, Disposable, Scannable {
36+
final class ReconnectMono<T> extends Mono<T> implements Invalidatable, Disposable, Scannable {
3737

3838
final Mono<T> source;
39-
final BiConsumer<? super T, Invalidate> onValueReceived;
39+
final BiConsumer<? super T, Invalidatable> onValueReceived;
4040
final Consumer<? super T> onValueExpired;
4141
final ReconnectMainSubscriber<? super T> mainSubscriber;
4242

@@ -75,7 +75,7 @@ final class ReconnectMono<T> extends Mono<T> implements Invalidate, Disposable,
7575
ReconnectMono(
7676
Mono<T> source,
7777
Consumer<? super T> onValueExpired,
78-
BiConsumer<? super T, Invalidate> onValueReceived) {
78+
BiConsumer<? super T, Invalidatable> onValueReceived) {
7979
this.source = source;
8080
this.onValueExpired = onValueExpired;
8181
this.onValueReceived = onValueReceived;
@@ -275,7 +275,7 @@ void doFinally() {
275275

276276
// Check RSocket is not good
277277
@Override
278-
public void expire() {
278+
public void invalidate() {
279279
if (this.subscribers == TERMINATED) {
280280
return;
281281
}

rsocket-core/src/test/java/io/rsocket/ReconnectMonoTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
public class ReconnectMonoTests {
5454

5555
private Queue<RetryContext<?>> retries = new ConcurrentLinkedQueue<>();
56-
private Queue<Tuple2<Object, Invalidate>> received = new ConcurrentLinkedQueue<>();
56+
private Queue<Tuple2<Object, Invalidatable>> received = new ConcurrentLinkedQueue<>();
5757
private Queue<Object> expired = new ConcurrentLinkedQueue<>();
5858

5959
@Test
@@ -542,7 +542,7 @@ public void shouldNotExpiredIfNotCompleted() {
542542
Assertions.assertThat(received).isEmpty();
543543
Assertions.assertThat(processor.isTerminated()).isFalse();
544544

545-
reconnectMono.expire();
545+
reconnectMono.invalidate();
546546

547547
Assertions.assertThat(expired).isEmpty();
548548
Assertions.assertThat(received).isEmpty();
@@ -736,7 +736,7 @@ public void shouldExpireValueExactlyOnce() {
736736

737737
Assertions.assertThat(expired).isEmpty();
738738
Assertions.assertThat(received).hasSize(1).containsOnly(Tuples.of("value", reconnectMono));
739-
RaceTestUtils.race(reconnectMono::expire, reconnectMono::expire);
739+
RaceTestUtils.race(reconnectMono::invalidate, reconnectMono::invalidate);
740740

741741
Assertions.assertThat(expired).hasSize(1).containsOnly("value");
742742
Assertions.assertThat(received).hasSize(1).containsOnly(Tuples.of("value", reconnectMono));
@@ -906,7 +906,7 @@ Consumer<? super RetryContext<?>> onRetry() {
906906
return context -> retries.add(context);
907907
}
908908

909-
<T> BiConsumer<T, Invalidate> onValue() {
909+
<T> BiConsumer<T, Invalidatable> onValue() {
910910
return (v, __) -> received.add(Tuples.of(v, __));
911911
}
912912

0 commit comments

Comments
 (0)