Skip to content

Commit 9f6fbcc

Browse files
nebhalerobertroeser
authored andcommitted
Netty Transport Improvements (#507)
This change makes general improvements to the Netty Transport. It updates the code to make it better documented and tested. In addition it introduces the new TransportTest, a JUnit 5 way of testing uniformly across all of the transport implementations.
1 parent d01121f commit 9f6fbcc

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1514
-334
lines changed

build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
plugins {
1818
id 'com.gradle.build-scan' version '1.12.1'
1919

20-
id 'com.github.johnrengelman.shadow' version '2.0.2' apply false
2120
id 'com.github.sherter.google-java-format' version '0.6'
2221
id 'com.jfrog.artifactory' version '4.7.0'
2322
id 'com.jfrog.bintray' version '1.8.0'

rsocket-core/build.gradle

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
plugins {
1818
id 'java-library'
1919
id 'maven-publish'
20-
id 'com.github.johnrengelman.shadow'
2120
id 'com.jfrog.artifactory'
2221
id 'com.jfrog.bintray'
2322
id 'io.morethan.jmhreport'
@@ -29,10 +28,10 @@ dependencies {
2928
api 'io.projectreactor:reactor-core'
3029

3130
implementation 'io.projectreactor.addons:reactor-extra'
31+
implementation 'org.jctools:jctools-core'
3232
implementation 'org.slf4j:slf4j-api'
3333

3434
compileOnly 'com.google.code.findbugs:jsr305'
35-
compileOnly 'org.jctools:jctools-core'
3635

3736
testImplementation 'io.projectreactor:reactor-test'
3837
testImplementation 'org.assertj:assertj-core'
@@ -49,28 +48,6 @@ dependencies {
4948
testRuntimeOnly 'org.junit.vintage:junit-vintage-engine'
5049
}
5150

52-
jar {
53-
enabled = false
54-
dependsOn(shadowJar { classifier = null })
55-
}
56-
57-
shadowJar {
58-
configurations = [project.configurations.compileOnly]
59-
60-
dependencies {
61-
include(dependency('org.jctools:jctools-core'))
62-
}
63-
64-
include '*.jar'
65-
include 'io/rsocket/**'
66-
include 'org/jctools/maps/ConcurrentAutoTable*.class'
67-
include 'org/jctools/maps/NonBlockingHashMapLong*.class'
68-
include 'org/jctools/util/RangeUtil*.class'
69-
include 'org/jctools/util/UnsafeAccess*.class'
70-
71-
relocate 'org.jctools', 'io.rsocket.shadowed.org.jctools'
72-
}
73-
7451
description = "Core functionality for the RSocket library"
7552

7653
apply from: 'jmh.gradle'

rsocket-core/src/jmh/java/io/rsocket/fragmentation/FragmentationPerformanceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void largeFragmentation(Input input) {
5757

5858
@Benchmark
5959
public void largeReassembly(Input input) {
60-
input.largeFrames.forEach(frame -> input.reassembler.reassemble(frame, input.sink));
60+
input.largeFrames.forEach(frame -> input.reassembler.reassemble(frame));
6161

6262
input.bh.consume(input.sink.next);
6363
}
@@ -72,7 +72,7 @@ public void smallFragmentation(Input input) {
7272

7373
@Benchmark
7474
public void smallReassembly(Input input) {
75-
input.smallFrames.forEach(frame -> input.reassembler.reassemble(frame, input.sink));
75+
input.smallFrames.forEach(frame -> input.reassembler.reassemble(frame));
7676

7777
input.bh.consume(input.sink.next);
7878
}

rsocket-core/src/main/java/io/rsocket/transport/ServerTransport.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public interface ServerTransport<T extends Closeable> extends Transport {
3030
*
3131
* @param acceptor An acceptor to process a newly accepted {@code DuplexConnection}
3232
* @return A handle to retrieve information about a started server.
33+
* @throws NullPointerException if {@code acceptor} is {@code null}
3334
*/
3435
Mono<T> start(ConnectionAcceptor acceptor);
3536

rsocket-core/src/main/java/io/rsocket/transport/TransportHeaderAware.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,12 @@
2424
* Http2.
2525
*/
2626
public interface TransportHeaderAware {
27+
28+
/**
29+
* Sets the transport headers
30+
*
31+
* @param transportHeaders the transport headers
32+
* @throws NullPointerException if {@code transportHeaders} is {@code null}
33+
*/
2734
void setTransportHeaders(Supplier<Map<String, String>> transportHeaders);
2835
}

rsocket-test/build.gradle

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,20 @@
1616

1717
plugins {
1818
id 'java-library'
19+
id 'maven-publish'
20+
id 'com.jfrog.artifactory'
21+
id 'com.jfrog.bintray'
1922
}
2023

2124
dependencies {
25+
api project(':rsocket-core')
2226
api 'org.hdrhistogram:HdrHistogram'
27+
api 'org.junit.jupiter:junit-jupiter-api'
28+
29+
compileOnly 'com.google.code.findbugs:jsr305'
2330

24-
implementation project(':rsocket-core')
25-
implementation 'com.google.code.findbugs:jsr305'
2631
implementation 'io.projectreactor:reactor-test'
2732
implementation 'org.assertj:assertj-core'
28-
implementation 'org.junit.jupiter:junit-jupiter-api'
2933
implementation 'org.mockito:mockito-core'
3034

3135
// TODO: Remove after JUnit5 migration

rsocket-test/src/main/java/io/rsocket/test/TransportTest.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import io.rsocket.transport.ServerTransport;
2525
import io.rsocket.util.DefaultPayload;
2626
import java.time.Duration;
27+
import java.util.function.BiFunction;
28+
import java.util.function.Function;
29+
import java.util.function.Supplier;
2730
import org.junit.jupiter.api.AfterEach;
2831
import org.junit.jupiter.api.DisplayName;
2932
import org.junit.jupiter.api.Test;
@@ -258,21 +261,32 @@ default void requestStreamDelayedRequestN() {
258261
.verify(getTimeout());
259262
}
260263

261-
final class TransportPair implements Disposable {
264+
final class TransportPair<T, S extends Closeable> implements Disposable {
262265

263266
private final RSocket client;
264267

265-
private final Closeable server;
268+
private final S server;
266269

267-
public TransportPair(ServerTransport<?> serverTransport, ClientTransport clientTransport) {
268-
this.server =
270+
public TransportPair(
271+
Supplier<T> addressSupplier,
272+
BiFunction<T, S, ClientTransport> clientTransportSupplier,
273+
Function<T, ServerTransport<S>> serverTransportSupplier) {
274+
275+
T address = addressSupplier.get();
276+
277+
server =
269278
RSocketFactory.receive()
270279
.acceptor((setup, sendingSocket) -> Mono.just(new TestRSocket()))
271-
.transport(serverTransport)
280+
.transport(serverTransportSupplier.apply(address))
272281
.start()
273282
.block();
274283

275-
this.client = RSocketFactory.connect().transport(clientTransport).start().block();
284+
client =
285+
RSocketFactory.connect()
286+
.transport(clientTransportSupplier.apply(address, server))
287+
.start()
288+
.doOnError(Throwable::printStackTrace)
289+
.block();
276290
}
277291

278292
@Override

rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalClientTransportTest.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,7 @@ void connect() {
3333

3434
serverTransport
3535
.start(duplexConnection -> Mono.empty())
36-
.as(StepVerifier::create)
37-
.expectNextCount(1)
38-
.verifyComplete();
39-
40-
LocalClientTransport.create(serverTransport.getName())
41-
.connect()
36+
.flatMap(closeable -> LocalClientTransport.create(serverTransport.getName()).connect())
4237
.as(StepVerifier::create)
4338
.expectNextCount(1)
4439
.verifyComplete();

rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818

1919
import io.rsocket.test.TransportTest;
2020
import java.time.Duration;
21+
import java.util.concurrent.atomic.AtomicInteger;
2122

2223
final class LocalTransportTest implements TransportTest {
2324

24-
private final LocalServerTransport serverTransport = LocalServerTransport.createEphemeral();
25+
private static final AtomicInteger UNIQUE_NAME_GENERATOR = new AtomicInteger();
2526

26-
private final LocalClientTransport clientTransport = serverTransport.clientTransport();
27-
28-
private final TransportPair transportPair = new TransportPair(serverTransport, clientTransport);
27+
private final TransportPair transportPair =
28+
new TransportPair<>(
29+
() -> "test" + UNIQUE_NAME_GENERATOR.incrementAndGet(),
30+
(address, server) -> LocalClientTransport.create(address),
31+
LocalServerTransport::create);
2932

3033
@Override
3134
public Duration getTimeout() {

rsocket-transport-netty/build.gradle

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@ dependencies {
2525
api project(':rsocket-core')
2626
api 'io.projectreactor.ipc:reactor-netty'
2727

28+
compileOnly 'com.google.code.findbugs:jsr305'
29+
2830
testImplementation project(':rsocket-test')
31+
testImplementation 'io.projectreactor:reactor-test'
32+
testImplementation 'org.assertj:assertj-core'
2933
testImplementation 'org.junit.jupiter:junit-jupiter-api'
3034

31-
// TODO: Remove after JUnit5 migration
32-
testCompileOnly 'junit:junit'
33-
testRuntimeOnly 'org.junit.vintage:junit-vintage-engine'
35+
testRuntimeOnly 'ch.qos.logback:logback-classic'
36+
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
3437
}
3538

3639
description = 'Reactor Netty RSocket transport implementations (TCP, Websocket)'

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/RSocketLengthCodec.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,13 @@
2323
import io.netty.channel.ChannelHandlerContext;
2424
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
2525

26-
public class RSocketLengthCodec extends LengthFieldBasedFrameDecoder {
26+
/**
27+
* An extension to the Netty {@link LengthFieldBasedFrameDecoder} that encapsulates the
28+
* RSocket-specific frame length header details.
29+
*/
30+
public final class RSocketLengthCodec extends LengthFieldBasedFrameDecoder {
31+
32+
/** Creates a new instance of the decoder, specifying the RSocket frame length header size. */
2733
public RSocketLengthCodec() {
2834
super(FRAME_LENGTH_MASK, 0, FRAME_LENGTH_SIZE, 0, 0);
2935
}

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java renamed to rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,44 +25,55 @@
2525
import reactor.ipc.netty.NettyInbound;
2626
import reactor.ipc.netty.NettyOutbound;
2727

28-
public class NettyDuplexConnection implements DuplexConnection {
28+
/** An implementation of {@link DuplexConnection} that connects via TCP. */
29+
public final class TcpDuplexConnection implements DuplexConnection {
30+
31+
private final NettyContext context;
32+
2933
private final NettyInbound in;
34+
3035
private final NettyOutbound out;
31-
private final NettyContext context;
3236

33-
public NettyDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
37+
/**
38+
* Creates a new instance
39+
*
40+
* @param in the {@link NettyInbound} to listen on
41+
* @param out the {@link NettyOutbound} to send with
42+
* @param context the {@link NettyContext} to for managing the server
43+
*/
44+
public TcpDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
3445
this.in = in;
3546
this.out = out;
3647
this.context = context;
3748
}
3849

3950
@Override
40-
public Mono<Void> send(Publisher<Frame> frames) {
41-
return Flux.from(frames).concatMap(this::sendOne).then();
51+
public void dispose() {
52+
context.dispose();
4253
}
4354

4455
@Override
45-
public Mono<Void> sendOne(Frame frame) {
46-
return out.sendObject(frame.content()).then();
56+
public boolean isDisposed() {
57+
return context.isDisposed();
4758
}
4859

4960
@Override
50-
public Flux<Frame> receive() {
51-
return in.receive().map(buf -> Frame.from(buf.retain()));
61+
public Mono<Void> onClose() {
62+
return context.onClose();
5263
}
5364

5465
@Override
55-
public void dispose() {
56-
context.dispose();
66+
public Flux<Frame> receive() {
67+
return in.receive().map(buf -> Frame.from(buf.retain()));
5768
}
5869

5970
@Override
60-
public boolean isDisposed() {
61-
return context.isDisposed();
71+
public Mono<Void> send(Publisher<Frame> frames) {
72+
return Flux.from(frames).concatMap(this::sendOne).then();
6273
}
6374

6475
@Override
65-
public Mono<Void> onClose() {
66-
return context.onClose();
76+
public Mono<Void> sendOne(Frame frame) {
77+
return out.sendObject(frame.content()).then();
6778
}
6879
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2015-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.transport.netty;
18+
19+
import java.net.URI;
20+
import java.util.Objects;
21+
22+
/** Utilities for dealing with with {@link URI}s */
23+
public final class UriUtils {
24+
25+
private UriUtils() {}
26+
27+
/**
28+
* Returns the port of a URI. If the port is unset (i.e. {@code -1}) then returns the {@code
29+
* defaultPort}.
30+
*
31+
* @param uri the URI to extract the port from
32+
* @param defaultPort the default to use if the port is unset
33+
* @return the port of a URI or {@code defaultPort} if unset
34+
* @throws NullPointerException if {@code uri} is {@code null}
35+
*/
36+
public static int getPort(URI uri, int defaultPort) {
37+
Objects.requireNonNull(uri, "uri must not be null");
38+
return uri.getPort() == -1 ? defaultPort : uri.getPort();
39+
}
40+
41+
/**
42+
* Returns whether the URI has a secure schema. Secure is defined as being either {@code wss} or
43+
* {@code https}.
44+
*
45+
* @param uri the URI to examine
46+
* @return whether the URI has a secure schema
47+
* @throws NullPointerException if {@code uri} is {@code null}
48+
*/
49+
public static boolean isSecure(URI uri) {
50+
Objects.requireNonNull(uri, "uri must not be null");
51+
return uri.getScheme().equals("wss") || uri.getScheme().equals("https");
52+
}
53+
}

0 commit comments

Comments
 (0)