27
27
import org .reactivestreams .Subscriber ;
28
28
import org .reactivestreams .Subscription ;
29
29
import reactor .core .Disposable ;
30
- import reactor .core .publisher .*;
30
+ import reactor .core .publisher .Flux ;
31
+ import reactor .core .publisher .Mono ;
32
+ import reactor .core .publisher .SignalType ;
33
+ import reactor .core .publisher .UnicastProcessor ;
31
34
32
35
import java .util .Collections ;
33
36
import java .util .Map ;
39
42
import static io .rsocket .frame .FrameHeaderFlyweight .FLAGS_M ;
40
43
41
44
/** Server side RSocket. Receives {@link Frame}s from a {@link RSocketClient} */
42
- class RSocketServer implements RSocket {
45
+ class RSocketServer implements ResponderRSocket {
43
46
44
47
private final DuplexConnection connection ;
45
48
private final RSocket requestHandler ;
49
+ private final ResponderRSocket responderRSocket ;
46
50
private final Function <Frame , ? extends Payload > frameDecoder ;
47
51
private final Consumer <Throwable > errorConsumer ;
48
52
49
53
private final Map <Integer , Subscription > sendingSubscriptions ;
50
- private final Map <Integer , Processor <Payload ,Payload >> channelProcessors ;
54
+ private final Map <Integer , Processor <Payload , Payload >> channelProcessors ;
51
55
52
56
private final UnboundedProcessor <Frame > sendProcessor ;
53
57
private KeepAliveHandler keepAliveHandler ;
@@ -69,12 +73,16 @@ class RSocketServer implements RSocket {
69
73
Consumer <Throwable > errorConsumer ,
70
74
long tickPeriod ,
71
75
long ackTimeout ) {
72
- this . connection = connection ;
76
+
73
77
this .requestHandler = requestHandler ;
78
+ this .responderRSocket =
79
+ (requestHandler instanceof ResponderRSocket ) ? (ResponderRSocket ) requestHandler : null ;
80
+
81
+ this .connection = connection ;
74
82
this .frameDecoder = frameDecoder ;
75
83
this .errorConsumer = errorConsumer ;
76
84
this .sendingSubscriptions = Collections .synchronizedMap (new IntObjectHashMap <>());
77
- this .channelProcessors = Collections .synchronizedMap (new IntObjectHashMap <>());
85
+ this .channelProcessors = Collections .synchronizedMap (new IntObjectHashMap <>());
78
86
79
87
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
80
88
// connections
@@ -116,43 +124,55 @@ class RSocketServer implements RSocket {
116
124
}
117
125
118
126
private void handleSendProcessorError (Throwable t ) {
119
- sendingSubscriptions .values ().forEach (subscription -> {
120
- try {
121
- subscription .cancel ();
122
- } catch (Throwable e ) {
123
- errorConsumer .accept (e );
124
- }
125
- });
127
+ sendingSubscriptions
128
+ .values ()
129
+ .forEach (
130
+ subscription -> {
131
+ try {
132
+ subscription .cancel ();
133
+ } catch (Throwable e ) {
134
+ errorConsumer .accept (e );
135
+ }
136
+ });
126
137
127
- channelProcessors .values ().forEach (subscription -> {
128
- try {
129
- subscription .onError (t );
130
- } catch (Throwable e ) {
131
- errorConsumer .accept (e );
132
- }
133
- });
138
+ channelProcessors
139
+ .values ()
140
+ .forEach (
141
+ subscription -> {
142
+ try {
143
+ subscription .onError (t );
144
+ } catch (Throwable e ) {
145
+ errorConsumer .accept (e );
146
+ }
147
+ });
134
148
}
135
149
136
150
private void handleSendProcessorCancel (SignalType t ) {
137
151
if (SignalType .ON_ERROR == t ) {
138
152
return ;
139
153
}
140
154
141
- sendingSubscriptions .values ().forEach (subscription -> {
142
- try {
143
- subscription .cancel ();
144
- } catch (Throwable e ) {
145
- errorConsumer .accept (e );
146
- }
147
- });
155
+ sendingSubscriptions
156
+ .values ()
157
+ .forEach (
158
+ subscription -> {
159
+ try {
160
+ subscription .cancel ();
161
+ } catch (Throwable e ) {
162
+ errorConsumer .accept (e );
163
+ }
164
+ });
148
165
149
- channelProcessors .values ().forEach (subscription -> {
150
- try {
151
- subscription .onComplete ();
152
- } catch (Throwable e ) {
153
- errorConsumer .accept (e );
154
- }
155
- });
166
+ channelProcessors
167
+ .values ()
168
+ .forEach (
169
+ subscription -> {
170
+ try {
171
+ subscription .onComplete ();
172
+ } catch (Throwable e ) {
173
+ errorConsumer .accept (e );
174
+ }
175
+ });
156
176
}
157
177
158
178
@ Override
@@ -191,6 +211,15 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
191
211
}
192
212
}
193
213
214
+ @ Override
215
+ public Flux <Payload > requestChannel (Payload payload , Publisher <Payload > payloads ) {
216
+ try {
217
+ return responderRSocket .requestChannel (payload , payloads );
218
+ } catch (Throwable t ) {
219
+ return Flux .error (t );
220
+ }
221
+ }
222
+
194
223
@ Override
195
224
public Mono <Void > metadataPush (Payload payload ) {
196
225
try {
@@ -232,9 +261,7 @@ private synchronized void cleanUpSendingSubscriptions() {
232
261
}
233
262
234
263
private synchronized void cleanUpChannelProcessors () {
235
- channelProcessors
236
- .values ()
237
- .forEach (Processor ::onComplete );
264
+ channelProcessors .values ().forEach (Processor ::onComplete );
238
265
channelProcessors .clear ();
239
266
}
240
267
@@ -381,7 +408,11 @@ private void handleChannel(int streamId, Payload payload, int initialRequestN) {
381
408
// and any later payload can be processed
382
409
frames .onNext (payload );
383
410
384
- handleStream (streamId , requestChannel (payloads ), initialRequestN );
411
+ if (responderRSocket != null ) {
412
+ handleStream (streamId , requestChannel (payload , payloads ), initialRequestN );
413
+ } else {
414
+ handleStream (streamId , requestChannel (payloads ), initialRequestN );
415
+ }
385
416
}
386
417
387
418
private void handleKeepAliveFrame (Frame frame ) {
0 commit comments