45
45
import io .rsocket .frame .decoder .PayloadDecoder ;
46
46
import io .rsocket .internal .SynchronizedIntObjectHashMap ;
47
47
import io .rsocket .internal .UnboundedProcessor ;
48
- import io .rsocket .internal .UnicastMonoEmpty ;
49
- import io .rsocket .internal .UnicastMonoProcessor ;
50
48
import io .rsocket .keepalive .KeepAliveFramesAcceptor ;
51
49
import io .rsocket .keepalive .KeepAliveHandler ;
52
50
import io .rsocket .keepalive .KeepAliveSupport ;
53
51
import io .rsocket .lease .RequesterLeaseHandler ;
54
- import io .rsocket .util .MonoLifecycleHandler ;
55
52
import java .nio .channels .ClosedChannelException ;
56
53
import java .util .concurrent .CancellationException ;
54
+ import java .util .concurrent .atomic .AtomicBoolean ;
57
55
import java .util .concurrent .atomic .AtomicInteger ;
58
56
import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
59
57
import java .util .function .Consumer ;
60
58
import java .util .function .LongConsumer ;
61
59
import java .util .function .Supplier ;
62
- import javax .annotation .Nonnull ;
63
60
import javax .annotation .Nullable ;
64
61
import org .reactivestreams .Processor ;
65
62
import org .reactivestreams .Publisher ;
@@ -210,15 +207,25 @@ private Mono<Void> handleFireAndForget(Payload payload) {
210
207
return Mono .error (new IllegalArgumentException (INVALID_PAYLOAD_ERROR_MESSAGE ));
211
208
}
212
209
210
+ final AtomicBoolean once = new AtomicBoolean ();
213
211
final int streamId = streamIdSupplier .nextStreamId (receivers );
214
212
215
- return UnicastMonoEmpty . newInstance (
213
+ return Mono . defer (
216
214
() -> {
217
- ByteBuf requestFrame =
218
- RequestFireAndForgetFrameFlyweight .encodeReleasingPayload (
219
- allocator , streamId , payload );
215
+ if (once .getAndSet (true )) {
216
+ return Mono .error (
217
+ new IllegalStateException ("FireAndForgetMono allows only a single subscriber" ));
218
+ }
219
+
220
+ return Mono .<Void >empty ()
221
+ .doOnSubscribe (
222
+ (__ ) -> {
223
+ ByteBuf requestFrame =
224
+ RequestFireAndForgetFrameFlyweight .encodeReleasingPayload (
225
+ allocator , streamId , payload );
220
226
221
- sendProcessor .onNext (requestFrame );
227
+ sendProcessor .onNext (requestFrame );
228
+ });
222
229
});
223
230
}
224
231
@@ -236,34 +243,37 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
236
243
237
244
int streamId = streamIdSupplier .nextStreamId (receivers );
238
245
final UnboundedProcessor <ByteBuf > sendProcessor = this .sendProcessor ;
239
-
240
- UnicastMonoProcessor <Payload > receiver =
241
- UnicastMonoProcessor .create (
242
- new MonoLifecycleHandler <Payload >() {
243
- @ Override
244
- public void doOnSubscribe () {
245
- final ByteBuf requestFrame =
246
- RequestResponseFrameFlyweight .encodeReleasingPayload (
247
- allocator , streamId , payload );
248
-
249
- sendProcessor .onNext (requestFrame );
250
- }
251
-
252
- @ Override
253
- public void doOnTerminal (
254
- @ Nonnull SignalType signalType ,
255
- @ Nullable Payload element ,
256
- @ Nullable Throwable e ) {
257
- if (signalType == SignalType .CANCEL ) {
258
- sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
259
- }
260
- removeStreamReceiver (streamId );
261
- }
262
- });
246
+ final UnicastProcessor <Payload > receiver = UnicastProcessor .create (Queues .<Payload >one ().get ());
247
+ final AtomicBoolean once = new AtomicBoolean ();
263
248
264
249
receivers .put (streamId , receiver );
265
250
266
- return receiver .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
251
+ return Mono .defer (
252
+ () -> {
253
+ if (once .getAndSet (true )) {
254
+ return Mono .error (
255
+ new IllegalStateException ("RequestResponseMono allows only a single subscriber" ));
256
+ }
257
+
258
+ return receiver
259
+ .next ()
260
+ .doOnSubscribe (
261
+ (__ ) -> {
262
+ ByteBuf requestFrame =
263
+ RequestResponseFrameFlyweight .encodeReleasingPayload (
264
+ allocator , streamId , payload );
265
+
266
+ sendProcessor .onNext (requestFrame );
267
+ })
268
+ .doFinally (
269
+ signalType -> {
270
+ if (signalType == SignalType .CANCEL ) {
271
+ sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
272
+ }
273
+ removeStreamReceiver (streamId );
274
+ })
275
+ .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
276
+ });
267
277
}
268
278
269
279
private Flux <Payload > handleRequestStream (final Payload payload ) {
@@ -283,65 +293,76 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
283
293
final UnboundedProcessor <ByteBuf > sendProcessor = this .sendProcessor ;
284
294
final UnicastProcessor <Payload > receiver = UnicastProcessor .create ();
285
295
final AtomicInteger wip = new AtomicInteger (0 );
296
+ final AtomicBoolean once = new AtomicBoolean ();
286
297
287
298
receivers .put (streamId , receiver );
288
299
289
- return receiver
290
- .doOnRequest (
291
- new LongConsumer () {
292
-
293
- boolean firstRequest = true ;
300
+ return Flux .defer (
301
+ () -> {
302
+ if (once .getAndSet (true )) {
303
+ return Flux .error (
304
+ new IllegalStateException ("RequestStreamFlux allows only a single subscriber" ));
305
+ }
294
306
295
- @ Override
296
- public void accept (long n ) {
297
- if (firstRequest ) {
298
- firstRequest = false ;
299
- if (wip .getAndIncrement () != 0 ) {
300
- // no need to do anything.
301
- // stream was canceled and fist payload has already been discarded
302
- return ;
303
- }
304
- int missed = 1 ;
305
- boolean firstHasBeenSent = false ;
306
- for (; ; ) {
307
- if (!firstHasBeenSent ) {
308
- sendProcessor .onNext (
309
- RequestStreamFrameFlyweight .encodeReleasingPayload (
310
- allocator , streamId , n , payload ));
311
- firstHasBeenSent = true ;
312
- } else {
313
- // if first frame was sent but we cycling again, it means that wip was
314
- // incremented at doOnCancel
315
- sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
316
- return ;
307
+ return receiver
308
+ .doOnRequest (
309
+ new LongConsumer () {
310
+
311
+ boolean firstRequest = true ;
312
+
313
+ @ Override
314
+ public void accept (long n ) {
315
+ if (firstRequest ) {
316
+ firstRequest = false ;
317
+ if (wip .getAndIncrement () != 0 ) {
318
+ // no need to do anything.
319
+ // stream was canceled and fist payload has already been discarded
320
+ return ;
321
+ }
322
+ int missed = 1 ;
323
+ boolean firstHasBeenSent = false ;
324
+ for (; ; ) {
325
+ if (!firstHasBeenSent ) {
326
+ sendProcessor .onNext (
327
+ RequestStreamFrameFlyweight .encodeReleasingPayload (
328
+ allocator , streamId , n , payload ));
329
+ firstHasBeenSent = true ;
330
+ } else {
331
+ // if first frame was sent but we cycling again, it means that wip was
332
+ // incremented at doOnCancel
333
+ sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
334
+ return ;
335
+ }
336
+
337
+ missed = wip .addAndGet (-missed );
338
+ if (missed == 0 ) {
339
+ return ;
340
+ }
341
+ }
342
+ } else if (!receiver .isDisposed ()) {
343
+ sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
344
+ }
317
345
}
346
+ })
347
+ .doFinally (
348
+ s -> {
349
+ if (s == SignalType .CANCEL ) {
350
+ if (wip .getAndIncrement () != 0 ) {
351
+ return ;
352
+ }
318
353
319
- missed = wip .addAndGet (-missed );
320
- if (missed == 0 ) {
321
- return ;
354
+ // check if we need to release payload
355
+ // only applicable if the cancel appears earlier than actual request
356
+ if (payload .refCnt () > 0 ) {
357
+ payload .release ();
358
+ } else {
359
+ sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
360
+ }
322
361
}
323
- }
324
- } else if (!receiver .isDisposed ()) {
325
- sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
326
- }
327
- }
328
- })
329
- .doOnCancel (
330
- () -> {
331
- if (wip .getAndIncrement () != 0 ) {
332
- return ;
333
- }
334
-
335
- // check if we need to release payload
336
- // only applicable if the cancel appears earlier than actual request
337
- if (payload .refCnt () > 0 ) {
338
- payload .release ();
339
- } else {
340
- sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
341
- }
342
- })
343
- .doFinally (s -> removeStreamReceiver (streamId ))
344
- .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
362
+ removeStreamReceiver (streamId );
363
+ })
364
+ .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
365
+ });
345
366
}
346
367
347
368
private Flux <Payload > handleChannel (Flux <Payload > request ) {
@@ -522,12 +543,23 @@ private Mono<Void> handleMetadataPush(Payload payload) {
522
543
return Mono .error (new IllegalArgumentException (INVALID_PAYLOAD_ERROR_MESSAGE ));
523
544
}
524
545
525
- return UnicastMonoEmpty .newInstance (
546
+ final AtomicBoolean once = new AtomicBoolean ();
547
+
548
+ return Mono .defer (
526
549
() -> {
527
- ByteBuf metadataPushFrame =
528
- MetadataPushFrameFlyweight .encodeReleasingPayload (allocator , payload );
550
+ if (once .getAndSet (true )) {
551
+ return Mono .error (
552
+ new IllegalStateException ("MetadataPushMono allows only a single subscriber" ));
553
+ }
554
+
555
+ return Mono .<Void >empty ()
556
+ .doOnSubscribe (
557
+ (__ ) -> {
558
+ ByteBuf metadataPushFrame =
559
+ MetadataPushFrameFlyweight .encodeReleasingPayload (allocator , payload );
529
560
530
- sendProcessor .onNextPrioritized (metadataPushFrame );
561
+ sendProcessor .onNextPrioritized (metadataPushFrame );
562
+ });
531
563
});
532
564
}
533
565
@@ -544,10 +576,6 @@ private Throwable checkAvailable() {
544
576
return null ;
545
577
}
546
578
547
- private boolean contains (int streamId ) {
548
- return receivers .containsKey (streamId );
549
- }
550
-
551
579
private void handleIncomingFrames (ByteBuf frame ) {
552
580
try {
553
581
int streamId = FrameHeaderFlyweight .streamId (frame );
0 commit comments