Skip to content

Commit bdbae86

Browse files
committed
rename ResumeAwareConnection to ResumePositionsConnection
Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent 4a627d0 commit bdbae86

File tree

8 files changed

+30
-30
lines changed

8 files changed

+30
-30
lines changed

rsocket-core/src/main/java/io/rsocket/internal/ClientServerConnection.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
package io.rsocket.internal;
1818

1919
import io.rsocket.DuplexConnection;
20-
import io.rsocket.resume.ResumeAwareConnection;
20+
import io.rsocket.resume.ResumePositionsConnection;
2121
import io.rsocket.resume.ResumeStateHolder;
2222
import io.rsocket.util.DuplexConnectionProxy;
2323

24-
class ClientServerConnection extends DuplexConnectionProxy implements ResumeAwareConnection {
24+
class ClientServerConnection extends DuplexConnectionProxy implements ResumePositionsConnection {
2525

2626
private final DuplexConnection resumeAware;
2727

@@ -32,8 +32,8 @@ public ClientServerConnection(DuplexConnection delegate, DuplexConnection resume
3232

3333
@Override
3434
public void acceptResumeState(ResumeStateHolder resumeStateHolder) {
35-
if (resumeAware instanceof ResumeAwareConnection) {
36-
((ResumeAwareConnection) resumeAware).acceptResumeState(resumeStateHolder);
35+
if (resumeAware instanceof ResumePositionsConnection) {
36+
((ResumePositionsConnection) resumeAware).acceptResumeState(resumeStateHolder);
3737
}
3838
}
3939
}

rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.rsocket.frame.FrameUtil;
2424
import io.rsocket.plugins.DuplexConnectionInterceptor.Type;
2525
import io.rsocket.plugins.PluginRegistry;
26-
import io.rsocket.resume.ResumeAwareConnection;
26+
import io.rsocket.resume.ResumePositionsConnection;
2727
import org.reactivestreams.Publisher;
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ public class ClientServerInputMultiplexer implements Closeable {
5252
private final DuplexConnection serverConnection;
5353
private final DuplexConnection clientConnection;
5454
private final DuplexConnection source;
55-
private final ResumeAwareConnection clientServerConnection;
55+
private final ResumePositionsConnection clientServerConnection;
5656

5757
public ClientServerInputMultiplexer(DuplexConnection source) {
5858
this(source, emptyPluginRegistry);
@@ -115,7 +115,7 @@ public ClientServerInputMultiplexer(DuplexConnection source, PluginRegistry plug
115115
});
116116
}
117117

118-
public ResumeAwareConnection asClientServerConnection() {
118+
public ResumePositionsConnection asClientServerConnection() {
119119
return clientServerConnection;
120120
}
121121

rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.rsocket.frame.FrameHeaderFlyweight;
2424
import io.rsocket.frame.FrameType;
2525
import io.rsocket.internal.KeepAliveData;
26-
import io.rsocket.resume.ResumeAwareConnection;
26+
import io.rsocket.resume.ResumePositionsConnection;
2727
import io.rsocket.resume.ResumeStateHolder;
2828
import io.rsocket.util.DuplexConnectionProxy;
2929
import io.rsocket.util.Function3;
@@ -33,7 +33,7 @@
3333
import org.reactivestreams.Publisher;
3434
import reactor.core.publisher.*;
3535

36-
public class KeepAliveConnection extends DuplexConnectionProxy implements ResumeAwareConnection {
36+
public class KeepAliveConnection extends DuplexConnectionProxy implements ResumePositionsConnection {
3737

3838
private final MonoProcessor<KeepAliveHandler> keepAliveHandlerReady = MonoProcessor.create();
3939
private final ByteBufAllocator allocator;

rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,17 @@
3030
import org.slf4j.LoggerFactory;
3131
import reactor.core.publisher.Mono;
3232

33-
public class ClientRSocketSession implements RSocketSession<Mono<? extends ResumeAwareConnection>> {
33+
public class ClientRSocketSession implements RSocketSession<Mono<? extends ResumePositionsConnection>> {
3434
private static final Logger logger = LoggerFactory.getLogger(ClientRSocketSession.class);
3535

3636
private final ResumableDuplexConnection resumableConnection;
37-
private volatile Mono<? extends ResumeAwareConnection> newConnection;
37+
private volatile Mono<? extends ResumePositionsConnection> newConnection;
3838
private volatile ByteBuf resumeToken;
3939
private final ByteBufAllocator allocator;
4040

4141
public ClientRSocketSession(
4242
ByteBufAllocator allocator,
43-
ResumeAwareConnection duplexConnection,
43+
ResumePositionsConnection duplexConnection,
4444
ClientResumeConfiguration config) {
4545
this.allocator = Objects.requireNonNull(allocator);
4646
this.resumableConnection =
@@ -101,7 +101,7 @@ public ClientRSocketSession(
101101
}
102102

103103
@Override
104-
public ClientRSocketSession continueWith(Mono<? extends ResumeAwareConnection> newConnection) {
104+
public ClientRSocketSession continueWith(Mono<? extends ResumePositionsConnection> newConnection) {
105105
this.newConnection = newConnection;
106106
return this;
107107
}
@@ -137,7 +137,7 @@ public ClientRSocketSession resumeToken(ByteBuf resumeToken) {
137137
}
138138

139139
@Override
140-
public void reconnect(ResumeAwareConnection connection) {
140+
public void reconnect(ResumePositionsConnection connection) {
141141
resumableConnection.reconnect(connection);
142142
}
143143

rsocket-core/src/main/java/io/rsocket/resume/RSocketSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public interface RSocketSession<T> extends Closeable {
3131

3232
RSocketSession resumeWith(ByteBuf resumeFrame);
3333

34-
void reconnect(ResumeAwareConnection connection);
34+
void reconnect(ResumePositionsConnection connection);
3535

3636
@Override
3737
default Mono<Void> onClose() {

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
4040

4141
private final ReplayProcessor<DuplexConnection> connections = ReplayProcessor.create(1);
4242
private final EmitterProcessor<Throwable> connectionErrors = EmitterProcessor.create();
43-
private volatile ResumeAwareConnection curConnection;
43+
private volatile ResumePositionsConnection curConnection;
4444
/*used instead of EmitterProcessor because its autocancel=false capability had no expected effect*/
4545
private final FluxProcessor<ByteBuf, ByteBuf> downStreamFrames = ReplayProcessor.create(0);
4646
private final FluxProcessor<ByteBuf, ByteBuf> resumeSaveFrames = EmitterProcessor.create();
@@ -64,7 +64,7 @@ class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
6464

6565
ResumableDuplexConnection(
6666
String tag,
67-
ResumeAwareConnection duplexConnection,
67+
ResumePositionsConnection duplexConnection,
6868
ResumableFramesStore resumableFramesStore,
6969
Duration resumeStreamTimeout) {
7070
this.tag = tag;
@@ -98,7 +98,7 @@ class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
9898

9999
/*reconnected by session after error. After this downstream can receive frames,
100100
* but sending in suppressed until resume() is called*/
101-
public void reconnect(ResumeAwareConnection connection) {
101+
public void reconnect(ResumePositionsConnection connection) {
102102
if (curConnection == null) {
103103
logger.debug("{} Resumable duplex connection started with connection: {}", tag, connection);
104104
state = State.CONNECTED;
@@ -225,7 +225,7 @@ private void dispatch(Flux<?> f) {
225225
});
226226
}
227227

228-
private void doResumeStart(ResumeAwareConnection connection) {
228+
private void doResumeStart(ResumePositionsConnection connection) {
229229
state = State.RESUME_STARTED;
230230
resumedStreamDisposable.dispose();
231231
upstreamSubscriber.resumeStart();
@@ -306,7 +306,7 @@ private Mono<Void> streamResumedFrames(Flux<ByteBuf> frames) {
306306
});
307307
}
308308

309-
private void onNewConnection(ResumeAwareConnection connection) {
309+
private void onNewConnection(ResumePositionsConnection connection) {
310310
curConnection = connection;
311311
connection.onClose().doFinally(v -> disconnect(connection)).subscribe();
312312
connections.onNext(connection);
@@ -364,9 +364,9 @@ public boolean isActive() {
364364
}
365365

366366
class ResumeStart implements Runnable {
367-
private ResumeAwareConnection connection;
367+
private ResumePositionsConnection connection;
368368

369-
public ResumeStart(ResumeAwareConnection connection) {
369+
public ResumeStart(ResumePositionsConnection connection) {
370370
this.connection = connection;
371371
}
372372

rsocket-core/src/main/java/io/rsocket/resume/ResumeAwareConnection.java renamed to rsocket-core/src/main/java/io/rsocket/resume/ResumePositionsConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import io.rsocket.DuplexConnection;
2020

21-
public interface ResumeAwareConnection extends DuplexConnection {
21+
public interface ResumePositionsConnection extends DuplexConnection {
2222

2323
void acceptResumeState(ResumeStateHolder resumeStateHolder);
2424
}

rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,20 @@
3131
import reactor.core.publisher.Mono;
3232
import reactor.core.publisher.ReplayProcessor;
3333

34-
public class ServerRSocketSession implements RSocketSession<ResumeAwareConnection> {
34+
public class ServerRSocketSession implements RSocketSession<ResumePositionsConnection> {
3535
private static final Logger logger = LoggerFactory.getLogger(ServerRSocketSession.class);
3636

3737
private final ResumableDuplexConnection resumableConnection;
3838
/*used instead of EmitterProcessor because its autocancel=false capability had no expected effect*/
39-
private final FluxProcessor<ResumeAwareConnection, ResumeAwareConnection> newConnections =
39+
private final FluxProcessor<ResumePositionsConnection, ResumePositionsConnection> newConnections =
4040
ReplayProcessor.create(0);
4141
private final ByteBufAllocator allocator;
4242
private final KeepAliveData keepAliveData;
4343
private final ByteBuf resumeToken;
4444

4545
public ServerRSocketSession(
4646
ByteBufAllocator allocator,
47-
ResumeAwareConnection duplexConnection,
47+
ResumePositionsConnection duplexConnection,
4848
ServerResumeConfiguration config,
4949
KeepAliveData keepAliveData,
5050
ByteBuf resumeToken) {
@@ -58,7 +58,7 @@ public ServerRSocketSession(
5858
config.resumeStoreFactory().apply(resumeToken),
5959
config.resumeStreamTimeout());
6060

61-
Mono<ResumeAwareConnection> timeout =
61+
Mono<ResumePositionsConnection> timeout =
6262
resumableConnection
6363
.connectionErrors()
6464
.flatMap(
@@ -70,7 +70,7 @@ public ServerRSocketSession(
7070
.timeout(config.sessionDuration());
7171
})
7272
.then()
73-
.cast(ResumeAwareConnection.class);
73+
.cast(ResumePositionsConnection.class);
7474

7575
newConnections
7676
.mergeWith(timeout)
@@ -86,7 +86,7 @@ public ServerRSocketSession(
8686
}
8787

8888
@Override
89-
public ServerRSocketSession continueWith(ResumeAwareConnection newConnection) {
89+
public ServerRSocketSession continueWith(ResumePositionsConnection newConnection) {
9090
logger.debug("Server continued with connection: {}", newConnection);
9191
newConnections.onNext(newConnection);
9292
return this;
@@ -116,7 +116,7 @@ public ServerRSocketSession resumeWith(ByteBuf resumeFrame) {
116116
}
117117

118118
@Override
119-
public void reconnect(ResumeAwareConnection connection) {
119+
public void reconnect(ResumePositionsConnection connection) {
120120
resumableConnection.reconnect(connection);
121121
}
122122

0 commit comments

Comments
 (0)