Skip to content

Commit 6b005ca

Browse files
committed
Merge branch 'release/1.0.0-RC2'
2 parents 32fd4d3 + cce337a commit 6b005ca

23 files changed

+307
-76
lines changed

.travis.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
---
1717
language: java
1818

19+
dist: trusty
20+
1921
matrix:
2022
include:
2123
- jdk: oraclejdk8

build.gradle

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ subprojects {
2929
apply plugin: 'io.spring.dependency-management'
3030
apply plugin: 'com.github.sherter.google-java-format'
3131

32-
ext['reactor-bom.version'] = 'Californium-SR8'
32+
ext['reactor-bom.version'] = 'Dysprosium-M3'
3333
ext['logback.version'] = '1.2.3'
3434
ext['findbugs.version'] = '3.0.2'
35-
ext['netty.version'] = '4.1.36.Final'
35+
ext['netty.version'] = '4.1.37.Final'
3636
ext['netty-boringssl.version'] = '2.0.25.Final'
3737
ext['hdrhistogram.version'] = '2.1.10'
3838
ext['mockito.version'] = '2.25.1'
@@ -89,6 +89,7 @@ subprojects {
8989

9090
repositories {
9191
mavenCentral()
92+
maven { url 'http://repo.spring.io/milestone' } // temporary for Reactor Dysprosium
9293

9394
if (version.endsWith('BUILD-SNAPSHOT') || project.hasProperty('platformVersion')) {
9495
maven { url 'http://repo.spring.io/libs-snapshot' }

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313
#
14-
version=1.0.0-RC1
14+
version=1.0.0-RC2
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.rsocket;
2+
3+
import io.netty.util.collection.IntObjectMap;
4+
import io.rsocket.internal.SynchronizedIntObjectHashMap;
5+
import org.openjdk.jmh.annotations.*;
6+
import org.openjdk.jmh.infra.Blackhole;
7+
8+
@BenchmarkMode(Mode.Throughput)
9+
@Fork(
10+
value = 1 // , jvmArgsAppend = {"-Dio.netty.leakDetection.level=advanced"}
11+
)
12+
@Warmup(iterations = 10)
13+
@Measurement(iterations = 10)
14+
@State(Scope.Thread)
15+
public class StreamIdSupplierPerf {
16+
@Benchmark
17+
public void benchmarkStreamId(Input input) {
18+
int i = input.supplier.nextStreamId(input.map);
19+
input.bh.consume(i);
20+
}
21+
22+
@State(Scope.Benchmark)
23+
public static class Input {
24+
Blackhole bh;
25+
IntObjectMap map;
26+
StreamIdSupplier supplier;
27+
28+
@Setup
29+
public void setup(Blackhole bh) {
30+
this.supplier = StreamIdSupplier.clientSupplier();
31+
this.bh = bh;
32+
this.map = new SynchronizedIntObjectHashMap();
33+
}
34+
}
35+
}

rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.netty.util.AbstractReferenceCounted;
2121
import io.rsocket.frame.FrameHeaderFlyweight;
2222
import io.rsocket.frame.SetupFrameFlyweight;
23+
import javax.annotation.Nullable;
2324

2425
/**
2526
* Exposed to server for determination of ResponderRSocket based on mime types and SETUP
@@ -43,6 +44,11 @@ public static ConnectionSetupPayload create(final ByteBuf setupFrame) {
4344

4445
public abstract boolean willClientHonorLease();
4546

47+
public abstract boolean isResumeEnabled();
48+
49+
@Nullable
50+
public abstract ByteBuf resumeToken();
51+
4652
@Override
4753
public ConnectionSetupPayload retain() {
4854
super.retain();
@@ -101,6 +107,16 @@ public boolean willClientHonorLease() {
101107
return SetupFrameFlyweight.honorLease(setupFrame);
102108
}
103109

110+
@Override
111+
public boolean isResumeEnabled() {
112+
return SetupFrameFlyweight.resumeEnabled(setupFrame);
113+
}
114+
115+
@Override
116+
public ByteBuf resumeToken() {
117+
return SetupFrameFlyweight.resumeToken(setupFrame);
118+
}
119+
104120
@Override
105121
public ConnectionSetupPayload touch() {
106122
setupFrame.touch();

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,8 +355,7 @@ public Mono<RSocket> start() {
355355
resumeToken,
356356
metadataMimeType,
357357
dataMimeType,
358-
setupPayload.sliceMetadata(),
359-
setupPayload.sliceData());
358+
setupPayload);
360359

361360
RSocket wrappedRSocketRequester = plugins.applyRequester(rSocketRequester);
362361

rsocket-core/src/main/java/io/rsocket/RSocketRequester.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ private Mono<Void> handleFireAndForget(Payload payload) {
205205
return Mono.error(err);
206206
}
207207

208-
final int streamId = streamIdSupplier.nextStreamId();
208+
final int streamId = streamIdSupplier.nextStreamId(receivers);
209209

210210
return emptyUnicastMono()
211211
.doOnSubscribe(
@@ -233,7 +233,7 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
233233
return Mono.error(err);
234234
}
235235

236-
int streamId = streamIdSupplier.nextStreamId();
236+
int streamId = streamIdSupplier.nextStreamId(receivers);
237237
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
238238

239239
UnicastMonoProcessor<Payload> receiver = UnicastMonoProcessor.create();
@@ -274,7 +274,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
274274
return Flux.error(err);
275275
}
276276

277-
int streamId = streamIdSupplier.nextStreamId();
277+
int streamId = streamIdSupplier.nextStreamId(receivers);
278278

279279
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
280280
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
@@ -328,7 +328,7 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
328328

329329
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
330330
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
331-
final int streamId = streamIdSupplier.nextStreamId();
331+
final int streamId = streamIdSupplier.nextStreamId(receivers);
332332

333333
return receiver
334334
.doOnRequest(

rsocket-core/src/main/java/io/rsocket/StreamIdSupplier.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,20 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
1716
package io.rsocket;
1817

19-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
18+
import io.netty.util.collection.IntObjectMap;
19+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2020

2121
final class StreamIdSupplier {
22+
private static final int MASK = 0x7FFFFFFF;
2223

23-
private static final AtomicIntegerFieldUpdater<StreamIdSupplier> STREAM_ID =
24-
AtomicIntegerFieldUpdater.newUpdater(StreamIdSupplier.class, "streamId");
25-
private volatile int streamId;
24+
private static final AtomicLongFieldUpdater<StreamIdSupplier> STREAM_ID =
25+
AtomicLongFieldUpdater.newUpdater(StreamIdSupplier.class, "streamId");
26+
private volatile long streamId;
2627

27-
private StreamIdSupplier(int streamId) {
28+
// Visible for testing
29+
StreamIdSupplier(int streamId) {
2830
this.streamId = streamId;
2931
}
3032

@@ -36,8 +38,12 @@ static StreamIdSupplier serverSupplier() {
3638
return new StreamIdSupplier(0);
3739
}
3840

39-
int nextStreamId() {
40-
return STREAM_ID.addAndGet(this, 2);
41+
int nextStreamId(IntObjectMap<?> streamIds) {
42+
int streamId;
43+
do {
44+
streamId = (int) STREAM_ID.addAndGet(this, 2) & MASK;
45+
} while (streamId == 0 || streamIds.containsKey(streamId));
46+
return streamId;
4147
}
4248

4349
boolean isBeforeOrCurrent(int streamId) {

rsocket-core/src/main/java/io/rsocket/buffer/Tuple3ByteBuf.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ public ByteBuffer[] _nioBuffers(int index, int length) {
144144
new ByteBuffer[oneBuffer.length + twoBuffer.length + threeBuffer.length];
145145
System.arraycopy(oneBuffer, 0, results, 0, oneBuffer.length);
146146
System.arraycopy(twoBuffer, 0, results, oneBuffer.length, twoBuffer.length);
147-
System.arraycopy(threeBuffer, 0, results, twoBuffer.length, threeBuffer.length);
147+
System.arraycopy(
148+
threeBuffer, 0, results, oneBuffer.length + twoBuffer.length, threeBuffer.length);
148149
return results;
149150
} else {
150151
ByteBuffer[] results = new ByteBuffer[oneBuffer.length + twoBuffer.length];
@@ -167,7 +168,7 @@ public ByteBuffer[] _nioBuffers(int index, int length) {
167168
threeBuffer = three.nioBuffers(threeReadIndex, length);
168169
ByteBuffer[] results = new ByteBuffer[twoBuffer.length + threeBuffer.length];
169170
System.arraycopy(twoBuffer, 0, results, 0, twoBuffer.length);
170-
System.arraycopy(threeBuffer, 0, results, threeBuffer.length, twoBuffer.length);
171+
System.arraycopy(threeBuffer, 0, results, twoBuffer.length, threeBuffer.length);
171172
return results;
172173
} else {
173174
return twoBuffer;

rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
* and Reassembly</a>
3939
*/
4040
final class FrameReassembler extends AtomicBoolean implements Disposable {
41+
42+
private static final long serialVersionUID = -4394598098863449055L;
43+
4144
private static final Logger logger = LoggerFactory.getLogger(FrameReassembler.class);
4245

4346
final IntObjectMap<ByteBuf> headers;

rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.netty.buffer.ByteBufAllocator;
55
import io.netty.buffer.ByteBufUtil;
66
import io.netty.buffer.Unpooled;
7+
import io.rsocket.Payload;
78
import java.nio.charset.StandardCharsets;
89

910
public class SetupFrameFlyweight {
@@ -27,13 +28,12 @@ public class SetupFrameFlyweight {
2728

2829
public static ByteBuf encode(
2930
final ByteBufAllocator allocator,
30-
boolean lease,
31+
final boolean lease,
3132
final int keepaliveInterval,
3233
final int maxLifetime,
3334
final String metadataMimeType,
3435
final String dataMimeType,
35-
final ByteBuf metadata,
36-
final ByteBuf data) {
36+
final Payload setupPayload) {
3737
return encode(
3838
allocator,
3939
lease,
@@ -42,20 +42,21 @@ public static ByteBuf encode(
4242
Unpooled.EMPTY_BUFFER,
4343
metadataMimeType,
4444
dataMimeType,
45-
metadata,
46-
data);
45+
setupPayload);
4746
}
4847

4948
public static ByteBuf encode(
5049
final ByteBufAllocator allocator,
51-
boolean lease,
50+
final boolean lease,
5251
final int keepaliveInterval,
5352
final int maxLifetime,
5453
final ByteBuf resumeToken,
5554
final String metadataMimeType,
5655
final String dataMimeType,
57-
final ByteBuf metadata,
58-
final ByteBuf data) {
56+
final Payload setupPayload) {
57+
58+
ByteBuf metadata = setupPayload.hasMetadata() ? setupPayload.sliceMetadata() : null;
59+
ByteBuf data = setupPayload.sliceData();
5960

6061
int flags = 0;
6162

rsocket-core/src/main/java/io/rsocket/internal/SynchronizedIntObjectHashMap.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ public synchronized boolean isEmpty() {
189189

190190
@Override
191191
public synchronized void clear() {
192-
Arrays.fill(keys, (int) 0);
192+
Arrays.fill(keys, 0);
193193
Arrays.fill(values, null);
194194
size = 0;
195195
}
@@ -331,7 +331,7 @@ public synchronized Set<Entry<Integer, V>> entrySet() {
331331
}
332332

333333
private int objectToKey(Object key) {
334-
return (int) ((Integer) key).intValue();
334+
return ((Integer) key).intValue();
335335
}
336336

337337
/**
@@ -369,7 +369,7 @@ private int hashIndex(int key) {
369369

370370
/** Returns the hash code for the key. */
371371
private static int hashCode(int key) {
372-
return (int) key;
372+
return key;
373373
}
374374

375375
/** Get the next sequential index after {@code index} and wraps if necessary. */

rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.util.Objects;
44
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
5-
import java.util.function.LongSupplier;
65
import java.util.stream.Stream;
76
import org.reactivestreams.Processor;
87
import org.reactivestreams.Subscription;
@@ -17,12 +16,7 @@
1716
import reactor.util.function.Tuple2;
1817

1918
public class UnicastMonoProcessor<O> extends Mono<O>
20-
implements Processor<O, O>,
21-
CoreSubscriber<O>,
22-
Disposable,
23-
Subscription,
24-
Scannable,
25-
LongSupplier {
19+
implements Processor<O, O>, CoreSubscriber<O>, Disposable, Subscription, Scannable {
2620

2721
@SuppressWarnings("rawtypes")
2822
static final AtomicIntegerFieldUpdater<UnicastMonoProcessor> ONCE =
@@ -87,11 +81,6 @@ public Stream<Tuple2<String, String>> tags() {
8781
return processor.tags();
8882
}
8983

90-
@Override
91-
public long getAsLong() {
92-
return processor.getAsLong();
93-
}
94-
9584
@Override
9685
public void onSubscribe(Subscription s) {
9786
processor.onSubscribe(s);

rsocket-core/src/main/java/io/rsocket/metadata/WellKnownMimeType.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ public enum WellKnownMimeType {
6767
VIDEO_H264("video/H264", (byte) 0x23),
6868
VIDEO_H265("video/H265", (byte) 0x24),
6969
VIDEO_VP8("video/VP8", (byte) 0x25),
70+
APPLICATION_HESSIAN("application/x-hessian", (byte) 0x26),
71+
APPLICATION_JAVA_OBJECT("application/x-java-object", (byte) 0x27),
72+
APPLICATION_CLOUDEVENTS_JSON("application/cloudevents+json", (byte) 0x28),
7073

7174
// ... reserved for future use ...
7275

0 commit comments

Comments
 (0)