Skip to content

Commit ab570b4

Browse files
committed
Merge branch 'release/0.12.2-RC4'
2 parents fe3b390 + 7a5f9ff commit ab570b4

33 files changed

+2195
-179
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ Learn more at http://rsocket.io
1515

1616
## Build and Binaries
1717

18-
[![Build Status](https://travis-ci.org/rsocket/rsocket-java.svg?branch=1.0.x)](https://travis-ci.org/rsocket/rsocket-java)
18+
[![Build Status](https://travis-ci.org/rsocket/rsocket-java.svg?branch=develop)](https://travis-ci.org/rsocket/rsocket-java)
1919

2020
Releases are available via Maven Central.
2121

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313
#
14-
version=0.12.2-RC3
14+
version=0.12.2-RC4
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package io.rsocket.metadata;
2+
3+
import org.openjdk.jmh.annotations.*;
4+
import org.openjdk.jmh.infra.Blackhole;
5+
6+
@BenchmarkMode(Mode.Throughput)
7+
@Fork(value = 1)
8+
@Warmup(iterations = 10)
9+
@Measurement(iterations = 10)
10+
@State(Scope.Thread)
11+
public class WellKnownMimeTypePerf {
12+
13+
// this is the old values() looping implementation of fromIdentifier
14+
private WellKnownMimeType fromIdValuesLoop(int id) {
15+
if (id < 0 || id > 127) {
16+
return WellKnownMimeType.UNPARSEABLE_MIME_TYPE;
17+
}
18+
for (WellKnownMimeType value : WellKnownMimeType.values()) {
19+
if (value.getIdentifier() == id) {
20+
return value;
21+
}
22+
}
23+
return WellKnownMimeType.UNKNOWN_RESERVED_MIME_TYPE;
24+
}
25+
26+
// this is the core of the old values() looping implementation of fromString
27+
private WellKnownMimeType fromStringValuesLoop(String mimeType) {
28+
for (WellKnownMimeType value : WellKnownMimeType.values()) {
29+
if (mimeType.equals(value.getString())) {
30+
return value;
31+
}
32+
}
33+
return WellKnownMimeType.UNPARSEABLE_MIME_TYPE;
34+
}
35+
36+
@Benchmark
37+
public void fromIdArrayLookup(final Blackhole bh) {
38+
// negative lookup
39+
bh.consume(WellKnownMimeType.fromIdentifier(-10));
40+
bh.consume(WellKnownMimeType.fromIdentifier(-1));
41+
// too large lookup
42+
bh.consume(WellKnownMimeType.fromIdentifier(129));
43+
// first lookup
44+
bh.consume(WellKnownMimeType.fromIdentifier(0));
45+
// middle lookup
46+
bh.consume(WellKnownMimeType.fromIdentifier(37));
47+
// reserved lookup
48+
bh.consume(WellKnownMimeType.fromIdentifier(63));
49+
// last lookup
50+
bh.consume(WellKnownMimeType.fromIdentifier(127));
51+
}
52+
53+
@Benchmark
54+
public void fromIdValuesLoopLookup(final Blackhole bh) {
55+
// negative lookup
56+
bh.consume(fromIdValuesLoop(-10));
57+
bh.consume(fromIdValuesLoop(-1));
58+
// too large lookup
59+
bh.consume(fromIdValuesLoop(129));
60+
// first lookup
61+
bh.consume(fromIdValuesLoop(0));
62+
// middle lookup
63+
bh.consume(fromIdValuesLoop(37));
64+
// reserved lookup
65+
bh.consume(fromIdValuesLoop(63));
66+
// last lookup
67+
bh.consume(fromIdValuesLoop(127));
68+
}
69+
70+
@Benchmark
71+
public void fromStringMapLookup(final Blackhole bh) {
72+
// unknown lookup
73+
bh.consume(WellKnownMimeType.fromString("foo/bar"));
74+
// first lookup
75+
bh.consume(WellKnownMimeType.fromString(WellKnownMimeType.APPLICATION_AVRO.getString()));
76+
// middle lookup
77+
bh.consume(WellKnownMimeType.fromString(WellKnownMimeType.VIDEO_VP8.getString()));
78+
// last lookup
79+
bh.consume(
80+
WellKnownMimeType.fromString(
81+
WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()));
82+
}
83+
84+
@Benchmark
85+
public void fromStringValuesLoopLookup(final Blackhole bh) {
86+
// unknown lookup
87+
bh.consume(fromStringValuesLoop("foo/bar"));
88+
// first lookup
89+
bh.consume(fromStringValuesLoop(WellKnownMimeType.APPLICATION_AVRO.getString()));
90+
// middle lookup
91+
bh.consume(fromStringValuesLoop(WellKnownMimeType.VIDEO_VP8.getString()));
92+
// last lookup
93+
bh.consume(
94+
fromStringValuesLoop(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()));
95+
}
96+
}

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,25 @@ public ClientRSocketFactory addConnectionPlugin(DuplexConnectionInterceptor inte
134134
plugins.addConnectionPlugin(interceptor);
135135
return this;
136136
}
137-
137+
/** Deprecated. Use {@link #addRequesterPlugin(RSocketInterceptor)} instead */
138+
@Deprecated
138139
public ClientRSocketFactory addClientPlugin(RSocketInterceptor interceptor) {
139-
plugins.addClientPlugin(interceptor);
140+
return addRequesterPlugin(interceptor);
141+
}
142+
143+
public ClientRSocketFactory addRequesterPlugin(RSocketInterceptor interceptor) {
144+
plugins.addRequesterPlugin(interceptor);
140145
return this;
141146
}
142147

148+
/** Deprecated. Use {@link #addResponderPlugin(RSocketInterceptor)} instead */
149+
@Deprecated
143150
public ClientRSocketFactory addServerPlugin(RSocketInterceptor interceptor) {
144-
plugins.addServerPlugin(interceptor);
151+
return addResponderPlugin(interceptor);
152+
}
153+
154+
public ClientRSocketFactory addResponderPlugin(RSocketInterceptor interceptor) {
155+
plugins.addResponderPlugin(interceptor);
145156
return this;
146157
}
147158

@@ -291,8 +302,8 @@ public Mono<RSocket> start() {
291302
ClientServerInputMultiplexer multiplexer =
292303
new ClientServerInputMultiplexer(wrappedConnection, plugins);
293304

294-
RSocketClient rSocketClient =
295-
new RSocketClient(
305+
RSocketRequester rSocketRequester =
306+
new RSocketRequester(
296307
allocator,
297308
multiplexer.asClientConnection(),
298309
payloadDecoder,
@@ -314,27 +325,27 @@ public Mono<RSocket> start() {
314325
setupPayload.sliceMetadata(),
315326
setupPayload.sliceData());
316327

317-
RSocket wrappedRSocketClient = plugins.applyClient(rSocketClient);
328+
RSocket wrappedRSocketRequester = plugins.applyRequester(rSocketRequester);
318329

319-
RSocket unwrappedServerSocket;
330+
RSocket rSocketHandler;
320331
if (biAcceptor != null) {
321332
ConnectionSetupPayload setup = ConnectionSetupPayload.create(setupFrame);
322-
unwrappedServerSocket = biAcceptor.apply(setup, wrappedRSocketClient);
333+
rSocketHandler = biAcceptor.apply(setup, wrappedRSocketRequester);
323334
} else {
324-
unwrappedServerSocket = acceptor.get().apply(wrappedRSocketClient);
335+
rSocketHandler = acceptor.get().apply(wrappedRSocketRequester);
325336
}
326337

327-
RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket);
338+
RSocket wrappedRSocketHandler = plugins.applyResponder(rSocketHandler);
328339

329-
RSocketServer rSocketServer =
330-
new RSocketServer(
340+
RSocketResponder rSocketResponder =
341+
new RSocketResponder(
331342
allocator,
332343
multiplexer.asServerConnection(),
333-
wrappedRSocketServer,
344+
wrappedRSocketHandler,
334345
payloadDecoder,
335346
errorConsumer);
336347

337-
return wrappedConnection.sendOne(setupFrame).thenReturn(wrappedRSocketClient);
348+
return wrappedConnection.sendOne(setupFrame).thenReturn(wrappedRSocketRequester);
338349
});
339350
}
340351

@@ -397,14 +408,25 @@ public ServerRSocketFactory addConnectionPlugin(DuplexConnectionInterceptor inte
397408
plugins.addConnectionPlugin(interceptor);
398409
return this;
399410
}
400-
411+
/** Deprecated. Use {@link #addRequesterPlugin(RSocketInterceptor)} instead */
412+
@Deprecated
401413
public ServerRSocketFactory addClientPlugin(RSocketInterceptor interceptor) {
402-
plugins.addClientPlugin(interceptor);
414+
return addRequesterPlugin(interceptor);
415+
}
416+
417+
public ServerRSocketFactory addRequesterPlugin(RSocketInterceptor interceptor) {
418+
plugins.addRequesterPlugin(interceptor);
403419
return this;
404420
}
405421

422+
/** Deprecated. Use {@link #addResponderPlugin(RSocketInterceptor)} instead */
423+
@Deprecated
406424
public ServerRSocketFactory addServerPlugin(RSocketInterceptor interceptor) {
407-
plugins.addServerPlugin(interceptor);
425+
return addResponderPlugin(interceptor);
426+
}
427+
428+
public ServerRSocketFactory addResponderPlugin(RSocketInterceptor interceptor) {
429+
plugins.addResponderPlugin(interceptor);
408430
return this;
409431
}
410432

@@ -525,29 +547,29 @@ private Mono<Void> acceptSetup(
525547
(keepAliveHandler, wrappedMultiplexer) -> {
526548
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create(setupFrame);
527549

528-
RSocketClient rSocketClient =
529-
new RSocketClient(
550+
RSocketRequester rSocketRequester =
551+
new RSocketRequester(
530552
allocator,
531553
wrappedMultiplexer.asServerConnection(),
532554
payloadDecoder,
533555
errorConsumer,
534556
StreamIdSupplier.serverSupplier());
535557

536-
RSocket wrappedRSocketClient = plugins.applyClient(rSocketClient);
558+
RSocket wrappedRSocketRequester = plugins.applyRequester(rSocketRequester);
537559

538560
return acceptor
539-
.accept(setupPayload, wrappedRSocketClient)
561+
.accept(setupPayload, wrappedRSocketRequester)
540562
.onErrorResume(
541563
err -> sendError(multiplexer, rejectedSetupError(err)).then(Mono.error(err)))
542564
.doOnNext(
543-
unwrappedServerSocket -> {
544-
RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket);
565+
rSocketHandler -> {
566+
RSocket wrappedRSocketHandler = plugins.applyResponder(rSocketHandler);
545567

546-
RSocketServer rSocketServer =
547-
new RSocketServer(
568+
RSocketResponder rSocketResponder =
569+
new RSocketResponder(
548570
allocator,
549571
wrappedMultiplexer.asClientConnection(),
550-
wrappedRSocketServer,
572+
wrappedRSocketHandler,
551573
payloadDecoder,
552574
errorConsumer,
553575
setupPayload.keepAliveInterval(),

rsocket-core/src/main/java/io/rsocket/RSocketClient.java renamed to rsocket-core/src/main/java/io/rsocket/RSocketRequester.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,10 @@
4545
import org.reactivestreams.Subscriber;
4646
import reactor.core.publisher.*;
4747

48-
/** Client Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketServer} */
49-
class RSocketClient implements RSocket {
48+
/**
49+
* Requester Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketResponder} of peer
50+
*/
51+
class RSocketRequester implements RSocket {
5052

5153
private final DuplexConnection connection;
5254
private final PayloadDecoder payloadDecoder;
@@ -60,7 +62,7 @@ class RSocketClient implements RSocket {
6062
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
6163

6264
/*client requester*/
63-
RSocketClient(
65+
RSocketRequester(
6466
ByteBufAllocator allocator,
6567
DuplexConnection connection,
6668
PayloadDecoder payloadDecoder,
@@ -99,7 +101,7 @@ class RSocketClient implements RSocket {
99101
}
100102

101103
/*server requester*/
102-
RSocketClient(
104+
RSocketRequester(
103105
ByteBufAllocator allocator,
104106
DuplexConnection connection,
105107
PayloadDecoder payloadDecoder,

rsocket-core/src/main/java/io/rsocket/RSocketServer.java renamed to rsocket-core/src/main/java/io/rsocket/RSocketResponder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@
4343
import reactor.core.Exceptions;
4444
import reactor.core.publisher.*;
4545

46-
/** Server side RSocket. Receives {@link ByteBuf}s from a {@link RSocketClient} */
47-
class RSocketServer implements ResponderRSocket {
46+
/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
47+
class RSocketResponder implements ResponderRSocket {
4848

4949
private final DuplexConnection connection;
5050
private final RSocket requestHandler;
@@ -61,7 +61,7 @@ class RSocketServer implements ResponderRSocket {
6161
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
6262

6363
/*client responder*/
64-
RSocketServer(
64+
RSocketResponder(
6565
ByteBufAllocator allocator,
6666
DuplexConnection connection,
6767
RSocket requestHandler,
@@ -71,7 +71,7 @@ class RSocketServer implements ResponderRSocket {
7171
}
7272

7373
/*server responder*/
74-
RSocketServer(
74+
RSocketResponder(
7575
ByteBufAllocator allocator,
7676
DuplexConnection connection,
7777
RSocket requestHandler,

rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.rsocket.frame.FrameLengthFlyweight;
2727
import io.rsocket.frame.FrameType;
2828
import java.util.Objects;
29+
import javax.annotation.Nullable;
2930
import org.reactivestreams.Publisher;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
@@ -57,13 +58,10 @@ public FragmentationDuplexConnection(
5758
String type) {
5859
Objects.requireNonNull(delegate, "delegate must not be null");
5960
Objects.requireNonNull(allocator, "byteBufAllocator must not be null");
60-
if (mtu < MIN_MTU_SIZE) {
61-
throw new IllegalArgumentException("smallest allowed mtu size is " + MIN_MTU_SIZE + " bytes");
62-
}
6361
this.encodeLength = encodeLength;
6462
this.allocator = allocator;
6563
this.delegate = delegate;
66-
this.mtu = mtu;
64+
this.mtu = assertMtu(mtu);
6765
this.frameReassembler = new FrameReassembler(allocator);
6866
this.type = type;
6967

@@ -74,6 +72,32 @@ private boolean shouldFragment(FrameType frameType, int readableBytes) {
7472
return frameType.isFragmentable() && readableBytes > mtu;
7573
}
7674

75+
/*TODO this is nullable and not returning empty to workaround javac 11.0.3 compiler issue on ubuntu (at least) */
76+
@Nullable
77+
public static <T> Mono<T> checkMtu(int mtu) {
78+
if (isInsufficientMtu(mtu)) {
79+
String msg =
80+
String.format("smallest allowed mtu size is %d bytes, provided: %d", MIN_MTU_SIZE, mtu);
81+
return Mono.error(new IllegalArgumentException(msg));
82+
} else {
83+
return null;
84+
}
85+
}
86+
87+
private static int assertMtu(int mtu) {
88+
if (isInsufficientMtu(mtu)) {
89+
String msg =
90+
String.format("smallest allowed mtu size is %d bytes, provided: %d", MIN_MTU_SIZE, mtu);
91+
throw new IllegalArgumentException(msg);
92+
} else {
93+
return mtu;
94+
}
95+
}
96+
97+
private static boolean isInsufficientMtu(int mtu) {
98+
return mtu > 0 && mtu < MIN_MTU_SIZE || mtu < 0;
99+
}
100+
77101
@Override
78102
public Mono<Void> send(Publisher<ByteBuf> frames) {
79103
return Flux.from(frames).concatMap(this::sendOne).then();
@@ -89,13 +113,13 @@ public Mono<Void> sendOne(ByteBuf frame) {
89113
Flux.from(fragmentFrame(allocator, mtu, frame, frameType, encodeLength))
90114
.doOnNext(
91115
byteBuf -> {
92-
ByteBuf frame1 = FrameLengthFlyweight.frame(byteBuf);
116+
ByteBuf f = encodeLength ? FrameLengthFlyweight.frame(byteBuf) : byteBuf;
93117
logger.debug(
94118
"{} - stream id {} - frame type {} - \n {}",
95119
type,
96-
FrameHeaderFlyweight.streamId(frame1),
97-
FrameHeaderFlyweight.frameType(frame1),
98-
ByteBufUtil.prettyHexDump(frame1));
120+
FrameHeaderFlyweight.streamId(f),
121+
FrameHeaderFlyweight.frameType(f),
122+
ByteBufUtil.prettyHexDump(f));
99123
}));
100124
} else {
101125
return delegate.send(
@@ -108,7 +132,7 @@ public Mono<Void> sendOne(ByteBuf frame) {
108132

109133
private ByteBuf encode(ByteBuf frame) {
110134
if (encodeLength) {
111-
return FrameLengthFlyweight.encode(allocator, frame.readableBytes(), frame).retain();
135+
return FrameLengthFlyweight.encode(allocator, frame.readableBytes(), frame);
112136
} else {
113137
return frame;
114138
}

0 commit comments

Comments
 (0)