Skip to content

Commit ec42c83

Browse files
Ryland Degnanrobertroeser
Ryland Degnan
authored andcommitted
Simplify RSocketClient/Server implementation and avoid unnecessary flatMap (#467)
1 parent 30d3cf2 commit ec42c83

File tree

18 files changed

+1838
-1480
lines changed

18 files changed

+1838
-1480
lines changed

build.gradle

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,12 @@ subprojects {
7373
}
7474

7575
repositories {
76-
maven { url 'http://repo.spring.io/milestone' }
77-
maven { url 'https://oss.jfrog.org/libs-snapshot' }
78-
maven { url 'https://dl.bintray.com/rsocket/RSocket' }
79-
maven { url 'https://dl.bintray.com/reactivesocket/ReactiveSocket' }
76+
jcenter()
8077
}
8178

8279
dependencies {
8380
compile "io.projectreactor:reactor-core:3.1.3.RELEASE"
8481
compile "io.netty:netty-buffer:4.1.20.Final"
85-
compile "org.reactivestreams:reactive-streams:1.0.1"
8682
compile "org.slf4j:slf4j-api:1.7.25"
8783
compile "com.google.code.findbugs:jsr305:3.0.2"
8884

rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,7 @@ public Mono<Void> metadataPush(Payload payload) {
121121
}
122122

123123
@Override
124-
public void dispose() {
125-
}
124+
public void dispose() {}
126125

127126
@Override
128127
public Mono<Void> onClose() {

rsocket-core/src/jmh/java/io/rsocket/perfutil/TestDuplexConnection.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@ public double availability() {
6868
}
6969

7070
@Override
71-
public void dispose() {
72-
}
71+
public void dispose() {}
7372

7473
@Override
7574
public Mono<Void> onClose() {

rsocket-core/src/main/java/io/rsocket/Closeable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
public interface Closeable extends Disposable {
2424
/**
2525
* Returns a {@code Publisher} that completes when this {@code RSocket} is closed. A {@code
26-
* RSocket} can be closed by explicitly calling {@link RSocket#dispose()} or when the underlying transport
27-
* connection is closed.
26+
* RSocket} can be closed by explicitly calling {@link RSocket#dispose()} or when the underlying
27+
* transport connection is closed.
2828
*
2929
* @return A {@code Publisher} that completes when this {@code RSocket} close is complete.
3030
*/

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 32 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,17 @@
2222
import io.rsocket.internal.LimitableRequestPublisher;
2323
import io.rsocket.internal.UnboundedProcessor;
2424
import io.rsocket.util.NonBlockingHashMapLong;
25-
import org.reactivestreams.Publisher;
26-
import org.reactivestreams.Subscriber;
27-
import reactor.core.Disposable;
28-
import reactor.core.publisher.*;
29-
30-
import javax.annotation.Nullable;
3125
import java.time.Duration;
3226
import java.util.concurrent.atomic.AtomicBoolean;
3327
import java.util.concurrent.atomic.AtomicInteger;
3428
import java.util.function.Consumer;
3529
import java.util.function.Function;
3630
import java.util.function.Supplier;
31+
import javax.annotation.Nullable;
32+
import org.reactivestreams.Publisher;
33+
import org.reactivestreams.Subscriber;
34+
import reactor.core.Disposable;
35+
import reactor.core.publisher.*;
3736

3837
/** Client Side of a RSocket socket. Sends {@link Frame}s to a {@link RSocketServer} */
3938
class RSocketClient implements RSocket {
@@ -88,36 +87,25 @@ class RSocketClient implements RSocket {
8887
started
8988
.thenMany(Flux.interval(tickPeriod))
9089
.doOnSubscribe(s -> timeLastTickSentMs = System.currentTimeMillis())
91-
.concatMap(i -> sendKeepAlive(ackTimeoutMs, missedAcks))
92-
.doOnError(
90+
.subscribe(
91+
i -> sendKeepAlive(ackTimeoutMs, missedAcks),
9392
t -> {
9493
errorConsumer.accept(t);
9594
connection.dispose();
96-
})
97-
.subscribe();
95+
});
9896
}
9997

100-
connection
101-
.onClose()
102-
.doFinally(
103-
signalType -> {
104-
cleanup();
105-
})
106-
.doOnError(errorConsumer)
107-
.subscribe();
98+
connection.onClose().doFinally(signalType -> cleanup()).subscribe(null, errorConsumer);
10899

109100
connection
110101
.send(sendProcessor)
111-
.doOnError(this::handleSendProcessorError)
112102
.doFinally(this::handleSendProcessorCancel)
113-
.subscribe();
103+
.subscribe(null, this::handleSendProcessorError);
114104

115105
connection
116106
.receive()
117107
.doOnSubscribe(subscription -> started.onComplete())
118-
.doOnNext(this::handleIncomingFrames)
119-
.doOnError(errorConsumer)
120-
.subscribe();
108+
.subscribe(this::handleIncomingFrames, errorConsumer);
121109
}
122110

123111
private void handleSendProcessorError(Throwable t) {
@@ -152,23 +140,20 @@ private void handleSendProcessorCancel(SignalType t) {
152140
}
153141
}
154142

155-
private Mono<Void> sendKeepAlive(long ackTimeoutMs, int missedAcks) {
156-
return Mono.fromRunnable(
157-
() -> {
158-
long now = System.currentTimeMillis();
159-
if (now - timeLastTickSentMs > ackTimeoutMs) {
160-
int count = missedAckCounter.incrementAndGet();
161-
if (count >= missedAcks) {
162-
String message =
163-
String.format(
164-
"Missed %d keep-alive acks with a threshold of %d and a ack timeout of %d ms",
165-
count, missedAcks, ackTimeoutMs);
166-
throw new ConnectionException(message);
167-
}
168-
}
143+
private void sendKeepAlive(long ackTimeoutMs, int missedAcks) {
144+
long now = System.currentTimeMillis();
145+
if (now - timeLastTickSentMs > ackTimeoutMs) {
146+
int count = missedAckCounter.incrementAndGet();
147+
if (count >= missedAcks) {
148+
String message =
149+
String.format(
150+
"Missed %d keep-alive acks with a threshold of %d and a ack timeout of %d ms",
151+
count, missedAcks, ackTimeoutMs);
152+
throw new ConnectionException(message);
153+
}
154+
}
169155

170-
sendProcessor.onNext(Frame.Keepalive.from(Unpooled.EMPTY_BUFFER, true));
171-
});
156+
sendProcessor.onNext(Frame.Keepalive.from(Unpooled.EMPTY_BUFFER, true));
172157
}
173158

174159
@Override
@@ -380,14 +365,12 @@ public Frame apply(Payload payload) {
380365
}
381366
});
382367

383-
requestFrames
384-
.doOnNext(sendProcessor::onNext)
385-
.doOnError(
386-
t -> {
387-
errorConsumer.accept(t);
388-
receiver.dispose();
389-
})
390-
.subscribe();
368+
requestFrames.subscribe(
369+
sendProcessor::onNext,
370+
t -> {
371+
errorConsumer.accept(t);
372+
receiver.dispose();
373+
});
391374
} else {
392375
sendOneFrame(Frame.RequestN.from(streamId, l));
393376
}
@@ -415,10 +398,10 @@ private boolean contains(int streamId) {
415398

416399
protected void cleanup() {
417400
try {
418-
for (UnicastProcessor<Payload> subscriber: receivers.values()) {
401+
for (UnicastProcessor<Payload> subscriber : receivers.values()) {
419402
cleanUpSubscriber(subscriber);
420403
}
421-
for (LimitableRequestPublisher p: senders.values()) {
404+
for (LimitableRequestPublisher p : senders.values()) {
422405
cleanUpLimitableRequestPublisher(p);
423406
}
424407

0 commit comments

Comments
 (0)