Skip to content

Commit 20ad9dc

Browse files
committed
improves transport resumability tests
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 31364a4 commit 20ad9dc

File tree

4 files changed

+67
-8
lines changed

4 files changed

+67
-8
lines changed

rsocket-test/src/main/java/io/rsocket/test/TransportTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,7 @@ public TransportPair(
547547
"Server",
548548
duplexConnection,
549549
Duration.ofMillis(
550-
ThreadLocalRandom.current().nextInt(10, 1500)))
550+
ThreadLocalRandom.current().nextInt(100, 500)))
551551
: duplexConnection);
552552
}
553553
});
@@ -568,7 +568,7 @@ public TransportPair(
568568
final RSocketConnector rSocketConnector =
569569
RSocketConnector.create()
570570
.payloadDecoder(PayloadDecoder.ZERO_COPY)
571-
.keepAlive(Duration.ofMillis(Integer.MAX_VALUE), Duration.ofMillis(Integer.MAX_VALUE))
571+
.keepAlive(Duration.ofMillis(10), Duration.ofHours(1))
572572
.interceptors(
573573
registry -> {
574574
if (runClientWithAsyncInterceptors && !withResumability) {
@@ -594,7 +594,7 @@ public TransportPair(
594594
"Client",
595595
duplexConnection,
596596
Duration.ofMillis(
597-
ThreadLocalRandom.current().nextInt(1, 2000)))
597+
ThreadLocalRandom.current().nextInt(100, 500)))
598598
: duplexConnection);
599599
}
600600
});

rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,11 @@ public void onSubscribe(Subscription s) {
136136

137137
@Override
138138
public void onNext(ByteBuf buf) {
139-
actual.onNext(buf);
140-
buf.release();
139+
try {
140+
actual.onNext(buf);
141+
} finally {
142+
buf.release();
143+
}
141144
}
142145

143146
@Override

rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919
import io.rsocket.test.TransportTest;
2020
import java.time.Duration;
2121
import java.util.UUID;
22-
import org.junit.jupiter.api.Disabled;
2322

24-
@Disabled("leaking somewhere for no clear reason")
2523
final class LocalResumableTransportTest implements TransportTest {
2624

2725
private final TransportPair transportPair =
@@ -34,7 +32,7 @@ final class LocalResumableTransportTest implements TransportTest {
3432

3533
@Override
3634
public Duration getTimeout() {
37-
return Duration.ofSeconds(10);
35+
return Duration.ofMinutes(3);
3836
}
3937

4038
@Override
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2015-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.transport.netty;
18+
19+
import io.netty.channel.ChannelOption;
20+
import io.rsocket.test.TransportTest;
21+
import io.rsocket.transport.netty.client.WebsocketClientTransport;
22+
import io.rsocket.transport.netty.server.WebsocketServerTransport;
23+
import java.net.InetSocketAddress;
24+
import java.time.Duration;
25+
import reactor.netty.http.client.HttpClient;
26+
import reactor.netty.http.server.HttpServer;
27+
28+
final class WebsocketResumableTransportTest implements TransportTest {
29+
30+
private final TransportPair transportPair =
31+
new TransportPair<>(
32+
() -> InetSocketAddress.createUnresolved("localhost", 0),
33+
(address, server, allocator) ->
34+
WebsocketClientTransport.create(
35+
HttpClient.create()
36+
.host(server.address().getHostName())
37+
.port(server.address().getPort())
38+
.option(ChannelOption.ALLOCATOR, allocator),
39+
""),
40+
(address, allocator) ->
41+
WebsocketServerTransport.create(
42+
HttpServer.create()
43+
.host(address.getHostName())
44+
.port(address.getPort())
45+
.option(ChannelOption.ALLOCATOR, allocator)),
46+
false,
47+
true);
48+
49+
@Override
50+
public Duration getTimeout() {
51+
return Duration.ofMinutes(3);
52+
}
53+
54+
@Override
55+
public TransportPair getTransportPair() {
56+
return transportPair;
57+
}
58+
}

0 commit comments

Comments
 (0)