42
42
import static io .rsocket .frame .FrameHeaderFlyweight .FLAGS_M ;
43
43
44
44
/** Server side RSocket. Receives {@link Frame}s from a {@link RSocketClient} */
45
- class RSocketServer implements RequestHandler {
45
+ class RSocketServer implements ResponderRSocket {
46
46
47
47
private final DuplexConnection connection ;
48
48
private final RSocket requestHandler ;
49
- private final RequestHandler optimizedRequestHandler ;
50
- private final boolean hasOptimizedRequestHandler ;
49
+ private final ResponderRSocket responderRSocket ;
51
50
private final Function <Frame , ? extends Payload > frameDecoder ;
52
51
private final Consumer <Throwable > errorConsumer ;
53
52
@@ -74,16 +73,10 @@ class RSocketServer implements RequestHandler {
74
73
Consumer <Throwable > errorConsumer ,
75
74
long tickPeriod ,
76
75
long ackTimeout ) {
77
-
78
- if (requestHandler instanceof RequestHandler ) {
79
- this .optimizedRequestHandler = (RequestHandler ) requestHandler ;
80
- this .hasOptimizedRequestHandler = true ;
81
- this .requestHandler = null ;
82
- } else {
83
- this .hasOptimizedRequestHandler = false ;
84
- this .requestHandler = requestHandler ;
85
- this .optimizedRequestHandler = null ;
86
- }
76
+
77
+ this .requestHandler = requestHandler ;
78
+ this .responderRSocket =
79
+ (requestHandler instanceof ResponderRSocket ) ? (ResponderRSocket ) requestHandler : null ;
87
80
88
81
this .connection = connection ;
89
82
this .frameDecoder = frameDecoder ;
@@ -221,7 +214,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
221
214
@ Override
222
215
public Flux <Payload > requestChannel (Payload payload , Publisher <Payload > payloads ) {
223
216
try {
224
- return optimizedRequestHandler .requestChannel (payloads );
217
+ return responderRSocket .requestChannel (payloads );
225
218
} catch (Throwable t ) {
226
219
return Flux .error (t );
227
220
}
@@ -415,7 +408,7 @@ private void handleChannel(int streamId, Payload payload, int initialRequestN) {
415
408
// and any later payload can be processed
416
409
frames .onNext (payload );
417
410
418
- if (hasOptimizedRequestHandler ) {
411
+ if (responderRSocket != null ) {
419
412
handleStream (streamId , requestChannel (payload , payloads ), initialRequestN );
420
413
} else {
421
414
handleStream (streamId , requestChannel (payloads ), initialRequestN );
0 commit comments