54
54
import io .rsocket .util .MonoLifecycleHandler ;
55
55
import java .nio .channels .ClosedChannelException ;
56
56
import java .util .concurrent .CancellationException ;
57
- import java .util .concurrent .atomic .AtomicBoolean ;
57
+ import java .util .concurrent .atomic .AtomicInteger ;
58
58
import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
59
59
import java .util .function .Consumer ;
60
60
import java .util .function .LongConsumer ;
@@ -242,9 +242,14 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
242
242
new MonoLifecycleHandler <Payload >() {
243
243
@ Override
244
244
public void doOnSubscribe () {
245
- final ByteBuf requestFrame =
246
- RequestResponseFrameFlyweight .encodeReleasingPayload (
247
- allocator , streamId , payload );
245
+ final ByteBuf requestFrame ;
246
+ try {
247
+ requestFrame =
248
+ RequestResponseFrameFlyweight .encodeReleasingPayload (
249
+ allocator , streamId , payload );
250
+ } catch (IllegalReferenceCountException e ) {
251
+ return ;
252
+ }
248
253
249
254
sendProcessor .onNext (requestFrame );
250
255
}
@@ -260,6 +265,7 @@ public void doOnTerminal(
260
265
removeStreamReceiver (streamId );
261
266
}
262
267
});
268
+
263
269
receivers .put (streamId , receiver );
264
270
265
271
return receiver .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
@@ -281,7 +287,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
281
287
282
288
final UnboundedProcessor <ByteBuf > sendProcessor = this .sendProcessor ;
283
289
final UnicastProcessor <Payload > receiver = UnicastProcessor .create ();
284
- final AtomicBoolean payloadReleasedFlag = new AtomicBoolean ( false );
290
+ final AtomicInteger wip = new AtomicInteger ( 0 );
285
291
286
292
receivers .put (streamId , receiver );
287
293
@@ -295,30 +301,56 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
295
301
public void accept (long n ) {
296
302
if (firstRequest && !receiver .isDisposed ()) {
297
303
firstRequest = false ;
298
- if (!payloadReleasedFlag .getAndSet (true )) {
299
- sendProcessor .onNext (
300
- RequestStreamFrameFlyweight .encodeReleasingPayload (
301
- allocator , streamId , n , payload ));
304
+ if (wip .getAndIncrement () != 0 ) {
305
+ // no need to do anything.
306
+ // stream was canceled and fist payload has already been discarded
307
+ return ;
308
+ }
309
+ int missed = 1 ;
310
+ boolean firstHasBeenSent = false ;
311
+ for (; ; ) {
312
+ if (!firstHasBeenSent ) {
313
+ ByteBuf frame ;
314
+ try {
315
+ frame =
316
+ RequestStreamFrameFlyweight .encodeReleasingPayload (
317
+ allocator , streamId , n , payload );
318
+ } catch (IllegalReferenceCountException e ) {
319
+ return ;
320
+ }
321
+
322
+ sendProcessor .onNext (frame );
323
+ firstHasBeenSent = true ;
324
+ } else {
325
+ // if first frame was sent but we cycling again, it means that wip was
326
+ // incremented at doOnCancel
327
+ sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
328
+ return ;
329
+ }
330
+
331
+ missed = wip .addAndGet (-missed );
332
+ if (missed == 0 ) {
333
+ return ;
334
+ }
302
335
}
303
- } else if ( contains ( streamId ) && ! receiver . isDisposed ()) {
336
+ } else {
304
337
sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
305
338
}
306
339
}
307
340
})
308
- .doOnError (
309
- t -> {
310
- if (contains (streamId ) && !receiver .isDisposed ()) {
311
- sendProcessor .onNext (ErrorFrameFlyweight .encode (allocator , streamId , t ));
312
- }
313
- })
314
341
.doOnCancel (
315
342
() -> {
316
- if (! payloadReleasedFlag . getAndSet ( true ) ) {
317
- payload . release () ;
343
+ if (wip . getAndIncrement () != 0 ) {
344
+ return ;
318
345
}
319
- if (contains (streamId ) && !receiver .isDisposed ()) {
320
- sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
346
+
347
+ // check if we need to release payload
348
+ // only applicable if the cancel appears earlier than actual request
349
+ if (payload .refCnt () > 0 ) {
350
+ payload .release ();
321
351
}
352
+
353
+ sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
322
354
})
323
355
.doFinally (s -> removeStreamReceiver (streamId ))
324
356
.doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
@@ -330,30 +362,32 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
330
362
return Flux .error (err );
331
363
}
332
364
333
- return request .switchOnFirst (
334
- (s , flux ) -> {
335
- Payload payload = s .get ();
336
- if (payload != null ) {
337
- if (!PayloadValidationUtils .isValid (mtu , payload )) {
338
- payload .release ();
339
- final IllegalArgumentException t =
340
- new IllegalArgumentException (INVALID_PAYLOAD_ERROR_MESSAGE );
341
- errorConsumer .accept (t );
342
- return Mono .error (t );
343
- }
344
- return handleChannel (payload , flux );
345
- } else {
346
- return flux ;
347
- }
348
- },
349
- false );
365
+ return request
366
+ .switchOnFirst (
367
+ (s , flux ) -> {
368
+ Payload payload = s .get ();
369
+ if (payload != null ) {
370
+ if (!PayloadValidationUtils .isValid (mtu , payload )) {
371
+ payload .release ();
372
+ final IllegalArgumentException t =
373
+ new IllegalArgumentException (INVALID_PAYLOAD_ERROR_MESSAGE );
374
+ errorConsumer .accept (t );
375
+ return Mono .error (t );
376
+ }
377
+ return handleChannel (payload , flux );
378
+ } else {
379
+ return flux ;
380
+ }
381
+ },
382
+ false )
383
+ .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
350
384
}
351
385
352
386
private Flux <? extends Payload > handleChannel (Payload initialPayload , Flux <Payload > inboundFlux ) {
353
387
final UnboundedProcessor <ByteBuf > sendProcessor = this .sendProcessor ;
354
- final AtomicBoolean payloadReleasedFlag = new AtomicBoolean (false );
355
388
final int streamId = streamIdSupplier .nextStreamId (receivers );
356
389
390
+ final AtomicInteger wip = new AtomicInteger (0 );
357
391
final UnicastProcessor <Payload > receiver = UnicastProcessor .create ();
358
392
final BaseSubscriber <Payload > upstreamSubscriber =
359
393
new BaseSubscriber <Payload >() {
@@ -421,43 +455,65 @@ protected void hookFinally(SignalType type) {
421
455
public void accept (long n ) {
422
456
if (firstRequest ) {
423
457
firstRequest = false ;
424
- senders .put (streamId , upstreamSubscriber );
425
- receivers .put (streamId , receiver );
426
-
427
- inboundFlux
428
- .limitRate (Queues .SMALL_BUFFER_SIZE )
429
- .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER )
430
- .subscribe (upstreamSubscriber );
431
- if (!payloadReleasedFlag .getAndSet (true )) {
432
- ByteBuf frame =
433
- RequestChannelFrameFlyweight .encodeReleasingPayload (
434
- allocator , streamId , false , n , initialPayload );
435
-
436
- sendProcessor .onNext (frame );
458
+ if (wip .getAndIncrement () != 0 ) {
459
+ // no need to do anything.
460
+ // stream was canceled and fist payload has already been discarded
461
+ return ;
462
+ }
463
+ int missed = 1 ;
464
+ boolean firstHasBeenSent = false ;
465
+ for (; ; ) {
466
+ if (!firstHasBeenSent ) {
467
+ ByteBuf frame ;
468
+ try {
469
+ frame =
470
+ RequestChannelFrameFlyweight .encodeReleasingPayload (
471
+ allocator , streamId , false , n , initialPayload );
472
+ } catch (IllegalReferenceCountException e ) {
473
+ return ;
474
+ }
475
+
476
+ senders .put (streamId , upstreamSubscriber );
477
+ receivers .put (streamId , receiver );
478
+
479
+ inboundFlux
480
+ .limitRate (Queues .SMALL_BUFFER_SIZE )
481
+ .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER )
482
+ .subscribe (upstreamSubscriber );
483
+
484
+ sendProcessor .onNext (frame );
485
+ firstHasBeenSent = true ;
486
+ } else {
487
+ // if first frame was sent but we cycling again, it means that wip was
488
+ // incremented at doOnCancel
489
+ senders .remove (streamId , upstreamSubscriber );
490
+ receivers .remove (streamId , receiver );
491
+ sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
492
+ return ;
493
+ }
494
+
495
+ missed = wip .addAndGet (-missed );
496
+ if (missed == 0 ) {
497
+ return ;
498
+ }
437
499
}
438
500
} else {
439
501
sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
440
502
}
441
503
}
442
504
})
443
- .doOnError (
444
- t -> {
445
- if (receivers .remove (streamId , receiver )) {
446
- upstreamSubscriber .cancel ();
447
- }
448
- })
449
- .doOnComplete (() -> receivers .remove (streamId , receiver ))
505
+ .doOnError (t -> upstreamSubscriber .cancel ())
450
506
.doOnCancel (
451
507
() -> {
452
- if (!payloadReleasedFlag .getAndSet (true )) {
453
- initialPayload .release ();
454
- }
455
- if (receivers .remove (streamId , receiver )) {
456
- sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
457
- upstreamSubscriber .cancel ();
508
+ upstreamSubscriber .cancel ();
509
+ if (wip .getAndIncrement () != 0 ) {
510
+ return ;
458
511
}
512
+
513
+ // need to send frame only if RequestChannelFrame was sent
514
+ sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
459
515
})
460
- .doOnDiscard ( ReferenceCounted . class , DROPPED_ELEMENTS_CONSUMER );
516
+ .doFinally ( __ -> receivers . remove ( streamId , receiver ) );
461
517
}
462
518
463
519
private Mono <Void > handleMetadataPush (Payload payload ) {
0 commit comments