Skip to content

Commit ba7a04f

Browse files
committed
bugfixes resumability and ads related integration tests
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 56de5ef commit ba7a04f

29 files changed

+780
-595
lines changed

rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,11 @@ public ClientServerInputMultiplexer(
7373
this.clientConnection = registry.initConnection(Type.CLIENT, clientReceiver);
7474
}
7575

76-
DuplexConnection asResponderConnection() {
76+
DuplexConnection asServerConnection() {
7777
return serverConnection;
7878
}
7979

80-
DuplexConnection asRequesterConnection() {
80+
DuplexConnection asClientConnection() {
8181
return clientConnection;
8282
}
8383

rsocket-core/src/main/java/io/rsocket/core/ClientSetup.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.netty.buffer.ByteBuf;
44
import io.netty.buffer.Unpooled;
55
import io.rsocket.DuplexConnection;
6+
import java.nio.channels.ClosedChannelException;
67
import reactor.core.publisher.Mono;
78
import reactor.util.function.Tuple2;
89
import reactor.util.function.Tuples;
@@ -24,7 +25,8 @@ class ResumableClientSetup extends ClientSetup {
2425

2526
@Override
2627
Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection) {
27-
return Mono.create(
28-
sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink)));
28+
return Mono.<Tuple2<ByteBuf, DuplexConnection>>create(
29+
sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink)))
30+
.or(connection.onClose().then(Mono.error(ClosedChannelException::new)));
2931
}
3032
}

rsocket-core/src/main/java/io/rsocket/core/LoggingDuplexConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.rsocket.DuplexConnection;
66
import io.rsocket.RSocketErrorException;
77
import io.rsocket.frame.FrameUtil;
8+
import java.net.SocketAddress;
89
import org.slf4j.Logger;
910
import org.slf4j.LoggerFactory;
1011
import reactor.core.publisher.Flux;
@@ -56,6 +57,11 @@ public ByteBufAllocator alloc() {
5657
return source.alloc();
5758
}
5859

60+
@Override
61+
public SocketAddress remoteAddress() {
62+
return source.remoteAddress();
63+
}
64+
5965
static DuplexConnection wrapIfEnabled(DuplexConnection source) {
6066
if (LOGGER.isDebugEnabled()) {
6167
return new LoggingDuplexConnection(source);

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
621621

622622
RSocket rSocketRequester =
623623
new RSocketRequester(
624-
multiplexer.asRequesterConnection(),
624+
multiplexer.asClientConnection(),
625625
payloadDecoder,
626626
StreamIdSupplier.clientSupplier(),
627627
mtu,
@@ -662,7 +662,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
662662

663663
RSocket rSocketResponder =
664664
new RSocketResponder(
665-
multiplexer.asResponderConnection(),
665+
multiplexer.asServerConnection(),
666666
wrappedRSocketHandler,
667667
payloadDecoder,
668668
responderLeaseHandler,

rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -369,8 +369,7 @@ private Mono<Void> accept(
369369
default:
370370
serverSetup.sendError(
371371
clientServerConnection,
372-
new InvalidSetupException(
373-
"invalid setup frame: " + FrameHeaderCodec.frameType(startFrame)));
372+
new InvalidSetupException("SETUP or RESUME frame must be received before any others"));
374373
return clientServerConnection.onClose();
375374
}
376375
}
@@ -412,7 +411,7 @@ private Mono<Void> acceptSetup(
412411

413412
RSocket rSocketRequester =
414413
new RSocketRequester(
415-
multiplexer.asResponderConnection(),
414+
multiplexer.asServerConnection(),
416415
payloadDecoder,
417416
StreamIdSupplier.serverSupplier(),
418417
mtu,
@@ -433,7 +432,7 @@ private Mono<Void> acceptSetup(
433432
.doOnNext(
434433
rSocketHandler -> {
435434
RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler);
436-
DuplexConnection clientConnection = multiplexer.asRequesterConnection();
435+
DuplexConnection clientConnection = multiplexer.asClientConnection();
437436

438437
ResponderLeaseHandler responderLeaseHandler =
439438
leaseEnabled

rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.rsocket.frame.SetupFrameCodec;
2828
import io.rsocket.keepalive.KeepAliveHandler;
2929
import io.rsocket.resume.*;
30+
import java.nio.channels.ClosedChannelException;
3031
import java.time.Duration;
3132
import java.util.function.BiFunction;
3233
import java.util.function.Function;
@@ -36,8 +37,9 @@
3637
abstract class ServerSetup {
3738

3839
Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection) {
39-
return Mono.create(
40-
sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink)));
40+
return Mono.<Tuple2<ByteBuf, DuplexConnection>>create(
41+
sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink)))
42+
.or(connection.onClose().then(Mono.error(ClosedChannelException::new)));
4143
}
4244

4345
abstract Mono<Void> acceptRSocketSetup(
@@ -110,14 +112,14 @@ public Mono<Void> acceptRSocketSetup(
110112
new ResumableDuplexConnection(duplexConnection, resumableFramesStore);
111113
final ServerRSocketSession serverRSocketSession =
112114
new ServerRSocketSession(
115+
resumeToken,
113116
duplexConnection,
114117
resumableDuplexConnection,
115118
resumeSessionDuration,
116119
resumableFramesStore,
117-
resumeToken,
118120
cleanupStoreOnKeepAlive);
119121

120-
sessionManager.save(serverRSocketSession);
122+
sessionManager.save(serverRSocketSession, resumeToken);
121123

122124
return then.apply(
123125
new ResumableKeepAliveHandler(

rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.netty.buffer.ByteBufAllocator;
55
import io.rsocket.DuplexConnection;
66
import io.rsocket.RSocketErrorException;
7+
import java.net.SocketAddress;
78
import java.nio.channels.ClosedChannelException;
89
import org.reactivestreams.Subscription;
910
import reactor.core.CoreSubscriber;
@@ -42,6 +43,11 @@ public void dispose() {
4243
source.dispose();
4344
}
4445

46+
@Override
47+
public boolean isDisposed() {
48+
return source.isDisposed();
49+
}
50+
4551
@Override
4652
public Mono<Void> onClose() {
4753
return source.onClose();
@@ -57,6 +63,11 @@ public Flux<ByteBuf> receive() {
5763
return this;
5864
}
5965

66+
@Override
67+
public SocketAddress remoteAddress() {
68+
return source.remoteAddress();
69+
}
70+
6071
@Override
6172
public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
6273
if (done) {

0 commit comments

Comments
 (0)