Skip to content

Commit c019173

Browse files
committed
Release 1.0.0
2 parents f1c3198 + 8cb9e74 commit c019173

File tree

212 files changed

+5384
-9315
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

212 files changed

+5384
-9315
lines changed

README.md

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ repositories {
2626
mavenCentral()
2727
}
2828
dependencies {
29-
implementation 'io.rsocket:rsocket-core:1.0.0-RC6'
30-
implementation 'io.rsocket:rsocket-transport-netty:1.0.0-RC6'
29+
implementation 'io.rsocket:rsocket-core:1.0.0-RC7'
30+
implementation 'io.rsocket:rsocket-transport-netty:1.0.0-RC7'
3131
}
3232
```
3333

@@ -40,8 +40,8 @@ repositories {
4040
maven { url 'https://oss.jfrog.org/oss-snapshot-local' }
4141
}
4242
dependencies {
43-
implementation 'io.rsocket:rsocket-core:1.0.0-RC7-SNAPSHOT'
44-
implementation 'io.rsocket:rsocket-transport-netty:1.0.0-RC7-SNAPSHOT'
43+
implementation 'io.rsocket:rsocket-core:1.0.0-RC8-SNAPSHOT'
44+
implementation 'io.rsocket:rsocket-transport-netty:1.0.0-RC8-SNAPSHOT'
4545
}
4646
```
4747

