40
40
import io .rsocket .frame .RequestResponseFrameFlyweight ;
41
41
import io .rsocket .frame .RequestStreamFrameFlyweight ;
42
42
import io .rsocket .frame .decoder .PayloadDecoder ;
43
- import io .rsocket .internal .RateLimitableRequestPublisher ;
44
43
import io .rsocket .internal .SynchronizedIntObjectHashMap ;
45
44
import io .rsocket .internal .UnboundedProcessor ;
46
45
import io .rsocket .internal .UnicastMonoEmpty ;
51
50
import io .rsocket .lease .RequesterLeaseHandler ;
52
51
import io .rsocket .util .MonoLifecycleHandler ;
53
52
import java .nio .channels .ClosedChannelException ;
53
+ import java .util .concurrent .atomic .AtomicBoolean ;
54
54
import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
55
55
import java .util .function .Consumer ;
56
56
import java .util .function .LongConsumer ;
60
60
import org .reactivestreams .Processor ;
61
61
import org .reactivestreams .Publisher ;
62
62
import org .reactivestreams .Subscriber ;
63
+ import org .reactivestreams .Subscription ;
63
64
import reactor .core .publisher .BaseSubscriber ;
64
65
import reactor .core .publisher .Flux ;
65
66
import reactor .core .publisher .Mono ;
@@ -84,7 +85,7 @@ class RSocketRequester implements RSocket {
84
85
private final PayloadDecoder payloadDecoder ;
85
86
private final Consumer <Throwable > errorConsumer ;
86
87
private final StreamIdSupplier streamIdSupplier ;
87
- private final IntObjectMap <RateLimitableRequestPublisher > senders ;
88
+ private final IntObjectMap <Subscription > senders ;
88
89
private final IntObjectMap <Processor <Payload , Payload >> receivers ;
89
90
private final UnboundedProcessor <ByteBuf > sendProcessor ;
90
91
private final RequesterLeaseHandler leaseHandler ;
@@ -258,6 +259,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
258
259
259
260
final UnboundedProcessor <ByteBuf > sendProcessor = this .sendProcessor ;
260
261
final UnicastProcessor <Payload > receiver = UnicastProcessor .create ();
262
+ final AtomicBoolean payloadReleasedFlag = new AtomicBoolean (false );
261
263
262
264
receivers .put (streamId , receiver );
263
265
@@ -279,7 +281,9 @@ public void accept(long n) {
279
281
n ,
280
282
payload .sliceMetadata ().retain (),
281
283
payload .sliceData ().retain ()));
282
- payload .release ();
284
+ if (!payloadReleasedFlag .getAndSet (true )) {
285
+ payload .release ();
286
+ }
283
287
} else if (contains (streamId ) && !receiver .isDisposed ()) {
284
288
sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
285
289
}
@@ -293,6 +297,9 @@ public void accept(long n) {
293
297
})
294
298
.doOnCancel (
295
299
() -> {
300
+ if (!payloadReleasedFlag .getAndSet (true )) {
301
+ payload .release ();
302
+ }
296
303
if (contains (streamId ) && !receiver .isDisposed ()) {
297
304
sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
298
305
}
@@ -306,10 +313,67 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
306
313
return Flux .error (err );
307
314
}
308
315
316
+ return request .switchOnFirst (
317
+ (s , flux ) -> {
318
+ Payload payload = s .get ();
319
+ if (payload != null ) {
320
+ return handleChannel (payload , flux );
321
+ } else {
322
+ return flux ;
323
+ }
324
+ },
325
+ false );
326
+ }
327
+
328
+ private Flux <? extends Payload > handleChannel (Payload initialPayload , Flux <Payload > inboundFlux ) {
309
329
final UnboundedProcessor <ByteBuf > sendProcessor = this .sendProcessor ;
310
- final UnicastProcessor < Payload > receiver = UnicastProcessor . create ( );
330
+ final AtomicBoolean payloadReleasedFlag = new AtomicBoolean ( false );
311
331
final int streamId = streamIdSupplier .nextStreamId (receivers );
312
332
333
+ final UnicastProcessor <Payload > receiver = UnicastProcessor .create ();
334
+ final BaseSubscriber <Payload > upstreamSubscriber =
335
+ new BaseSubscriber <Payload >() {
336
+
337
+ boolean first = true ;
338
+
339
+ @ Override
340
+ protected void hookOnSubscribe (Subscription subscription ) {
341
+ // noops
342
+ }
343
+
344
+ @ Override
345
+ protected void hookOnNext (Payload payload ) {
346
+ if (first ) {
347
+ // need to skip first since we have already sent it
348
+ first = false ;
349
+ return ;
350
+ }
351
+ final ByteBuf frame =
352
+ PayloadFrameFlyweight .encode (allocator , streamId , false , false , true , payload );
353
+
354
+ sendProcessor .onNext (frame );
355
+ payload .release ();
356
+ }
357
+
358
+ @ Override
359
+ protected void hookOnComplete () {
360
+ ByteBuf frame = PayloadFrameFlyweight .encodeComplete (allocator , streamId );
361
+ sendProcessor .onNext (frame );
362
+ }
363
+
364
+ @ Override
365
+ protected void hookOnError (Throwable t ) {
366
+ ByteBuf frame = ErrorFrameFlyweight .encode (allocator , streamId , t );
367
+ sendProcessor .onNext (frame );
368
+ receiver .onError (t );
369
+ }
370
+
371
+ @ Override
372
+ protected void hookFinally (SignalType type ) {
373
+ senders .remove (streamId , this );
374
+ }
375
+ };
376
+
313
377
return receiver
314
378
.doOnRequest (
315
379
new LongConsumer () {
@@ -320,85 +384,47 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
320
384
public void accept (long n ) {
321
385
if (firstRequest ) {
322
386
firstRequest = false ;
323
- request
324
- .transform (
325
- f -> {
326
- RateLimitableRequestPublisher <Payload > wrapped =
327
- RateLimitableRequestPublisher .wrap (f , Queues .SMALL_BUFFER_SIZE );
328
- // Need to set this to one for first the frame
329
- wrapped .request (1 );
330
- senders .put (streamId , wrapped );
331
- receivers .put (streamId , receiver );
332
-
333
- return wrapped ;
334
- })
335
- .subscribe (
336
- new BaseSubscriber <Payload >() {
337
-
338
- boolean firstPayload = true ;
339
-
340
- @ Override
341
- protected void hookOnNext (Payload payload ) {
342
- final ByteBuf frame ;
343
-
344
- if (firstPayload ) {
345
- firstPayload = false ;
346
- frame =
347
- RequestChannelFrameFlyweight .encode (
348
- allocator ,
349
- streamId ,
350
- false ,
351
- false ,
352
- n ,
353
- payload .sliceMetadata ().retain (),
354
- payload .sliceData ().retain ());
355
- } else {
356
- frame =
357
- PayloadFrameFlyweight .encode (
358
- allocator , streamId , false , false , true , payload );
359
- }
360
-
361
- sendProcessor .onNext (frame );
362
- payload .release ();
363
- }
364
-
365
- @ Override
366
- protected void hookOnComplete () {
367
- if (contains (streamId ) && !receiver .isDisposed ()) {
368
- sendProcessor .onNext (
369
- PayloadFrameFlyweight .encodeComplete (allocator , streamId ));
370
- }
371
- if (firstPayload ) {
372
- receiver .onComplete ();
373
- }
374
- }
375
-
376
- @ Override
377
- protected void hookOnError (Throwable t ) {
378
- errorConsumer .accept (t );
379
- receiver .dispose ();
380
- }
381
- });
382
- } else {
383
- if (contains (streamId ) && !receiver .isDisposed ()) {
384
- sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
387
+ senders .put (streamId , upstreamSubscriber );
388
+ receivers .put (streamId , receiver );
389
+
390
+ inboundFlux .limitRate (Queues .SMALL_BUFFER_SIZE ).subscribe (upstreamSubscriber );
391
+ if (!payloadReleasedFlag .getAndSet (true )) {
392
+ ByteBuf frame =
393
+ RequestChannelFrameFlyweight .encode (
394
+ allocator ,
395
+ streamId ,
396
+ false ,
397
+ false ,
398
+ n ,
399
+ initialPayload .sliceMetadata ().retain (),
400
+ initialPayload .sliceData ().retain ());
401
+
402
+ sendProcessor .onNext (frame );
403
+
404
+ initialPayload .release ();
385
405
}
406
+ } else {
407
+ sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
386
408
}
387
409
}
388
410
})
389
411
.doOnError (
390
412
t -> {
391
- if (contains (streamId ) && ! receiver . isDisposed ( )) {
392
- sendProcessor . onNext ( ErrorFrameFlyweight . encode ( allocator , streamId , t ) );
413
+ if (receivers . remove (streamId , receiver )) {
414
+ upstreamSubscriber . cancel ( );
393
415
}
394
416
})
417
+ .doOnComplete (() -> receivers .remove (streamId , receiver ))
395
418
.doOnCancel (
396
419
() -> {
397
- if (contains (streamId ) && !receiver .isDisposed ()) {
420
+ if (!payloadReleasedFlag .getAndSet (true )) {
421
+ initialPayload .release ();
422
+ }
423
+ if (receivers .remove (streamId , receiver )) {
398
424
sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
425
+ upstreamSubscriber .cancel ();
399
426
}
400
- })
401
- .doFinally (s -> removeStreamReceiverAndSender (streamId ));
427
+ });
402
428
}
403
429
404
430
private Mono <Void > handleMetadataPush (Payload payload ) {
@@ -487,7 +513,7 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
487
513
break ;
488
514
case CANCEL :
489
515
{
490
- RateLimitableRequestPublisher sender = senders .remove (streamId );
516
+ Subscription sender = senders .remove (streamId );
491
517
if (sender != null ) {
492
518
sender .cancel ();
493
519
}
@@ -498,7 +524,7 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
498
524
break ;
499
525
case REQUEST_N :
500
526
{
501
- RateLimitableRequestPublisher sender = senders .get (streamId );
527
+ Subscription sender = senders .get (streamId );
502
528
if (sender != null ) {
503
529
int n = RequestNFrameFlyweight .requestN (frame );
504
530
sender .request (n >= Integer .MAX_VALUE ? Long .MAX_VALUE : n );
@@ -606,18 +632,6 @@ private void removeStreamReceiver(int streamId) {
606
632
}
607
633
}
608
634
609
- private void removeStreamReceiverAndSender (int streamId ) {
610
- /*on termination senders & receivers are explicitly cleared to avoid removing from map while iterating over one
611
- of its views*/
612
- if (terminationError == null ) {
613
- receivers .remove (streamId );
614
- RateLimitableRequestPublisher <?> sender = senders .remove (streamId );
615
- if (sender != null ) {
616
- sender .cancel ();
617
- }
618
- }
619
- }
620
-
621
635
private void handleSendProcessorError (Throwable t ) {
622
636
connection .dispose ();
623
637
}
0 commit comments