Skip to content

Commit 3fe88bf

Browse files
committed
bugfixes resumability and ads related integration tests
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 17b1e5d commit 3fe88bf

26 files changed

+735
-586
lines changed

rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,11 @@ public ClientServerInputMultiplexer(
7272
this.clientConnection = registry.initConnection(Type.CLIENT, clientReceiver);
7373
}
7474

75-
DuplexConnection asResponderConnection() {
75+
DuplexConnection asServerConnection() {
7676
return serverConnection;
7777
}
7878

79-
DuplexConnection asRequesterConnection() {
79+
DuplexConnection asClientConnection() {
8080
return clientConnection;
8181
}
8282

rsocket-core/src/main/java/io/rsocket/core/ClientSetup.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.netty.buffer.ByteBuf;
44
import io.netty.buffer.Unpooled;
55
import io.rsocket.DuplexConnection;
6+
import java.nio.channels.ClosedChannelException;
67
import reactor.core.publisher.Mono;
78
import reactor.util.function.Tuple2;
89
import reactor.util.function.Tuples;
@@ -24,7 +25,8 @@ class ResumableClientSetup extends ClientSetup {
2425

2526
@Override
2627
Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection) {
27-
return Mono.create(
28-
sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink)));
28+
return Mono.<Tuple2<ByteBuf, DuplexConnection>>create(
29+
sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink)))
30+
.or(connection.onClose().then(Mono.error(ClosedChannelException::new)));
2931
}
3032
}

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
621621

622622
RSocket rSocketRequester =
623623
new RSocketRequester(
624-
multiplexer.asRequesterConnection(),
624+
multiplexer.asClientConnection(),
625625
payloadDecoder,
626626
StreamIdSupplier.clientSupplier(),
627627
mtu,
@@ -662,7 +662,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
662662

663663
RSocket rSocketResponder =
664664
new RSocketResponder(
665-
multiplexer.asResponderConnection(),
665+
multiplexer.asServerConnection(),
666666
wrappedRSocketHandler,
667667
payloadDecoder,
668668
responderLeaseHandler,

rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -369,8 +369,7 @@ private Mono<Void> accept(
369369
default:
370370
serverSetup.sendError(
371371
clientServerConnection,
372-
new InvalidSetupException(
373-
"invalid setup frame: " + FrameHeaderCodec.frameType(startFrame)));
372+
new InvalidSetupException("SETUP or RESUME frame must be received before any others"));
374373
return clientServerConnection.onClose();
375374
}
376375
}
@@ -412,7 +411,7 @@ private Mono<Void> acceptSetup(
412411

413412
RSocket rSocketRequester =
414413
new RSocketRequester(
415-
multiplexer.asResponderConnection(),
414+
multiplexer.asServerConnection(),
416415
payloadDecoder,
417416
StreamIdSupplier.serverSupplier(),
418417
mtu,
@@ -433,7 +432,7 @@ private Mono<Void> acceptSetup(
433432
.doOnNext(
434433
rSocketHandler -> {
435434
RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler);
436-
DuplexConnection clientConnection = multiplexer.asRequesterConnection();
435+
DuplexConnection clientConnection = multiplexer.asClientConnection();
437436

438437
ResponderLeaseHandler responderLeaseHandler =
439438
leaseEnabled

rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.rsocket.frame.SetupFrameCodec;
2828
import io.rsocket.keepalive.KeepAliveHandler;
2929
import io.rsocket.resume.*;
30+
import java.nio.channels.ClosedChannelException;
3031
import java.time.Duration;
3132
import java.util.function.BiFunction;
3233
import java.util.function.Function;
@@ -36,8 +37,9 @@
3637
abstract class ServerSetup {
3738

3839
Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection) {
39-
return Mono.create(
40-
sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink)));
40+
return Mono.<Tuple2<ByteBuf, DuplexConnection>>create(
41+
sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink)))
42+
.or(connection.onClose().then(Mono.error(ClosedChannelException::new)));
4143
}
4244

4345
abstract Mono<Void> acceptRSocketSetup(
@@ -110,14 +112,14 @@ public Mono<Void> acceptRSocketSetup(
110112
new ResumableDuplexConnection(duplexConnection, resumableFramesStore);
111113
final ServerRSocketSession serverRSocketSession =
112114
new ServerRSocketSession(
115+
resumeToken,
113116
duplexConnection,
114117
resumableDuplexConnection,
115118
resumeSessionDuration,
116119
resumableFramesStore,
117-
resumeToken,
118120
cleanupStoreOnKeepAlive);
119121

120-
sessionManager.save(serverRSocketSession);
122+
sessionManager.save(serverRSocketSession, resumeToken);
121123

122124
return then.apply(
123125
new ResumableKeepAliveHandler(

rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public void dispose() {
4242
source.dispose();
4343
}
4444

45+
@Override
46+
public boolean isDisposed() {
47+
return source.isDisposed();
48+
}
49+
4550
@Override
4651
public Mono<Void> onClose() {
4752
return source.onClose();

rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java

Lines changed: 0 additions & 244 deletions
This file was deleted.

0 commit comments

Comments
 (0)