Skip to content

Commit df44351

Browse files
Oleh DokukaOlegDokuka
authored andcommitted
adds first frame handling timeout
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 1f71914 commit df44351

File tree

4 files changed

+70
-5
lines changed

4 files changed

+70
-5
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import io.rsocket.plugins.RequestInterceptor;
4242
import io.rsocket.resume.SessionManager;
4343
import io.rsocket.transport.ServerTransport;
44+
45+
import java.time.Duration;
4446
import java.util.Objects;
4547
import java.util.function.Consumer;
4648
import java.util.function.Supplier;
@@ -70,6 +72,7 @@ public final class RSocketServer {
7072
private int mtu = 0;
7173
private int maxInboundPayloadSize = Integer.MAX_VALUE;
7274
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
75+
private Duration timeout = Duration.ofMinutes(1);
7376

7477
private RSocketServer() {}
7578

@@ -223,6 +226,23 @@ public RSocketServer maxInboundPayloadSize(int maxInboundPayloadSize) {
223226
return this;
224227
}
225228

229+
230+
/**
231+
* Specifies timeout for the first incoming frame from the accepted connection.
232+
*
233+
* <p>By default this is set to 1 minute.
234+
*
235+
* @param timeout duration
236+
* @return the same instance for method chaining
237+
*/
238+
public RSocketServer setupHandlingTimeout(Duration timeout) {
239+
if (timeout.isNegative() || timeout.isZero()) {
240+
throw new IllegalArgumentException("Setup Handling Timeout should be greater than zero");
241+
}
242+
this.timeout = timeout;
243+
return this;
244+
}
245+
226246
/**
227247
* When this is set, frames larger than the given maximum transmission unit (mtu) size value are
228248
* fragmented.
@@ -287,7 +307,7 @@ public RSocketServer payloadDecoder(PayloadDecoder decoder) {
287307
public <T extends Closeable> Mono<T> bind(ServerTransport<T> transport) {
288308
return Mono.defer(
289309
new Supplier<Mono<T>>() {
290-
final ServerSetup serverSetup = serverSetup();
310+
final ServerSetup serverSetup = serverSetup(timeout);
291311

292312
@Override
293313
public Mono<T> get() {
@@ -326,7 +346,7 @@ public ServerTransport.ConnectionAcceptor asConnectionAcceptor() {
326346
public ServerTransport.ConnectionAcceptor asConnectionAcceptor(int maxFrameLength) {
327347
assertValidateSetup(maxFrameLength, maxInboundPayloadSize, mtu);
328348
return new ServerTransport.ConnectionAcceptor() {
329-
private final ServerSetup serverSetup = serverSetup();
349+
private final ServerSetup serverSetup = serverSetup(timeout);
330350

331351
@Override
332352
public Mono<Void> apply(DuplexConnection connection) {
@@ -469,12 +489,13 @@ private Mono<Void> acceptSetup(
469489
});
470490
}
471491

472-
private ServerSetup serverSetup() {
473-
return resume != null ? createSetup() : new ServerSetup.DefaultServerSetup();
492+
private ServerSetup serverSetup(Duration timeout) {
493+
return resume != null ? createSetup(timeout) : new ServerSetup.DefaultServerSetup(timeout);
474494
}
475495

476-
ServerSetup createSetup() {
496+
ServerSetup createSetup(Duration timeout) {
477497
return new ServerSetup.ResumableServerSetup(
498+
timeout,
478499
new SessionManager(),
479500
resume.getSessionDuration(),
480501
resume.getStreamTimeout(),

rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,16 @@
3636

3737
abstract class ServerSetup {
3838

39+
final Duration timeout;
40+
41+
protected ServerSetup(Duration timeout) {
42+
this.timeout = timeout;
43+
}
44+
3945
Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection) {
4046
return Mono.<Tuple2<ByteBuf, DuplexConnection>>create(
4147
sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink)))
48+
.timeout(this.timeout)
4249
.or(connection.onClose().then(Mono.error(ClosedChannelException::new)));
4350
}
4451

@@ -57,6 +64,10 @@ void sendError(DuplexConnection duplexConnection, RSocketErrorException exceptio
5764

5865
static class DefaultServerSetup extends ServerSetup {
5966

67+
DefaultServerSetup(Duration timeout) {
68+
super(timeout);
69+
}
70+
6071
@Override
6172
public Mono<Void> acceptRSocketSetup(
6273
ByteBuf frame,
@@ -86,11 +97,13 @@ static class ResumableServerSetup extends ServerSetup {
8697
private final boolean cleanupStoreOnKeepAlive;
8798

8899
ResumableServerSetup(
100+
Duration timeout,
89101
SessionManager sessionManager,
90102
Duration resumeSessionDuration,
91103
Duration resumeStreamTimeout,
92104
Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory,
93105
boolean cleanupStoreOnKeepAlive) {
106+
super(timeout);
94107
this.sessionManager = sessionManager;
95108
this.resumeSessionDuration = resumeSessionDuration;
96109
this.resumeStreamTimeout = resumeStreamTimeout;

rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ public void request(long n) {
9696

9797
@Override
9898
public void cancel() {
99+
source.dispose();
99100
s.cancel();
100101
}
101102

rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,14 @@
2929
import io.rsocket.test.util.TestServerTransport;
3030
import java.time.Duration;
3131
import java.util.Random;
32+
33+
import org.assertj.core.api.Assertions;
3234
import org.junit.jupiter.api.Test;
3335
import reactor.core.Scannable;
3436
import reactor.core.publisher.Mono;
3537
import reactor.core.publisher.Sinks;
3638
import reactor.test.StepVerifier;
39+
import reactor.test.scheduler.VirtualTimeScheduler;
3740

3841
public class RSocketServerTest {
3942

@@ -60,6 +63,33 @@ public void unexpectedFramesBeforeSetupFrame() {
6063
.hasNoLeaks();
6164
}
6265

66+
@Test
67+
public void timeoutOnNoFirstFrame() {
68+
final VirtualTimeScheduler scheduler = VirtualTimeScheduler.getOrSet();
69+
try {
70+
TestServerTransport transport = new TestServerTransport();
71+
RSocketServer.create().setupHandlingTimeout(Duration.ofMinutes(2)).bind(transport).block();
72+
73+
final TestDuplexConnection duplexConnection = transport.connect();
74+
75+
scheduler.advanceTimeBy(Duration.ofMinutes(1));
76+
77+
Assertions.assertThat(duplexConnection.isDisposed()).isFalse();
78+
79+
scheduler.advanceTimeBy(Duration.ofMinutes(1));
80+
81+
StepVerifier.create(duplexConnection.onClose())
82+
.expectSubscription()
83+
.expectComplete()
84+
.verify(Duration.ofSeconds(10));
85+
86+
FrameAssert.assertThat(duplexConnection.pollFrame())
87+
.isNull();
88+
} finally {
89+
VirtualTimeScheduler.reset();
90+
}
91+
}
92+
6393
@Test
6494
public void ensuresMaxFrameLengthCanNotBeLessThenMtu() {
6595
RSocketServer.create()

0 commit comments

Comments
 (0)