Skip to content

Commit 93c30b6

Browse files
Oleh DokukaOlegDokuka
Oleh Dokuka
authored andcommitted
adds first frame handling timeout
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 1f71914 commit 93c30b6

File tree

4 files changed

+66
-5
lines changed

4 files changed

+66
-5
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import io.rsocket.plugins.RequestInterceptor;
4242
import io.rsocket.resume.SessionManager;
4343
import io.rsocket.transport.ServerTransport;
44+
import java.time.Duration;
4445
import java.util.Objects;
4546
import java.util.function.Consumer;
4647
import java.util.function.Supplier;
@@ -70,6 +71,7 @@ public final class RSocketServer {
7071
private int mtu = 0;
7172
private int maxInboundPayloadSize = Integer.MAX_VALUE;
7273
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
74+
private Duration timeout = Duration.ofMinutes(1);
7375

7476
private RSocketServer() {}
7577

@@ -223,6 +225,22 @@ public RSocketServer maxInboundPayloadSize(int maxInboundPayloadSize) {
223225
return this;
224226
}
225227

228+
/**
229+
* Specifies timeout for the first incoming frame from the accepted connection.
230+
*
231+
* <p>By default this is set to 1 minute.
232+
*
233+
* @param timeout duration
234+
* @return the same instance for method chaining
235+
*/
236+
public RSocketServer setupHandlingTimeout(Duration timeout) {
237+
if (timeout.isNegative() || timeout.isZero()) {
238+
throw new IllegalArgumentException("Setup Handling Timeout should be greater than zero");
239+
}
240+
this.timeout = timeout;
241+
return this;
242+
}
243+
226244
/**
227245
* When this is set, frames larger than the given maximum transmission unit (mtu) size value are
228246
* fragmented.
@@ -287,7 +305,7 @@ public RSocketServer payloadDecoder(PayloadDecoder decoder) {
287305
public <T extends Closeable> Mono<T> bind(ServerTransport<T> transport) {
288306
return Mono.defer(
289307
new Supplier<Mono<T>>() {
290-
final ServerSetup serverSetup = serverSetup();
308+
final ServerSetup serverSetup = serverSetup(timeout);
291309

292310
@Override
293311
public Mono<T> get() {
@@ -326,7 +344,7 @@ public ServerTransport.ConnectionAcceptor asConnectionAcceptor() {
326344
public ServerTransport.ConnectionAcceptor asConnectionAcceptor(int maxFrameLength) {
327345
assertValidateSetup(maxFrameLength, maxInboundPayloadSize, mtu);
328346
return new ServerTransport.ConnectionAcceptor() {
329-
private final ServerSetup serverSetup = serverSetup();
347+
private final ServerSetup serverSetup = serverSetup(timeout);
330348

331349
@Override
332350
public Mono<Void> apply(DuplexConnection connection) {
@@ -469,12 +487,13 @@ private Mono<Void> acceptSetup(
469487
});
470488
}
471489

472-
private ServerSetup serverSetup() {
473-
return resume != null ? createSetup() : new ServerSetup.DefaultServerSetup();
490+
private ServerSetup serverSetup(Duration timeout) {
491+
return resume != null ? createSetup(timeout) : new ServerSetup.DefaultServerSetup(timeout);
474492
}
475493

476-
ServerSetup createSetup() {
494+
ServerSetup createSetup(Duration timeout) {
477495
return new ServerSetup.ResumableServerSetup(
496+
timeout,
478497
new SessionManager(),
479498
resume.getSessionDuration(),
480499
resume.getStreamTimeout(),

rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,16 @@
3636

3737
abstract class ServerSetup {
3838

39+
final Duration timeout;
40+
41+
protected ServerSetup(Duration timeout) {
42+
this.timeout = timeout;
43+
}
44+
3945
Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection) {
4046
return Mono.<Tuple2<ByteBuf, DuplexConnection>>create(
4147
sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink)))
48+
.timeout(this.timeout)
4249
.or(connection.onClose().then(Mono.error(ClosedChannelException::new)));
4350
}
4451

@@ -57,6 +64,10 @@ void sendError(DuplexConnection duplexConnection, RSocketErrorException exceptio
5764

5865
static class DefaultServerSetup extends ServerSetup {
5966

67+
DefaultServerSetup(Duration timeout) {
68+
super(timeout);
69+
}
70+
6071
@Override
6172
public Mono<Void> acceptRSocketSetup(
6273
ByteBuf frame,
@@ -86,11 +97,13 @@ static class ResumableServerSetup extends ServerSetup {
8697
private final boolean cleanupStoreOnKeepAlive;
8798

8899
ResumableServerSetup(
100+
Duration timeout,
89101
SessionManager sessionManager,
90102
Duration resumeSessionDuration,
91103
Duration resumeStreamTimeout,
92104
Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory,
93105
boolean cleanupStoreOnKeepAlive) {
106+
super(timeout);
94107
this.sessionManager = sessionManager;
95108
this.resumeSessionDuration = resumeSessionDuration;
96109
this.resumeStreamTimeout = resumeStreamTimeout;

rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ public void request(long n) {
9696

9797
@Override
9898
public void cancel() {
99+
source.dispose();
99100
s.cancel();
100101
}
101102

rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@
2929
import io.rsocket.test.util.TestServerTransport;
3030
import java.time.Duration;
3131
import java.util.Random;
32+
import org.assertj.core.api.Assertions;
3233
import org.junit.jupiter.api.Test;
3334
import reactor.core.Scannable;
3435
import reactor.core.publisher.Mono;
3536
import reactor.core.publisher.Sinks;
3637
import reactor.test.StepVerifier;
38+
import reactor.test.scheduler.VirtualTimeScheduler;
3739

3840
public class RSocketServerTest {
3941

@@ -60,6 +62,32 @@ public void unexpectedFramesBeforeSetupFrame() {
6062
.hasNoLeaks();
6163
}
6264

65+
@Test
66+
public void timeoutOnNoFirstFrame() {
67+
final VirtualTimeScheduler scheduler = VirtualTimeScheduler.getOrSet();
68+
try {
69+
TestServerTransport transport = new TestServerTransport();
70+
RSocketServer.create().setupHandlingTimeout(Duration.ofMinutes(2)).bind(transport).block();
71+
72+
final TestDuplexConnection duplexConnection = transport.connect();
73+
74+
scheduler.advanceTimeBy(Duration.ofMinutes(1));
75+
76+
Assertions.assertThat(duplexConnection.isDisposed()).isFalse();
77+
78+
scheduler.advanceTimeBy(Duration.ofMinutes(1));
79+
80+
StepVerifier.create(duplexConnection.onClose())
81+
.expectSubscription()
82+
.expectComplete()
83+
.verify(Duration.ofSeconds(10));
84+
85+
FrameAssert.assertThat(duplexConnection.pollFrame()).isNull();
86+
} finally {
87+
VirtualTimeScheduler.reset();
88+
}
89+
}
90+
6391
@Test
6492
public void ensuresMaxFrameLengthCanNotBeLessThenMtu() {
6593
RSocketServer.create()

0 commit comments

Comments
 (0)