45
45
import io .rsocket .frame .decoder .PayloadDecoder ;
46
46
import io .rsocket .internal .SynchronizedIntObjectHashMap ;
47
47
import io .rsocket .internal .UnboundedProcessor ;
48
- import io .rsocket .internal .UnicastMonoEmpty ;
49
- import io .rsocket .internal .UnicastMonoProcessor ;
50
48
import io .rsocket .keepalive .KeepAliveFramesAcceptor ;
51
49
import io .rsocket .keepalive .KeepAliveHandler ;
52
50
import io .rsocket .keepalive .KeepAliveSupport ;
53
51
import io .rsocket .lease .RequesterLeaseHandler ;
54
- import io .rsocket .util .MonoLifecycleHandler ;
55
52
import java .nio .channels .ClosedChannelException ;
53
+ import java .util .Queue ;
56
54
import java .util .concurrent .CancellationException ;
57
55
import java .util .concurrent .atomic .AtomicInteger ;
58
56
import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
59
57
import java .util .function .Consumer ;
60
58
import java .util .function .LongConsumer ;
61
59
import java .util .function .Supplier ;
62
- import javax .annotation .Nonnull ;
63
60
import javax .annotation .Nullable ;
64
61
import org .reactivestreams .Processor ;
65
62
import org .reactivestreams .Publisher ;
@@ -92,6 +89,8 @@ class RSocketRequester implements RSocket {
92
89
}
93
90
};
94
91
92
+ private static final Queue <Void > EMPTY_QUEUE = Queues .<Void >empty ().get ();
93
+
95
94
static {
96
95
CLOSED_CHANNEL_EXCEPTION .setStackTrace (new StackTraceElement [0 ]);
97
96
}
@@ -212,14 +211,16 @@ private Mono<Void> handleFireAndForget(Payload payload) {
212
211
213
212
final int streamId = streamIdSupplier .nextStreamId (receivers );
214
213
215
- return UnicastMonoEmpty .newInstance (
216
- () -> {
217
- ByteBuf requestFrame =
218
- RequestFireAndForgetFrameFlyweight .encodeReleasingPayload (
219
- allocator , streamId , payload );
214
+ return UnicastProcessor .create (EMPTY_QUEUE )
215
+ .doOnSubscribe (
216
+ (__ ) -> {
217
+ ByteBuf requestFrame =
218
+ RequestFireAndForgetFrameFlyweight .encodeReleasingPayload (
219
+ allocator , streamId , payload );
220
220
221
- sendProcessor .onNext (requestFrame );
222
- });
221
+ sendProcessor .onNext (requestFrame );
222
+ })
223
+ .then ();
223
224
}
224
225
225
226
private Mono <Payload > handleRequestResponse (final Payload payload ) {
@@ -236,34 +237,28 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
236
237
237
238
int streamId = streamIdSupplier .nextStreamId (receivers );
238
239
final UnboundedProcessor <ByteBuf > sendProcessor = this .sendProcessor ;
239
-
240
- UnicastMonoProcessor <Payload > receiver =
241
- UnicastMonoProcessor .create (
242
- new MonoLifecycleHandler <Payload >() {
243
- @ Override
244
- public void doOnSubscribe () {
245
- final ByteBuf requestFrame =
246
- RequestResponseFrameFlyweight .encodeReleasingPayload (
247
- allocator , streamId , payload );
248
-
249
- sendProcessor .onNext (requestFrame );
250
- }
251
-
252
- @ Override
253
- public void doOnTerminal (
254
- @ Nonnull SignalType signalType ,
255
- @ Nullable Payload element ,
256
- @ Nullable Throwable e ) {
257
- if (signalType == SignalType .CANCEL ) {
258
- sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
259
- }
260
- removeStreamReceiver (streamId );
261
- }
262
- });
240
+ final UnicastProcessor <Payload > receiver = UnicastProcessor .create (Queues .<Payload >one ().get ());
263
241
264
242
receivers .put (streamId , receiver );
265
243
266
- return receiver .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
244
+ return receiver
245
+ .as (Mono ::fromDirect )
246
+ .doOnSubscribe (
247
+ (__ ) -> {
248
+ ByteBuf requestFrame =
249
+ RequestResponseFrameFlyweight .encodeReleasingPayload (
250
+ allocator , streamId , payload );
251
+
252
+ sendProcessor .onNext (requestFrame );
253
+ })
254
+ .doFinally (
255
+ signalType -> {
256
+ if (signalType == SignalType .CANCEL ) {
257
+ sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
258
+ }
259
+ removeStreamReceiver (streamId );
260
+ })
261
+ .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
267
262
}
268
263
269
264
private Flux <Payload > handleRequestStream (final Payload payload ) {
@@ -522,13 +517,15 @@ private Mono<Void> handleMetadataPush(Payload payload) {
522
517
return Mono .error (new IllegalArgumentException (INVALID_PAYLOAD_ERROR_MESSAGE ));
523
518
}
524
519
525
- return UnicastMonoEmpty .newInstance (
526
- () -> {
527
- ByteBuf metadataPushFrame =
528
- MetadataPushFrameFlyweight .encodeReleasingPayload (allocator , payload );
520
+ return UnicastProcessor .create (EMPTY_QUEUE )
521
+ .doOnSubscribe (
522
+ (__ ) -> {
523
+ ByteBuf metadataPushFrame =
524
+ MetadataPushFrameFlyweight .encodeReleasingPayload (allocator , payload );
529
525
530
- sendProcessor .onNextPrioritized (metadataPushFrame );
531
- });
526
+ sendProcessor .onNextPrioritized (metadataPushFrame );
527
+ })
528
+ .then ();
532
529
}
533
530
534
531
@ Nullable
0 commit comments