|
44 | 44 | import java.util.function.BiConsumer;
|
45 | 45 | import java.util.function.Consumer;
|
46 | 46 | import java.util.function.Supplier;
|
47 |
| - |
48 | 47 | import reactor.core.Disposable;
|
49 | 48 | import reactor.core.publisher.Mono;
|
50 | 49 | import reactor.core.scheduler.Schedulers;
|
@@ -551,112 +550,112 @@ private Mono<RSocket> connect0(Supplier<ClientTransport> transportSupplier) {
|
551 | 550 | ? new FragmentationDuplexConnection(connection, mtu, "client")
|
552 | 551 | : new ReassemblyDuplexConnection(connection));
|
553 | 552 | return connectionMono
|
554 |
| - .flatMap( |
555 |
| - connection -> |
556 |
| - setupPayloadMono |
557 |
| - .defaultIfEmpty(EmptyPayload.INSTANCE) |
558 |
| - .map(setupPayload -> Tuples.of(connection, setupPayload)) |
559 |
| - .doOnError(ex -> connection.dispose()) |
560 |
| - .doOnCancel(connection::dispose)) |
561 |
| - .flatMap( |
562 |
| - tuple -> { |
563 |
| - DuplexConnection connection = tuple.getT1(); |
564 |
| - Payload setupPayload = tuple.getT2(); |
565 |
| - |
566 |
| - ByteBuf resumeToken; |
567 |
| - KeepAliveHandler keepAliveHandler; |
568 |
| - DuplexConnection wrappedConnection; |
569 |
| - |
570 |
| - if (resume != null) { |
571 |
| - resumeToken = resume.getTokenSupplier().get(); |
572 |
| - ClientRSocketSession session = |
573 |
| - new ClientRSocketSession( |
574 |
| - connection, |
575 |
| - resume.getSessionDuration(), |
576 |
| - resume.getRetry(), |
577 |
| - resume.getStoreFactory(CLIENT_TAG).apply(resumeToken), |
578 |
| - resume.getStreamTimeout(), |
579 |
| - resume.isCleanupStoreOnKeepAlive()) |
580 |
| - .continueWith(connectionMono) |
581 |
| - .resumeToken(resumeToken); |
582 |
| - keepAliveHandler = |
583 |
| - new KeepAliveHandler.ResumableKeepAliveHandler(session.resumableConnection()); |
584 |
| - wrappedConnection = session.resumableConnection(); |
585 |
| - } else { |
586 |
| - resumeToken = Unpooled.EMPTY_BUFFER; |
587 |
| - keepAliveHandler = new KeepAliveHandler.DefaultKeepAliveHandler(connection); |
588 |
| - wrappedConnection = connection; |
589 |
| - } |
590 |
| - |
591 |
| - ClientServerInputMultiplexer multiplexer = |
592 |
| - new ClientServerInputMultiplexer(wrappedConnection, interceptors, true); |
593 |
| - |
594 |
| - boolean leaseEnabled = leasesSupplier != null; |
595 |
| - Leases<?> leases = leaseEnabled ? leasesSupplier.get() : null; |
596 |
| - RequesterLeaseHandler requesterLeaseHandler = |
597 |
| - leaseEnabled |
598 |
| - ? new RequesterLeaseHandler.Impl(CLIENT_TAG, leases.receiver()) |
599 |
| - : RequesterLeaseHandler.None; |
600 |
| - |
601 |
| - RSocket rSocketRequester = |
602 |
| - new RSocketRequester( |
603 |
| - multiplexer.asClientConnection(), |
604 |
| - payloadDecoder, |
605 |
| - StreamIdSupplier.clientSupplier(), |
606 |
| - mtu, |
607 |
| - (int) keepAliveInterval.toMillis(), |
608 |
| - (int) keepAliveMaxLifeTime.toMillis(), |
609 |
| - keepAliveHandler, |
610 |
| - requesterLeaseHandler, |
611 |
| - Schedulers.single(Schedulers.parallel())); |
612 |
| - |
613 |
| - RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester); |
614 |
| - |
615 |
| - ByteBuf setupFrame = |
616 |
| - SetupFrameCodec.encode( |
617 |
| - wrappedConnection.alloc(), |
618 |
| - leaseEnabled, |
619 |
| - (int) keepAliveInterval.toMillis(), |
620 |
| - (int) keepAliveMaxLifeTime.toMillis(), |
621 |
| - resumeToken, |
622 |
| - metadataMimeType, |
623 |
| - dataMimeType, |
624 |
| - setupPayload); |
625 |
| - |
626 |
| - SocketAcceptor acceptor = |
627 |
| - this.acceptor != null ? this.acceptor : SocketAcceptor.with(new RSocket() {}); |
628 |
| - |
629 |
| - ConnectionSetupPayload setup = new DefaultConnectionSetupPayload(setupFrame); |
630 |
| - |
631 |
| - return interceptors |
632 |
| - .initSocketAcceptor(acceptor) |
633 |
| - .accept(setup, wrappedRSocketRequester) |
634 |
| - .flatMap( |
635 |
| - rSocketHandler -> { |
636 |
| - RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler); |
637 |
| - |
638 |
| - ResponderLeaseHandler responderLeaseHandler = |
639 |
| - leaseEnabled |
640 |
| - ? new ResponderLeaseHandler.Impl<>( |
641 |
| - CLIENT_TAG, |
642 |
| - wrappedConnection.alloc(), |
643 |
| - leases.sender(), |
644 |
| - leases.stats()) |
645 |
| - : ResponderLeaseHandler.None; |
646 |
| - |
647 |
| - RSocket rSocketResponder = |
648 |
| - new RSocketResponder( |
649 |
| - multiplexer.asServerConnection(), |
650 |
| - wrappedRSocketHandler, |
651 |
| - payloadDecoder, |
652 |
| - responderLeaseHandler, |
653 |
| - mtu); |
654 |
| - |
655 |
| - return wrappedConnection |
656 |
| - .sendOne(setupFrame.retain()) |
657 |
| - .thenReturn(wrappedRSocketRequester); |
658 |
| - }) |
659 |
| - .doFinally(signalType -> setup.release()); |
660 |
| - }); |
| 553 | + .flatMap( |
| 554 | + connection -> |
| 555 | + setupPayloadMono |
| 556 | + .defaultIfEmpty(EmptyPayload.INSTANCE) |
| 557 | + .map(setupPayload -> Tuples.of(connection, setupPayload)) |
| 558 | + .doOnError(ex -> connection.dispose()) |
| 559 | + .doOnCancel(connection::dispose)) |
| 560 | + .flatMap( |
| 561 | + tuple -> { |
| 562 | + DuplexConnection connection = tuple.getT1(); |
| 563 | + Payload setupPayload = tuple.getT2(); |
| 564 | + |
| 565 | + ByteBuf resumeToken; |
| 566 | + KeepAliveHandler keepAliveHandler; |
| 567 | + DuplexConnection wrappedConnection; |
| 568 | + |
| 569 | + if (resume != null) { |
| 570 | + resumeToken = resume.getTokenSupplier().get(); |
| 571 | + ClientRSocketSession session = |
| 572 | + new ClientRSocketSession( |
| 573 | + connection, |
| 574 | + resume.getSessionDuration(), |
| 575 | + resume.getRetry(), |
| 576 | + resume.getStoreFactory(CLIENT_TAG).apply(resumeToken), |
| 577 | + resume.getStreamTimeout(), |
| 578 | + resume.isCleanupStoreOnKeepAlive()) |
| 579 | + .continueWith(connectionMono) |
| 580 | + .resumeToken(resumeToken); |
| 581 | + keepAliveHandler = |
| 582 | + new KeepAliveHandler.ResumableKeepAliveHandler(session.resumableConnection()); |
| 583 | + wrappedConnection = session.resumableConnection(); |
| 584 | + } else { |
| 585 | + resumeToken = Unpooled.EMPTY_BUFFER; |
| 586 | + keepAliveHandler = new KeepAliveHandler.DefaultKeepAliveHandler(connection); |
| 587 | + wrappedConnection = connection; |
| 588 | + } |
| 589 | + |
| 590 | + ClientServerInputMultiplexer multiplexer = |
| 591 | + new ClientServerInputMultiplexer(wrappedConnection, interceptors, true); |
| 592 | + |
| 593 | + boolean leaseEnabled = leasesSupplier != null; |
| 594 | + Leases<?> leases = leaseEnabled ? leasesSupplier.get() : null; |
| 595 | + RequesterLeaseHandler requesterLeaseHandler = |
| 596 | + leaseEnabled |
| 597 | + ? new RequesterLeaseHandler.Impl(CLIENT_TAG, leases.receiver()) |
| 598 | + : RequesterLeaseHandler.None; |
| 599 | + |
| 600 | + RSocket rSocketRequester = |
| 601 | + new RSocketRequester( |
| 602 | + multiplexer.asClientConnection(), |
| 603 | + payloadDecoder, |
| 604 | + StreamIdSupplier.clientSupplier(), |
| 605 | + mtu, |
| 606 | + (int) keepAliveInterval.toMillis(), |
| 607 | + (int) keepAliveMaxLifeTime.toMillis(), |
| 608 | + keepAliveHandler, |
| 609 | + requesterLeaseHandler, |
| 610 | + Schedulers.single(Schedulers.parallel())); |
| 611 | + |
| 612 | + RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester); |
| 613 | + |
| 614 | + ByteBuf setupFrame = |
| 615 | + SetupFrameCodec.encode( |
| 616 | + wrappedConnection.alloc(), |
| 617 | + leaseEnabled, |
| 618 | + (int) keepAliveInterval.toMillis(), |
| 619 | + (int) keepAliveMaxLifeTime.toMillis(), |
| 620 | + resumeToken, |
| 621 | + metadataMimeType, |
| 622 | + dataMimeType, |
| 623 | + setupPayload); |
| 624 | + |
| 625 | + SocketAcceptor acceptor = |
| 626 | + this.acceptor != null ? this.acceptor : SocketAcceptor.with(new RSocket() {}); |
| 627 | + |
| 628 | + ConnectionSetupPayload setup = new DefaultConnectionSetupPayload(setupFrame); |
| 629 | + |
| 630 | + return interceptors |
| 631 | + .initSocketAcceptor(acceptor) |
| 632 | + .accept(setup, wrappedRSocketRequester) |
| 633 | + .flatMap( |
| 634 | + rSocketHandler -> { |
| 635 | + RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler); |
| 636 | + |
| 637 | + ResponderLeaseHandler responderLeaseHandler = |
| 638 | + leaseEnabled |
| 639 | + ? new ResponderLeaseHandler.Impl<>( |
| 640 | + CLIENT_TAG, |
| 641 | + wrappedConnection.alloc(), |
| 642 | + leases.sender(), |
| 643 | + leases.stats()) |
| 644 | + : ResponderLeaseHandler.None; |
| 645 | + |
| 646 | + RSocket rSocketResponder = |
| 647 | + new RSocketResponder( |
| 648 | + multiplexer.asServerConnection(), |
| 649 | + wrappedRSocketHandler, |
| 650 | + payloadDecoder, |
| 651 | + responderLeaseHandler, |
| 652 | + mtu); |
| 653 | + |
| 654 | + return wrappedConnection |
| 655 | + .sendOne(setupFrame.retain()) |
| 656 | + .thenReturn(wrappedRSocketRequester); |
| 657 | + }) |
| 658 | + .doFinally(signalType -> setup.release()); |
| 659 | + }); |
661 | 660 | }
|
662 | 661 | }
|
0 commit comments