Skip to content

Commit 4d69ee2

Browse files
committed
updates
1 parent 9c5ac91 commit 4d69ee2

File tree

3 files changed

+10
-17
lines changed

3 files changed

+10
-17
lines changed

rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import io.rsocket.framing.FrameType;
2626

2727
/**
28-
* Exposed to server for determination of RequestHandler based on mime types and SETUP metadata/data
28+
* Exposed to server for determination of ResponderRSocket based on mime types and SETUP metadata/data
2929
*/
3030
public abstract class ConnectionSetupPayload extends AbstractReferenceCounted implements Payload {
3131

rsocket-core/src/main/java/io/rsocket/RSocketServer.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,11 @@
4242
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;
4343

4444
/** Server side RSocket. Receives {@link Frame}s from a {@link RSocketClient} */
45-
class RSocketServer implements RequestHandler {
45+
class RSocketServer implements ResponderRSocket {
4646

4747
private final DuplexConnection connection;
4848
private final RSocket requestHandler;
49-
private final RequestHandler optimizedRequestHandler;
50-
private final boolean hasOptimizedRequestHandler;
49+
private final ResponderRSocket responderRSocket;
5150
private final Function<Frame, ? extends Payload> frameDecoder;
5251
private final Consumer<Throwable> errorConsumer;
5352

@@ -74,16 +73,10 @@ class RSocketServer implements RequestHandler {
7473
Consumer<Throwable> errorConsumer,
7574
long tickPeriod,
7675
long ackTimeout) {
77-
78-
if (requestHandler instanceof RequestHandler) {
79-
this.optimizedRequestHandler = (RequestHandler) requestHandler;
80-
this.hasOptimizedRequestHandler = true;
81-
this.requestHandler = null;
82-
} else {
83-
this.hasOptimizedRequestHandler = false;
84-
this.requestHandler = requestHandler;
85-
this.optimizedRequestHandler = null;
86-
}
76+
77+
this.requestHandler = requestHandler;
78+
this.responderRSocket =
79+
(requestHandler instanceof ResponderRSocket) ? (ResponderRSocket) requestHandler : null;
8780

8881
this.connection = connection;
8982
this.frameDecoder = frameDecoder;
@@ -221,7 +214,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
221214
@Override
222215
public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
223216
try {
224-
return optimizedRequestHandler.requestChannel(payloads);
217+
return responderRSocket.requestChannel(payloads);
225218
} catch (Throwable t) {
226219
return Flux.error(t);
227220
}
@@ -415,7 +408,7 @@ private void handleChannel(int streamId, Payload payload, int initialRequestN) {
415408
// and any later payload can be processed
416409
frames.onNext(payload);
417410

418-
if (hasOptimizedRequestHandler) {
411+
if (responderRSocket != null) {
419412
handleStream(streamId, requestChannel(payload, payloads), initialRequestN);
420413
} else {
421414
handleStream(streamId, requestChannel(payloads), initialRequestN);

rsocket-core/src/main/java/io/rsocket/RequestHandler.java renamed to rsocket-core/src/main/java/io/rsocket/ResponderRSocket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
* Extends the {@link RSocket} that allows an implementer to peek at the first request payload of a
88
* channel.
99
*/
10-
public interface RequestHandler extends RSocket {
10+
public interface ResponderRSocket extends RSocket {
1111
/**
1212
* Implement this method to peak at the first payload of the incoming request stream without
1313
* having to subscribe to Publish&lt;Payload&gt; payloads

0 commit comments

Comments
 (0)