Skip to content

Commit efd1269

Browse files
adds first frame handling timeout (#1027)
Co-authored-by: Rossen Stoyanchev <[email protected]>
1 parent 1f71914 commit efd1269

File tree

6 files changed

+89
-9
lines changed

6 files changed

+89
-9
lines changed

.github/workflows/gradle-all.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,10 @@ jobs:
142142
run: chmod +x gradlew
143143
- name: Publish Packages to Artifactory
144144
if: ${{ matrix.jdk == '1.8' }}
145-
run: ./gradlew -PversionSuffix="-${githubRef#refs/heads/}-SNAPSHOT" -PbuildNumber="${buildNumber}" publishMavenPublicationToGitHubPackagesRepository --no-daemon --stacktrace
145+
run: |
146+
githubRef="${githubRef#refs/heads/}"
147+
githubRef="${githubRef////-}"
148+
./gradlew -PversionSuffix="-${githubRef}-SNAPSHOT" -PbuildNumber="${buildNumber}" publishMavenPublicationToGitHubPackagesRepository --no-daemon --stacktrace
146149
env:
147150
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
148151
githubRef: ${{ github.ref }}

rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import io.rsocket.plugins.RequestInterceptor;
4242
import io.rsocket.resume.SessionManager;
4343
import io.rsocket.transport.ServerTransport;
44+
import java.time.Duration;
4445
import java.util.Objects;
4546
import java.util.function.Consumer;
4647
import java.util.function.Supplier;
@@ -70,6 +71,7 @@ public final class RSocketServer {
7071
private int mtu = 0;
7172
private int maxInboundPayloadSize = Integer.MAX_VALUE;
7273
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
74+
private Duration timeout = Duration.ofMinutes(1);
7375

7476
private RSocketServer() {}
7577

@@ -223,6 +225,23 @@ public RSocketServer maxInboundPayloadSize(int maxInboundPayloadSize) {
223225
return this;
224226
}
225227

228+
/**
229+
* Specify the max time to wait for the first frame (e.g. {@code SETUP}) on an accepted
230+
* connection.
231+
*
232+
* <p>By default this is set to 1 minute.
233+
*
234+
* @param timeout duration
235+
* @return the same instance for method chaining
236+
*/
237+
public RSocketServer maxTimeToFirstFrame(Duration timeout) {
238+
if (timeout.isNegative() || timeout.isZero()) {
239+
throw new IllegalArgumentException("Setup Handling Timeout should be greater than zero");
240+
}
241+
this.timeout = timeout;
242+
return this;
243+
}
244+
226245
/**
227246
* When this is set, frames larger than the given maximum transmission unit (mtu) size value are
228247
* fragmented.
@@ -287,7 +306,7 @@ public RSocketServer payloadDecoder(PayloadDecoder decoder) {
287306
public <T extends Closeable> Mono<T> bind(ServerTransport<T> transport) {
288307
return Mono.defer(
289308
new Supplier<Mono<T>>() {
290-
final ServerSetup serverSetup = serverSetup();
309+
final ServerSetup serverSetup = serverSetup(timeout);
291310

292311
@Override
293312
public Mono<T> get() {
@@ -326,7 +345,7 @@ public ServerTransport.ConnectionAcceptor asConnectionAcceptor() {
326345
public ServerTransport.ConnectionAcceptor asConnectionAcceptor(int maxFrameLength) {
327346
assertValidateSetup(maxFrameLength, maxInboundPayloadSize, mtu);
328347
return new ServerTransport.ConnectionAcceptor() {
329-
private final ServerSetup serverSetup = serverSetup();
348+
private final ServerSetup serverSetup = serverSetup(timeout);
330349

331350
@Override
332351
public Mono<Void> apply(DuplexConnection connection) {
@@ -469,12 +488,13 @@ private Mono<Void> acceptSetup(
469488
});
470489
}
471490

472-
private ServerSetup serverSetup() {
473-
return resume != null ? createSetup() : new ServerSetup.DefaultServerSetup();
491+
private ServerSetup serverSetup(Duration timeout) {
492+
return resume != null ? createSetup(timeout) : new ServerSetup.DefaultServerSetup(timeout);
474493
}
475494

476-
ServerSetup createSetup() {
495+
ServerSetup createSetup(Duration timeout) {
477496
return new ServerSetup.ResumableServerSetup(
497+
timeout,
478498
new SessionManager(),
479499
resume.getSessionDuration(),
480500
resume.getStreamTimeout(),

rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,16 @@
3636

3737
abstract class ServerSetup {
3838

39+
final Duration timeout;
40+
41+
protected ServerSetup(Duration timeout) {
42+
this.timeout = timeout;
43+
}
44+
3945
Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection) {
4046
return Mono.<Tuple2<ByteBuf, DuplexConnection>>create(
4147
sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink)))
48+
.timeout(this.timeout)
4249
.or(connection.onClose().then(Mono.error(ClosedChannelException::new)));
4350
}
4451

@@ -57,6 +64,10 @@ void sendError(DuplexConnection duplexConnection, RSocketErrorException exceptio
5764

5865
static class DefaultServerSetup extends ServerSetup {
5966

67+
DefaultServerSetup(Duration timeout) {
68+
super(timeout);
69+
}
70+
6071
@Override
6172
public Mono<Void> acceptRSocketSetup(
6273
ByteBuf frame,
@@ -86,11 +97,13 @@ static class ResumableServerSetup extends ServerSetup {
8697
private final boolean cleanupStoreOnKeepAlive;
8798

8899
ResumableServerSetup(
100+
Duration timeout,
89101
SessionManager sessionManager,
90102
Duration resumeSessionDuration,
91103
Duration resumeStreamTimeout,
92104
Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory,
93105
boolean cleanupStoreOnKeepAlive) {
106+
super(timeout);
94107
this.sessionManager = sessionManager;
95108
this.resumeSessionDuration = resumeSessionDuration;
96109
this.resumeStreamTimeout = resumeStreamTimeout;

rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ public void request(long n) {
9696

9797
@Override
9898
public void cancel() {
99+
source.dispose();
99100
s.cancel();
100101
}
101102

rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@
2929
import io.rsocket.test.util.TestServerTransport;
3030
import java.time.Duration;
3131
import java.util.Random;
32+
import org.assertj.core.api.Assertions;
3233
import org.junit.jupiter.api.Test;
3334
import reactor.core.Scannable;
3435
import reactor.core.publisher.Mono;
3536
import reactor.core.publisher.Sinks;
3637
import reactor.test.StepVerifier;
38+
import reactor.test.scheduler.VirtualTimeScheduler;
3739

3840
public class RSocketServerTest {
3941

@@ -60,6 +62,32 @@ public void unexpectedFramesBeforeSetupFrame() {
6062
.hasNoLeaks();
6163
}
6264

65+
@Test
66+
public void timeoutOnNoFirstFrame() {
67+
final VirtualTimeScheduler scheduler = VirtualTimeScheduler.getOrSet();
68+
try {
69+
TestServerTransport transport = new TestServerTransport();
70+
RSocketServer.create().maxTimeToFirstFrame(Duration.ofMinutes(2)).bind(transport).block();
71+
72+
final TestDuplexConnection duplexConnection = transport.connect();
73+
74+
scheduler.advanceTimeBy(Duration.ofMinutes(1));
75+
76+
Assertions.assertThat(duplexConnection.isDisposed()).isFalse();
77+
78+
scheduler.advanceTimeBy(Duration.ofMinutes(1));
79+
80+
StepVerifier.create(duplexConnection.onClose())
81+
.expectSubscription()
82+
.expectComplete()
83+
.verify(Duration.ofSeconds(10));
84+
85+
FrameAssert.assertThat(duplexConnection.pollFrame()).isNull();
86+
} finally {
87+
VirtualTimeScheduler.reset();
88+
}
89+
}
90+
6391
@Test
6492
public void ensuresMaxFrameLengthCanNotBeLessThenMtu() {
6593
RSocketServer.create()

rsocket-test/src/main/java/io/rsocket/test/TransportTest.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.io.InputStreamReader;
4040
import java.net.SocketAddress;
4141
import java.time.Duration;
42+
import java.util.Arrays;
4243
import java.util.concurrent.CancellationException;
4344
import java.util.concurrent.Executors;
4445
import java.util.concurrent.ThreadLocalRandom;
@@ -96,7 +97,18 @@ default void close() {
9697
getTransportPair().responder.awaitAllInteractionTermination(getTimeout());
9798
getTransportPair().dispose();
9899
getTransportPair().awaitClosed();
99-
RuntimeException throwable = new RuntimeException();
100+
RuntimeException throwable =
101+
new RuntimeException() {
102+
@Override
103+
public synchronized Throwable fillInStackTrace() {
104+
return this;
105+
}
106+
107+
@Override
108+
public String getMessage() {
109+
return Arrays.toString(getSuppressed());
110+
}
111+
};
100112

101113
try {
102114
getTransportPair().byteBufAllocator2.assertHasNoLeaks();
@@ -776,8 +788,11 @@ public void onSubscribe(Subscription s) {
776788

777789
@Override
778790
public void onNext(ByteBuf buf) {
779-
actual.onNext(buf);
780-
buf.release();
791+
try {
792+
actual.onNext(buf);
793+
} finally {
794+
buf.release();
795+
}
781796
}
782797

783798
Mono<Void> onClose() {

0 commit comments

Comments
 (0)