Skip to content

Commit 1aecebe

Browse files
nebhaleyschimke
authored andcommitted
Fragmentation (#481)
* Alternate Framing Abstraction This change introduces a framing package that contains strongly typed representations of all of the RSocket frame types. These representations are introduced as a separate abstraction because they specifically remove the frame length and stream id that are an optional, transport-layer concern. This allows will allow the main body of the code to be gradually moved over to an implementation that only deals with the logical frame types, and pushes the decoration of the frames with frame length and/or stream id down to the transport layer. Note that no implementation has been converted over; this is just the prework to have an the frame types ready for migration. These frame type representations are all implemented using the Flyweight pattern (i.e. use the Netty Recycler) and preserve the zero-copy nature of the existing frame representation. These implementations have consistent and symmetric management of ByteBuf retain and release lifecycle, in addition to well documented and proper nullability enforced APIs, specific to each frame type. Each implementation is also tested against raw binary representations to ensure the strictest specification compliance. Finally, improvements were made to testing with regards to configurable test logging, an AssertJ ByteBuf Representation for visual comparison on test failure, and other conveniences. * Improvements to Fragmentation and Reassembly This change improves the implementation and test coverage of the fragmentation and reassembly functionality. The fragmentation functionality now support all frame types (previously, only the PAYLOAD frame could be fragmented) and ensures that a fragmentation and reassembly roundtrip results in an identical frame. The implementations of the fragmentation and reassembly functionality has been implemented on top of the new framing abstraction for testability and to start moving off of the abstraction that leaks frame lengths and stream ids. Note that the implementation of DuplexConnection *does not* move to the new framing abstraction. As this interface is not yet migrated, the FragmentationDuplexConnection is responsible for translating back and forth between the old abstraction and the new one. This has an implication that overhead will increase (although minimally because of zero-copy data and the use of Flyweights) temporarily during the migration to the new abstraction. Once that abstraction has been adopted everywhere, the translation will be removed, and the overheard will go back to it's current levels.
1 parent d7fadaf commit 1aecebe

File tree

118 files changed

+10620
-736
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

118 files changed

+10620
-736
lines changed

build.gradle

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ subprojects {
4040
}
4141

4242
dependencies {
43+
dependency 'ch.qos.logback:logback-classic:1.2.3'
4344
dependency 'com.google.code.findbugs:jsr305:3.0.2'
4445
dependency 'com.netflix.spectator:spectator-api:0.63.0'
4546
dependency 'io.netty:netty-buffer:4.1.21.Final'
@@ -49,6 +50,7 @@ subprojects {
4950
dependency 'org.jctools:jctools-core:2.1.2'
5051
dependency 'org.mockito:mockito-core:2.16.0'
5152
dependency 'org.openjdk.jmh:jmh-core:1.20'
53+
dependency 'org.slf4j:slf4j-api:1.7.25'
5254

5355
dependencySet(group: 'org.junit.jupiter', version: '5.1.0') {
5456
entry 'junit-jupiter-api'
@@ -66,11 +68,6 @@ subprojects {
6668
entry 'jmh-core'
6769
entry 'jmh-generator-annprocess'
6870
}
69-
70-
dependencySet(group: 'org.slf4j', version: '1.7.25') {
71-
entry 'slf4j-api'
72-
entry 'slf4j-nop'
73-
}
7471
}
7572
}
7673

@@ -86,6 +83,14 @@ subprojects {
8683
options.compilerArgs << '-Xlint:all,-overloads,-rawtypes,-unchecked'
8784
}
8885

86+
javadoc {
87+
options.with {
88+
links 'https://docs.oracle.com/javase/8/docs/api/'
89+
links 'https://projectreactor.io/docs/core/release/api/'
90+
links 'https://netty.io/4.1/api/'
91+
}
92+
}
93+
8994
test {
9095
useJUnitPlatform()
9196
}

rsocket-core/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ plugins {
2727
dependencies {
2828
api 'io.netty:netty-buffer'
2929
api 'io.projectreactor:reactor-core'
30+
api 'io.projectreactor.addons:reactor-extra'
3031

3132
implementation 'com.google.code.findbugs:jsr305'
3233
implementation 'org.jctools:jctools-core'
@@ -37,6 +38,7 @@ dependencies {
3738
testImplementation 'org.junit.jupiter:junit-jupiter-api'
3839
testImplementation 'org.mockito:mockito-core'
3940

41+
testRuntimeOnly 'ch.qos.logback:logback-classic'
4042
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
4143

4244
// TODO: Remove after JUnit5 migration

rsocket-core/jmh.gradle

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,9 @@
1616

1717
dependencies {
1818
jmh configurations.api
19+
jmh configurations.implementation
1920
jmh 'org.openjdk.jmh:jmh-core'
2021
jmh 'org.openjdk.jmh:jmh-generator-annprocess'
21-
22-
jmhRuntime 'org.openjdk.jmh:jmh-core'
23-
jmhRuntime 'org.slf4j:slf4j-nop'
2422
}
2523

2624
jmhCompileGeneratedClasses.enabled = false
@@ -36,7 +34,6 @@ jmh {
3634

3735
jmhJar {
3836
from project.configurations.jmh
39-
from project.configurations.jmhRuntime
4037
}
4138

4239
tasks.jmh.finalizedBy tasks.jmhReport

rsocket-core/src/jmh/java/io/rsocket/FragmentationPerf.java

Lines changed: 0 additions & 112 deletions
This file was deleted.
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Copyright 2015-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.fragmentation;
18+
19+
import static io.netty.buffer.UnpooledByteBufAllocator.DEFAULT;
20+
import static io.rsocket.framing.RequestResponseFrame.createRequestResponseFrame;
21+
22+
import io.netty.buffer.ByteBuf;
23+
import io.netty.buffer.ByteBufAllocator;
24+
import io.netty.buffer.Unpooled;
25+
import io.rsocket.framing.Frame;
26+
import java.util.List;
27+
import java.util.concurrent.ThreadLocalRandom;
28+
import org.openjdk.jmh.annotations.Benchmark;
29+
import org.openjdk.jmh.annotations.BenchmarkMode;
30+
import org.openjdk.jmh.annotations.Fork;
31+
import org.openjdk.jmh.annotations.Measurement;
32+
import org.openjdk.jmh.annotations.Mode;
33+
import org.openjdk.jmh.annotations.Scope;
34+
import org.openjdk.jmh.annotations.Setup;
35+
import org.openjdk.jmh.annotations.State;
36+
import org.openjdk.jmh.annotations.Warmup;
37+
import org.openjdk.jmh.infra.Blackhole;
38+
import reactor.core.publisher.SynchronousSink;
39+
import reactor.util.context.Context;
40+
41+
@BenchmarkMode(Mode.Throughput)
42+
@Fork(
43+
value = 1 // , jvmArgsAppend = {"-Dio.netty.leakDetection.level=advanced"}
44+
)
45+
@Warmup(iterations = 10)
46+
@Measurement(iterations = 10_000)
47+
@State(Scope.Thread)
48+
public class FragmentationPerformanceTest {
49+
50+
@Benchmark
51+
public void largeFragmentation(Input input) {
52+
Frame frame =
53+
input.largeFragmenter.fragment(input.largeFrame).doOnNext(Frame::dispose).blockLast();
54+
55+
input.bh.consume(frame);
56+
}
57+
58+
@Benchmark
59+
public void largeReassembly(Input input) {
60+
input.largeFrames.forEach(frame -> input.reassembler.reassemble(frame, input.sink));
61+
62+
input.bh.consume(input.sink.next);
63+
}
64+
65+
@Benchmark
66+
public void smallFragmentation(Input input) {
67+
Frame frame =
68+
input.smallFragmenter.fragment(input.smallFrame).doOnNext(Frame::dispose).blockLast();
69+
70+
input.bh.consume(frame);
71+
}
72+
73+
@Benchmark
74+
public void smallReassembly(Input input) {
75+
input.smallFrames.forEach(frame -> input.reassembler.reassemble(frame, input.sink));
76+
77+
input.bh.consume(input.sink.next);
78+
}
79+
80+
@State(Scope.Benchmark)
81+
public static class Input {
82+
83+
Blackhole bh;
84+
85+
FrameFragmenter largeFragmenter;
86+
87+
Frame largeFrame;
88+
89+
List<Frame> largeFrames;
90+
91+
FrameReassembler reassembler = FrameReassembler.createFrameReassembler(DEFAULT);
92+
93+
MockSynchronousSink<Frame> sink;
94+
95+
FrameFragmenter smallFragmenter;
96+
97+
Frame smallFrame;
98+
99+
List<Frame> smallFrames;
100+
101+
@Setup
102+
public void setup(Blackhole bh) {
103+
this.bh = bh;
104+
105+
sink = new MockSynchronousSink<>();
106+
107+
largeFrame =
108+
createRequestResponseFrame(
109+
DEFAULT, false, getRandomByteBuf(1 << 18), getRandomByteBuf(1 << 18));
110+
111+
smallFrame =
112+
createRequestResponseFrame(DEFAULT, false, getRandomByteBuf(16), getRandomByteBuf(16));
113+
114+
largeFragmenter = new FrameFragmenter(DEFAULT, 1024);
115+
smallFragmenter = new FrameFragmenter(ByteBufAllocator.DEFAULT, 2);
116+
117+
largeFrames = largeFragmenter.fragment(largeFrame).collectList().block();
118+
smallFrames = smallFragmenter.fragment(smallFrame).collectList().block();
119+
}
120+
121+
private static ByteBuf getRandomByteBuf(int size) {
122+
byte[] bytes = new byte[size];
123+
ThreadLocalRandom.current().nextBytes(bytes);
124+
return Unpooled.wrappedBuffer(bytes);
125+
}
126+
}
127+
128+
static final class MockSynchronousSink<T> implements SynchronousSink<T> {
129+
130+
Throwable error;
131+
132+
T next;
133+
134+
@Override
135+
public void complete() {}
136+
137+
@Override
138+
public Context currentContext() {
139+
return null;
140+
}
141+
142+
@Override
143+
public void error(Throwable e) {
144+
this.error = e;
145+
}
146+
147+
@Override
148+
public void next(T t) {
149+
this.next = t;
150+
}
151+
}
152+
}

rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.netty.util.AbstractReferenceCounted;
2424
import io.rsocket.Frame.Setup;
2525
import io.rsocket.frame.SetupFrameFlyweight;
26+
import io.rsocket.framing.FrameType;
2627

2728
/**
2829
* Exposed to server for determination of RequestHandler based on mime types and SETUP metadata/data

rsocket-core/src/main/java/io/rsocket/DuplexConnection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,12 @@ public interface DuplexConnection extends Availability, Closeable {
3333
*
3434
* The passed {@code Publisher} must
3535
*
36-
* @param frame Stream of {@code Frame}s to send on the connection.
36+
* @param frames Stream of {@code Frame}s to send on the connection.
3737
* @return {@code Publisher} that completes when all the frames are written on the connection
3838
* successfully and errors when it fails.
39+
* @throws NullPointerException if {@code frames} is {@code null}
3940
*/
40-
Mono<Void> send(Publisher<Frame> frame);
41+
Mono<Void> send(Publisher<Frame> frames);
4142

4243
/**
4344
* Sends a single {@code Frame} on this connection and returns the {@code Publisher} representing

0 commit comments

Comments
 (0)