@@ -120,7 +120,6 @@ RSocket clientRSocket =
120120
// Enable Zero Copy
121121
.payloadDecoder(PayloadDecoder.ZERO_COPY)
122122
.connect(TcpClientTransport.create(7878))
123-
.start()
124123
.block();
125124
```
126125

benchmarks/build.gradle

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ task jmhBaseline(type: JmhExecTask, description: 'Executing JMH baseline benchma
4040
classpath = sourceSets.main.runtimeClasspath + configurations.baseline
4141
}
4242

43+
clean {
44+
delete "${projectDir}/src/main/generated"
45+
}
46+
4347
class JmhExecTask extends JavaExec {
4448

4549
private String include;
@@ -160,4 +164,4 @@ class JmhExecTask extends JavaExec {
160164

161165
super.exec();
162166
}
163-
}
167+
}

benchmarks/src/main/java/io/rsocket/RSocketPerf.java renamed to benchmarks/src/main/java/io/rsocket/core/RSocketPerf.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1-
package io.rsocket;
2-
1+
package io.rsocket.core;
2+
3+
import io.rsocket.AbstractRSocket;
4+
import io.rsocket.Closeable;
5+
import io.rsocket.Payload;
6+
import io.rsocket.PayloadsMaxPerfSubscriber;
7+
import io.rsocket.PayloadsPerfSubscriber;
8+
import io.rsocket.RSocket;
39
import io.rsocket.frame.decoder.PayloadDecoder;
410
import io.rsocket.transport.local.LocalClientTransport;
511
import io.rsocket.transport.local.LocalServerTransport;
@@ -59,9 +65,7 @@ public void awaitToBeConsumed() {
5965
@Setup
6066
public void setUp() throws NoSuchFieldException, IllegalAccessException {
6167
server =
62-
RSocketFactory.receive()
63-
.frameDecoder(PayloadDecoder.ZERO_COPY)
64-
.acceptor(
68+
RSocketServer.create(
6569
(setup, sendingSocket) ->
6670
Mono.just(
6771
new AbstractRSocket() {
@@ -89,16 +93,14 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
8993
return Flux.from(payloads);
9094
}
9195
}))
92-
.transport(LocalServerTransport.create("server"))
93-
.start()
96+
.payloadDecoder(PayloadDecoder.ZERO_COPY)
97+
.bind(LocalServerTransport.create("server"))
9498
.block();
9599

96100
client =
97-
RSocketFactory.connect()
98-
.singleSubscriberRequester()
99-
.frameDecoder(PayloadDecoder.ZERO_COPY)
100-
.transport(LocalClientTransport.create("server"))
101-
.start()
101+
RSocketConnector.create()
102+
.payloadDecoder(PayloadDecoder.ZERO_COPY)
103+
.connect(LocalClientTransport.create("server"))
102104
.block();
103105

104106
Field sendProcessorField = RSocketRequester.class.getDeclaredField("sendProcessor");

benchmarks/src/main/java/io/rsocket/StreamIdSupplierPerf.java renamed to benchmarks/src/main/java/io/rsocket/core/StreamIdSupplierPerf.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
1-
package io.rsocket;
1+
package io.rsocket.core;
22

33
import io.netty.util.collection.IntObjectMap;
44
import io.rsocket.internal.SynchronizedIntObjectHashMap;
5-
import org.openjdk.jmh.annotations.*;
5+
import org.openjdk.jmh.annotations.Benchmark;
6+
import org.openjdk.jmh.annotations.BenchmarkMode;
7+
import org.openjdk.jmh.annotations.Fork;
8+
import org.openjdk.jmh.annotations.Measurement;
9+
import org.openjdk.jmh.annotations.Mode;
10+
import org.openjdk.jmh.annotations.Scope;
11+
import org.openjdk.jmh.annotations.Setup;
12+
import org.openjdk.jmh.annotations.State;
13+
import org.openjdk.jmh.annotations.Warmup;
614
import org.openjdk.jmh.infra.Blackhole;
715

816
@BenchmarkMode(Mode.Throughput)

build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ subprojects {
6262

6363
dependencies {
6464
dependency "ch.qos.logback:logback-classic:${ext['logback.version']}"
65-
dependency "com.google.code.findbugs:jsr305:${ext['findbugs.version']}"
6665
dependency "io.netty:netty-tcnative-boringssl-static:${ext['netty-boringssl.version']}"
6766
dependency "io.micrometer:micrometer-core:${ext['micrometer.version']}"
6867
dependency "org.assertj:assertj-core:${ext['assertj.version']}"

gradle.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313
#
14-
version=1.0.0-RC7
15-
perfBaselineVersion=1.0.0-RC6
14+
version=1.0.0
15+
perfBaselineVersion=1.0.0-RC7

rsocket-core/build.gradle

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ dependencies {
2929

3030
implementation 'org.slf4j:slf4j-api'
3131

32-
compileOnly 'com.google.code.findbugs:jsr305'
33-
3432
testImplementation 'io.projectreactor:reactor-test'
3533
testImplementation 'org.assertj:assertj-core'
3634
testImplementation 'org.junit.jupiter:junit-jupiter-api'

rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java

Lines changed: 4 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,48 +16,21 @@
1616

1717
package io.rsocket;
1818

19-
import org.reactivestreams.Publisher;
20-
import reactor.core.publisher.Flux;
2119
import reactor.core.publisher.Mono;
2220
import reactor.core.publisher.MonoProcessor;
2321

2422
/**
2523
* An abstract implementation of {@link RSocket}. All request handling methods emit {@link
2624
* UnsupportedOperationException} and hence must be overridden to provide a valid implementation.
25+
*
26+
* @deprecated as of 1.0 in favor of implementing {@link RSocket} directly which has default
27+
* methods.
2728
*/
29+
@Deprecated
2830
public abstract class AbstractRSocket implements RSocket {
2931

3032
private final MonoProcessor<Void> onClose = MonoProcessor.create();
3133

32-
@Override
33-
public Mono<Void> fireAndForget(Payload payload) {
34-
payload.release();
35-
return Mono.error(new UnsupportedOperationException("Fire and forget not implemented."));
36-
}
37-
38-
@Override
39-
public Mono<Payload> requestResponse(Payload payload) {
40-
payload.release();
41-
return Mono.error(new UnsupportedOperationException("Request-Response not implemented."));
42-
}
43-
44-
@Override
45-
public Flux<Payload> requestStream(Payload payload) {
46-
payload.release();
47-
return Flux.error(new UnsupportedOperationException("Request-Stream not implemented."));
48-
}
49-
50-
@Override
51-
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
52-
return Flux.error(new UnsupportedOperationException("Request-Channel not implemented."));
53-
}
54-
55-
@Override
56-
public Mono<Void> metadataPush(Payload payload) {
57-
payload.release();
58-
return Mono.error(new UnsupportedOperationException("Metadata-Push not implemented."));
59-
}
60-
6134
@Override
6235
public void dispose() {
6336
onClose.onComplete();

rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.util.AbstractReferenceCounted;
2121
import io.rsocket.core.DefaultConnectionSetupPayload;
22-
import javax.annotation.Nullable;
22+
import reactor.util.annotation.Nullable;
2323

2424
/**
2525
* Exposes information from the {@code SETUP} frame to a server, as well as to client responders.

rsocket-core/src/main/java/io/rsocket/RSocket.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ public interface RSocket extends Availability, Closeable {
3333
* @return {@code Publisher} that completes when the passed {@code payload} is successfully
3434
* handled, otherwise errors.
3535
*/
36-
Mono<Void> fireAndForget(Payload payload);
36+
default Mono<Void> fireAndForget(Payload payload) {
37+
payload.release();
38+
return Mono.error(new UnsupportedOperationException("Fire-and-Forget not implemented."));
39+
}
3740

3841
/**
3942
* Request-Response interaction model of {@code RSocket}.
@@ -42,23 +45,31 @@ public interface RSocket extends Availability, Closeable {
4245
* @return {@code Publisher} containing at most a single {@code Payload} representing the
4346
* response.
4447
*/
45-
Mono<Payload> requestResponse(Payload payload);
48+
default Mono<Payload> requestResponse(Payload payload) {
49+
payload.release();
50+
return Mono.error(new UnsupportedOperationException("Request-Response not implemented."));
51+
}
4652

4753
/**
4854
* Request-Stream interaction model of {@code RSocket}.
4955
*
5056
* @param payload Request payload.
5157
* @return {@code Publisher} containing the stream of {@code Payload}s representing the response.
5258
*/
53-
Flux<Payload> requestStream(Payload payload);
59+
default Flux<Payload> requestStream(Payload payload) {
60+
payload.release();
61+
return Flux.error(new UnsupportedOperationException("Request-Stream not implemented."));
62+
}
5463

5564
/**
5665
* Request-Channel interaction model of {@code RSocket}.
5766
*
5867
* @param payloads Stream of request payloads.
5968
* @return Stream of response payloads.
6069
*/
61-
Flux<Payload> requestChannel(Publisher<Payload> payloads);
70+
default Flux<Payload> requestChannel(Publisher<Payload> payloads) {
71+
return Flux.error(new UnsupportedOperationException("Request-Channel not implemented."));
72+
}
6273

6374
/**
6475
* Metadata-Push interaction model of {@code RSocket}.
@@ -67,10 +78,26 @@ public interface RSocket extends Availability, Closeable {
6778
* @return {@code Publisher} that completes when the passed {@code payload} is successfully
6879
* handled, otherwise errors.
6980
*/
70-
Mono<Void> metadataPush(Payload payload);
81+
default Mono<Void> metadataPush(Payload payload) {
82+
payload.release();
83+
return Mono.error(new UnsupportedOperationException("Metadata-Push not implemented."));
84+
}
7185

7286
@Override
7387
default double availability() {
7488
return isDisposed() ? 0.0 : 1.0;
7589
}
90+
91+
@Override
92+
default void dispose() {}
93+
94+
@Override
95+
default boolean isDisposed() {
96+
return false;
97+
}
98+
99+
@Override
100+
default Mono<Void> onClose() {
101+
return Mono.never();
102+
}
76103
}

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
111111
private Resume resume;
112112

113113
public ClientRSocketFactory() {
114-
this(RSocketConnector.create().errorConsumer(Throwable::printStackTrace));
114+
this(RSocketConnector.create());
115115
}
116116

117117
public ClientRSocketFactory(RSocketConnector connector) {
@@ -393,9 +393,13 @@ public ClientRSocketFactory fragment(int mtu) {
393393
return this;
394394
}
395395

396-
/** @deprecated this is deprecated with no replacement. */
396+
/**
397+
* @deprecated this handler is deliberately no-ops and is deprecated with no replacement. In
398+
* order to observe errors, it is recommended to add error handler using {@code doOnError}
399+
* on the specific logical stream. In order to observe connection, or RSocket terminal
400+
* errors, it is recommended to hook on {@link Closeable#onClose()} handler.
401+
*/
397402
public ClientRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
398-
connector.errorConsumer(errorConsumer);
399403
return this;
400404
}
401405

@@ -417,7 +421,7 @@ public static class ServerRSocketFactory implements ServerTransportAcceptor {
417421
private Resume resume;
418422

419423
public ServerRSocketFactory() {
420-
this(RSocketServer.create().errorConsumer(Throwable::printStackTrace));
424+
this(RSocketServer.create());
421425
}
422426

423427
public ServerRSocketFactory(RSocketServer server) {
@@ -497,9 +501,13 @@ public ServerRSocketFactory fragment(int mtu) {
497501
return this;
498502
}
499503

500-
/** @deprecated this is deprecated with no replacement. */
504+
/**
505+
* @deprecated this handler is deliberately no-ops and is deprecated with no replacement. In
506+
* order to observe errors, it is recommended to add error handler using {@code doOnError}
507+
* on the specific logical stream. In order to observe connection, or RSocket terminal
508+
* errors, it is recommended to hook on {@link Closeable#onClose()} handler.
509+
*/
501510
public ServerRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
502-
server.errorConsumer(errorConsumer);
503511
return this;
504512
}
505513

rsocket-core/src/main/java/io/rsocket/ResponderRSocket.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
package io.rsocket;
22

3+
import java.util.function.BiFunction;
34
import org.reactivestreams.Publisher;
45
import reactor.core.publisher.Flux;
56

67
/**
78
* Extends the {@link RSocket} that allows an implementer to peek at the first request payload of a
89
* channel.
10+
*
11+
* @deprecated as of 1.0 RC7 in favor of using {@link RSocket#requestChannel(Publisher)} with {@link
12+
* Flux#switchOnFirst(BiFunction)}
913
*/
14+
@Deprecated
1015
public interface ResponderRSocket extends RSocket {
1116
/**
1217
* Implement this method to peak at the first payload of the incoming request stream without

0 commit comments

Comments
 (0)