25
25
import io .netty .util .collection .IntObjectMap ;
26
26
import io .rsocket .exceptions .ConnectionErrorException ;
27
27
import io .rsocket .exceptions .Exceptions ;
28
- import io .rsocket .frame .*;
28
+ import io .rsocket .frame .CancelFrameFlyweight ;
29
+ import io .rsocket .frame .ErrorFrameFlyweight ;
30
+ import io .rsocket .frame .FrameHeaderFlyweight ;
31
+ import io .rsocket .frame .FrameType ;
32
+ import io .rsocket .frame .MetadataPushFrameFlyweight ;
33
+ import io .rsocket .frame .PayloadFrameFlyweight ;
34
+ import io .rsocket .frame .RequestChannelFrameFlyweight ;
35
+ import io .rsocket .frame .RequestFireAndForgetFrameFlyweight ;
36
+ import io .rsocket .frame .RequestNFrameFlyweight ;
37
+ import io .rsocket .frame .RequestResponseFrameFlyweight ;
38
+ import io .rsocket .frame .RequestStreamFrameFlyweight ;
29
39
import io .rsocket .frame .decoder .PayloadDecoder ;
30
40
import io .rsocket .internal .RateLimitableRequestPublisher ;
31
41
import io .rsocket .internal .SynchronizedIntObjectHashMap ;
32
42
import io .rsocket .internal .UnboundedProcessor ;
43
+ import io .rsocket .internal .UnicastMonoEmpty ;
33
44
import io .rsocket .internal .UnicastMonoProcessor ;
34
45
import io .rsocket .keepalive .KeepAliveFramesAcceptor ;
35
46
import io .rsocket .keepalive .KeepAliveHandler ;
36
47
import io .rsocket .keepalive .KeepAliveSupport ;
37
48
import io .rsocket .lease .RequesterLeaseHandler ;
38
- import io .rsocket .util .OnceConsumer ;
49
+ import io .rsocket .util .MonoLifecycleHandler ;
39
50
import java .nio .channels .ClosedChannelException ;
40
51
import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
41
52
import java .util .function .Consumer ;
46
57
import org .reactivestreams .Processor ;
47
58
import org .reactivestreams .Publisher ;
48
59
import org .reactivestreams .Subscriber ;
49
- import org .reactivestreams .Subscription ;
50
- import reactor .core .publisher .*;
60
+ import reactor .core .publisher .BaseSubscriber ;
61
+ import reactor .core .publisher .Flux ;
62
+ import reactor .core .publisher .Mono ;
63
+ import reactor .core .publisher .SignalType ;
64
+ import reactor .core .publisher .UnicastProcessor ;
51
65
import reactor .util .concurrent .Queues ;
52
66
53
67
/**
@@ -170,23 +184,19 @@ private Mono<Void> handleFireAndForget(Payload payload) {
170
184
171
185
final int streamId = streamIdSupplier .nextStreamId (receivers );
172
186
173
- return emptyUnicastMono ()
174
- .doOnSubscribe (
175
- new OnceConsumer <Subscription >() {
176
- @ Override
177
- public void acceptOnce (@ Nonnull Subscription subscription ) {
178
- ByteBuf requestFrame =
179
- RequestFireAndForgetFrameFlyweight .encode (
180
- allocator ,
181
- streamId ,
182
- false ,
183
- payload .hasMetadata () ? payload .sliceMetadata ().retain () : null ,
184
- payload .sliceData ().retain ());
185
- payload .release ();
186
-
187
- sendProcessor .onNext (requestFrame );
188
- }
189
- });
187
+ return UnicastMonoEmpty .newInstance (
188
+ () -> {
189
+ ByteBuf requestFrame =
190
+ RequestFireAndForgetFrameFlyweight .encode (
191
+ allocator ,
192
+ streamId ,
193
+ false ,
194
+ payload .hasMetadata () ? payload .sliceMetadata ().retain () : null ,
195
+ payload .sliceData ().retain ());
196
+ payload .release ();
197
+
198
+ sendProcessor .onNext (requestFrame );
199
+ });
190
200
}
191
201
192
202
private Mono <Payload > handleRequestResponse (final Payload payload ) {
@@ -199,14 +209,11 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
199
209
int streamId = streamIdSupplier .nextStreamId (receivers );
200
210
final UnboundedProcessor <ByteBuf > sendProcessor = this .sendProcessor ;
201
211
202
- UnicastMonoProcessor <Payload > receiver = UnicastMonoProcessor .create ();
203
- receivers .put (streamId , receiver );
204
-
205
- return receiver
206
- .doOnSubscribe (
207
- new OnceConsumer <Subscription >() {
212
+ UnicastMonoProcessor <Payload > receiver =
213
+ UnicastMonoProcessor .create (
214
+ new MonoLifecycleHandler <Payload >() {
208
215
@ Override
209
- public void acceptOnce ( @ Nonnull Subscription subscription ) {
216
+ public void doOnSubscribe ( ) {
210
217
final ByteBuf requestFrame =
211
218
RequestResponseFrameFlyweight .encode (
212
219
allocator ,
@@ -218,15 +225,23 @@ public void acceptOnce(@Nonnull Subscription subscription) {
218
225
219
226
sendProcessor .onNext (requestFrame );
220
227
}
221
- })
222
- .doOnError (t -> sendProcessor .onNext (ErrorFrameFlyweight .encode (allocator , streamId , t )))
223
- .doFinally (
224
- s -> {
225
- if (s == SignalType .CANCEL ) {
226
- sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
228
+
229
+ @ Override
230
+ public void doOnTerminal (
231
+ @ Nonnull SignalType signalType ,
232
+ @ Nullable Payload element ,
233
+ @ Nullable Throwable e ) {
234
+ if (signalType == SignalType .ON_ERROR ) {
235
+ sendProcessor .onNext (ErrorFrameFlyweight .encode (allocator , streamId , e ));
236
+ } else if (signalType == SignalType .CANCEL ) {
237
+ sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
238
+ }
239
+ removeStreamReceiver (streamId );
227
240
}
228
- removeStreamReceiver (streamId );
229
241
});
242
+ receivers .put (streamId , receiver );
243
+
244
+ return receiver ;
230
245
}
231
246
232
247
private Flux <Payload > handleRequestStream (final Payload payload ) {
@@ -390,24 +405,14 @@ private Mono<Void> handleMetadataPush(Payload payload) {
390
405
return Mono .error (err );
391
406
}
392
407
393
- return emptyUnicastMono ()
394
- .doOnSubscribe (
395
- new OnceConsumer <Subscription >() {
396
- @ Override
397
- public void acceptOnce (@ Nonnull Subscription subscription ) {
398
- ByteBuf metadataPushFrame =
399
- MetadataPushFrameFlyweight .encode (allocator , payload .sliceMetadata ().retain ());
400
- payload .release ();
401
-
402
- sendProcessor .onNext (metadataPushFrame );
403
- }
404
- });
405
- }
408
+ return UnicastMonoEmpty .newInstance (
409
+ () -> {
410
+ ByteBuf metadataPushFrame =
411
+ MetadataPushFrameFlyweight .encode (allocator , payload .sliceMetadata ().retain ());
412
+ payload .release ();
406
413
407
- private static UnicastMonoProcessor <Void > emptyUnicastMono () {
408
- UnicastMonoProcessor <Void > result = UnicastMonoProcessor .create ();
409
- result .onComplete ();
410
- return result ;
414
+ sendProcessor .onNext (metadataPushFrame );
415
+ });
411
416
}
412
417
413
418
private Throwable checkAvailable () {
0 commit comments