54
54
import io .rsocket .util .MonoLifecycleHandler ;
55
55
import java .nio .channels .ClosedChannelException ;
56
56
import java .util .concurrent .CancellationException ;
57
- import java .util .concurrent .atomic .AtomicBoolean ;
57
+ import java .util .concurrent .atomic .AtomicInteger ;
58
58
import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
59
59
import java .util .function .Consumer ;
60
60
import java .util .function .LongConsumer ;
@@ -260,6 +260,7 @@ public void doOnTerminal(
260
260
removeStreamReceiver (streamId );
261
261
}
262
262
});
263
+
263
264
receivers .put (streamId , receiver );
264
265
265
266
return receiver .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
@@ -281,7 +282,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
281
282
282
283
final UnboundedProcessor <ByteBuf > sendProcessor = this .sendProcessor ;
283
284
final UnicastProcessor <Payload > receiver = UnicastProcessor .create ();
284
- final AtomicBoolean payloadReleasedFlag = new AtomicBoolean ( false );
285
+ final AtomicInteger wip = new AtomicInteger ( 0 );
285
286
286
287
receivers .put (streamId , receiver );
287
288
@@ -293,30 +294,49 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
293
294
294
295
@ Override
295
296
public void accept (long n ) {
296
- if (firstRequest && ! receiver . isDisposed () ) {
297
+ if (firstRequest ) {
297
298
firstRequest = false ;
298
- if (! payloadReleasedFlag . getAndSet ( true ) ) {
299
- sendProcessor . onNext (
300
- RequestStreamFrameFlyweight . encodeReleasingPayload (
301
- allocator , streamId , n , payload )) ;
299
+ if (wip . getAndIncrement () != 0 ) {
300
+ // no need to do anything.
301
+ // stream was canceled and fist payload has already been discarded
302
+ return ;
302
303
}
303
- } else if (contains (streamId ) && !receiver .isDisposed ()) {
304
+ int missed = 1 ;
305
+ boolean firstHasBeenSent = false ;
306
+ for (; ; ) {
307
+ if (!firstHasBeenSent ) {
308
+ sendProcessor .onNext (
309
+ RequestStreamFrameFlyweight .encodeReleasingPayload (
310
+ allocator , streamId , n , payload ));
311
+ firstHasBeenSent = true ;
312
+ } else {
313
+ // if first frame was sent but we cycling again, it means that wip was
314
+ // incremented at doOnCancel
315
+ sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
316
+ return ;
317
+ }
318
+
319
+ missed = wip .addAndGet (-missed );
320
+ if (missed == 0 ) {
321
+ return ;
322
+ }
323
+ }
324
+ } else if (!receiver .isDisposed ()) {
304
325
sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
305
326
}
306
327
}
307
328
})
308
- .doOnError (
309
- t -> {
310
- if (contains (streamId ) && !receiver .isDisposed ()) {
311
- sendProcessor .onNext (ErrorFrameFlyweight .encode (allocator , streamId , t ));
312
- }
313
- })
314
329
.doOnCancel (
315
330
() -> {
316
- if (! payloadReleasedFlag . getAndSet ( true ) ) {
317
- payload . release () ;
331
+ if (wip . getAndIncrement () != 0 ) {
332
+ return ;
318
333
}
319
- if (contains (streamId ) && !receiver .isDisposed ()) {
334
+
335
+ // check if we need to release payload
336
+ // only applicable if the cancel appears earlier than actual request
337
+ if (payload .refCnt () > 0 ) {
338
+ payload .release ();
339
+ } else {
320
340
sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
321
341
}
322
342
})
@@ -330,30 +350,32 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
330
350
return Flux .error (err );
331
351
}
332
352
333
- return request .switchOnFirst (
334
- (s , flux ) -> {
335
- Payload payload = s .get ();
336
- if (payload != null ) {
337
- if (!PayloadValidationUtils .isValid (mtu , payload )) {
338
- payload .release ();
339
- final IllegalArgumentException t =
340
- new IllegalArgumentException (INVALID_PAYLOAD_ERROR_MESSAGE );
341
- errorConsumer .accept (t );
342
- return Mono .error (t );
343
- }
344
- return handleChannel (payload , flux );
345
- } else {
346
- return flux ;
347
- }
348
- },
349
- false );
353
+ return request
354
+ .switchOnFirst (
355
+ (s , flux ) -> {
356
+ Payload payload = s .get ();
357
+ if (payload != null ) {
358
+ if (!PayloadValidationUtils .isValid (mtu , payload )) {
359
+ payload .release ();
360
+ final IllegalArgumentException t =
361
+ new IllegalArgumentException (INVALID_PAYLOAD_ERROR_MESSAGE );
362
+ errorConsumer .accept (t );
363
+ return Mono .error (t );
364
+ }
365
+ return handleChannel (payload , flux );
366
+ } else {
367
+ return flux ;
368
+ }
369
+ },
370
+ false )
371
+ .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
350
372
}
351
373
352
374
private Flux <? extends Payload > handleChannel (Payload initialPayload , Flux <Payload > inboundFlux ) {
353
375
final UnboundedProcessor <ByteBuf > sendProcessor = this .sendProcessor ;
354
- final AtomicBoolean payloadReleasedFlag = new AtomicBoolean (false );
355
376
final int streamId = streamIdSupplier .nextStreamId (receivers );
356
377
378
+ final AtomicInteger wip = new AtomicInteger (0 );
357
379
final UnicastProcessor <Payload > receiver = UnicastProcessor .create ();
358
380
final BaseSubscriber <Payload > upstreamSubscriber =
359
381
new BaseSubscriber <Payload >() {
@@ -421,19 +443,47 @@ protected void hookFinally(SignalType type) {
421
443
public void accept (long n ) {
422
444
if (firstRequest ) {
423
445
firstRequest = false ;
424
- senders .put (streamId , upstreamSubscriber );
425
- receivers .put (streamId , receiver );
426
-
427
- inboundFlux
428
- .limitRate (Queues .SMALL_BUFFER_SIZE )
429
- .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER )
430
- .subscribe (upstreamSubscriber );
431
- if (!payloadReleasedFlag .getAndSet (true )) {
432
- ByteBuf frame =
433
- RequestChannelFrameFlyweight .encodeReleasingPayload (
434
- allocator , streamId , false , n , initialPayload );
435
-
436
- sendProcessor .onNext (frame );
446
+ if (wip .getAndIncrement () != 0 ) {
447
+ // no need to do anything.
448
+ // stream was canceled and fist payload has already been discarded
449
+ return ;
450
+ }
451
+ int missed = 1 ;
452
+ boolean firstHasBeenSent = false ;
453
+ for (; ; ) {
454
+ if (!firstHasBeenSent ) {
455
+ ByteBuf frame ;
456
+ try {
457
+ frame =
458
+ RequestChannelFrameFlyweight .encodeReleasingPayload (
459
+ allocator , streamId , false , n , initialPayload );
460
+ } catch (IllegalReferenceCountException e ) {
461
+ return ;
462
+ }
463
+
464
+ senders .put (streamId , upstreamSubscriber );
465
+ receivers .put (streamId , receiver );
466
+
467
+ inboundFlux
468
+ .limitRate (Queues .SMALL_BUFFER_SIZE )
469
+ .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER )
470
+ .subscribe (upstreamSubscriber );
471
+
472
+ sendProcessor .onNext (frame );
473
+ firstHasBeenSent = true ;
474
+ } else {
475
+ // if first frame was sent but we cycling again, it means that wip was
476
+ // incremented at doOnCancel
477
+ senders .remove (streamId , upstreamSubscriber );
478
+ receivers .remove (streamId , receiver );
479
+ sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
480
+ return ;
481
+ }
482
+
483
+ missed = wip .addAndGet (-missed );
484
+ if (missed == 0 ) {
485
+ return ;
486
+ }
437
487
}
438
488
} else {
439
489
sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
@@ -442,22 +492,22 @@ public void accept(long n) {
442
492
})
443
493
.doOnError (
444
494
t -> {
445
- if (receivers .remove (streamId , receiver )) {
446
- upstreamSubscriber .cancel ();
447
- }
495
+ upstreamSubscriber .cancel ();
496
+ receivers .remove (streamId , receiver );
448
497
})
449
498
.doOnComplete (() -> receivers .remove (streamId , receiver ))
450
499
.doOnCancel (
451
500
() -> {
452
- if (!payloadReleasedFlag .getAndSet (true )) {
453
- initialPayload .release ();
501
+ upstreamSubscriber .cancel ();
502
+ if (wip .getAndIncrement () != 0 ) {
503
+ return ;
454
504
}
505
+
506
+ // need to send frame only if RequestChannelFrame was sent
455
507
if (receivers .remove (streamId , receiver )) {
456
508
sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
457
- upstreamSubscriber .cancel ();
458
509
}
459
- })
460
- .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
510
+ });
461
511
}
462
512
463
513
private Mono <Void > handleMetadataPush (Payload payload ) {
0 commit comments