31
31
import io .rsocket .internal .ClientSetup ;
32
32
import io .rsocket .internal .ServerSetup ;
33
33
import io .rsocket .keepalive .KeepAliveHandler ;
34
+ import io .rsocket .lease .*;
34
35
import io .rsocket .plugins .DuplexConnectionInterceptor ;
35
36
import io .rsocket .plugins .PluginRegistry ;
36
37
import io .rsocket .plugins .Plugins ;
40
41
import io .rsocket .transport .ServerTransport ;
41
42
import io .rsocket .util .ConnectionUtils ;
42
43
import io .rsocket .util .EmptyPayload ;
44
+ import io .rsocket .util .MultiSubscriberRSocket ;
43
45
import java .time .Duration ;
44
46
import java .util .Objects ;
45
- import java .util .function .BiFunction ;
46
- import java .util .function .Consumer ;
47
- import java .util .function .Function ;
48
- import java .util .function .Supplier ;
47
+ import java .util .function .*;
49
48
import reactor .core .publisher .Mono ;
50
49
51
50
/** Factory for creating RSocket clients and servers. */
@@ -92,6 +91,8 @@ default <T extends Closeable> Start<T> transport(ServerTransport<T> transport) {
92
91
}
93
92
94
93
public static class ClientRSocketFactory implements ClientTransportAcceptor {
94
+ private static final String CLIENT_TAG = "client" ;
95
+
95
96
private Supplier <Function <RSocket , RSocket >> acceptor =
96
97
() -> rSocket -> new AbstractRSocket () {};
97
98
@@ -115,13 +116,17 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
115
116
private boolean resumeCleanupStoreOnKeepAlive ;
116
117
private Supplier <ByteBuf > resumeTokenSupplier = ResumeFrameFlyweight ::generateResumeToken ;
117
118
private Function <? super ByteBuf , ? extends ResumableFramesStore > resumeStoreFactory =
118
- token -> new InMemoryResumableFramesStore ("client" , 100_000 );
119
+ token -> new InMemoryResumableFramesStore (CLIENT_TAG , 100_000 );
119
120
private Duration resumeSessionDuration = Duration .ofMinutes (2 );
120
121
private Duration resumeStreamTimeout = Duration .ofSeconds (10 );
121
122
private Supplier <ResumeStrategy > resumeStrategySupplier =
122
123
() ->
123
124
new ExponentialBackoffResumeStrategy (Duration .ofSeconds (1 ), Duration .ofSeconds (16 ), 2 );
124
125
126
+ private boolean multiSubscriberRequester = true ;
127
+ private boolean leaseEnabled ;
128
+ private Supplier <Leases <?>> leasesSupplier = Leases ::new ;
129
+
125
130
private ByteBufAllocator allocator = ByteBufAllocator .DEFAULT ;
126
131
127
132
public ClientRSocketFactory byteBufAllocator (ByteBufAllocator allocator ) {
@@ -205,6 +210,22 @@ public ClientRSocketFactory metadataMimeType(String metadataMimeType) {
205
210
return this ;
206
211
}
207
212
213
+ public ClientRSocketFactory lease (Supplier <Leases <? extends LeaseStats >> leasesSupplier ) {
214
+ this .leaseEnabled = true ;
215
+ this .leasesSupplier = Objects .requireNonNull (leasesSupplier );
216
+ return this ;
217
+ }
218
+
219
+ public ClientRSocketFactory lease () {
220
+ this .leaseEnabled = true ;
221
+ return this ;
222
+ }
223
+
224
+ public ClientRSocketFactory singleSubscriberRequester () {
225
+ this .multiSubscriberRequester = false ;
226
+ return this ;
227
+ }
228
+
208
229
public ClientRSocketFactory resume () {
209
230
this .resumeEnabled = true ;
210
231
return this ;
@@ -300,9 +321,16 @@ public Mono<RSocket> start() {
300
321
DuplexConnection wrappedConnection = clientSetup .connection ();
301
322
302
323
ClientServerInputMultiplexer multiplexer =
303
- new ClientServerInputMultiplexer (wrappedConnection , plugins );
324
+ new ClientServerInputMultiplexer (wrappedConnection , plugins , true );
325
+
326
+ boolean isLeaseEnabled = leaseEnabled ;
327
+ Leases <?> leases = leasesSupplier .get ();
328
+ RequesterLeaseHandler requesterLeaseHandler =
329
+ isLeaseEnabled
330
+ ? new RequesterLeaseHandler .Impl (CLIENT_TAG , leases .receiver ())
331
+ : RequesterLeaseHandler .None ;
304
332
305
- RSocketRequester rSocketRequester =
333
+ RSocket rSocketRequester =
306
334
new RSocketRequester (
307
335
allocator ,
308
336
multiplexer .asClientConnection (),
@@ -311,12 +339,17 @@ public Mono<RSocket> start() {
311
339
StreamIdSupplier .clientSupplier (),
312
340
keepAliveTickPeriod (),
313
341
keepAliveTimeout (),
314
- keepAliveHandler );
342
+ keepAliveHandler ,
343
+ requesterLeaseHandler );
344
+
345
+ if (multiSubscriberRequester ) {
346
+ rSocketRequester = new MultiSubscriberRSocket (rSocketRequester );
347
+ }
315
348
316
349
ByteBuf setupFrame =
317
350
SetupFrameFlyweight .encode (
318
351
allocator ,
319
- false ,
352
+ isLeaseEnabled ,
320
353
keepAliveTickPeriod (),
321
354
keepAliveTimeout (),
322
355
resumeToken ,
@@ -337,13 +370,20 @@ public Mono<RSocket> start() {
337
370
338
371
RSocket wrappedRSocketHandler = plugins .applyResponder (rSocketHandler );
339
372
340
- RSocketResponder rSocketResponder =
373
+ ResponderLeaseHandler responderLeaseHandler =
374
+ isLeaseEnabled
375
+ ? new ResponderLeaseHandler .Impl <>(
376
+ CLIENT_TAG , allocator , leases .sender (), errorConsumer , leases .stats ())
377
+ : ResponderLeaseHandler .None ;
378
+
379
+ RSocket rSocketResponder =
341
380
new RSocketResponder (
342
381
allocator ,
343
382
multiplexer .asServerConnection (),
344
383
wrappedRSocketHandler ,
345
384
payloadDecoder ,
346
- errorConsumer );
385
+ errorConsumer ,
386
+ responderLeaseHandler );
347
387
348
388
return wrappedConnection .sendOne (setupFrame ).thenReturn (wrappedRSocketRequester );
349
389
});
@@ -382,16 +422,23 @@ private Mono<DuplexConnection> newConnection() {
382
422
}
383
423
384
424
public static class ServerRSocketFactory {
425
+ private static final String SERVER_TAG = "server" ;
426
+
385
427
private SocketAcceptor acceptor ;
386
428
private PayloadDecoder payloadDecoder = PayloadDecoder .DEFAULT ;
387
429
private Consumer <Throwable > errorConsumer = Throwable ::printStackTrace ;
388
430
private int mtu = 0 ;
389
431
private PluginRegistry plugins = new PluginRegistry (Plugins .defaultPlugins ());
432
+
390
433
private boolean resumeSupported ;
391
434
private Duration resumeSessionDuration = Duration .ofSeconds (120 );
392
435
private Duration resumeStreamTimeout = Duration .ofSeconds (10 );
393
436
private Function <? super ByteBuf , ? extends ResumableFramesStore > resumeStoreFactory =
394
- token -> new InMemoryResumableFramesStore ("server" , 100_000 );
437
+ token -> new InMemoryResumableFramesStore (SERVER_TAG , 100_000 );
438
+
439
+ private boolean multiSubscriberRequester = true ;
440
+ private boolean leaseEnabled ;
441
+ private Supplier <Leases <?>> leasesSupplier = Leases ::new ;
395
442
396
443
private ByteBufAllocator allocator = ByteBufAllocator .DEFAULT ;
397
444
private boolean resumeCleanupStoreOnKeepAlive ;
@@ -450,6 +497,22 @@ public ServerRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
450
497
return this ;
451
498
}
452
499
500
+ public ServerRSocketFactory lease (Supplier <Leases <?>> leasesSupplier ) {
501
+ this .leaseEnabled = true ;
502
+ this .leasesSupplier = Objects .requireNonNull (leasesSupplier );
503
+ return this ;
504
+ }
505
+
506
+ public ServerRSocketFactory lease () {
507
+ this .leaseEnabled = true ;
508
+ return this ;
509
+ }
510
+
511
+ public ServerRSocketFactory singleSubscriberRequester () {
512
+ this .multiSubscriberRequester = false ;
513
+ return this ;
514
+ }
515
+
453
516
public ServerRSocketFactory resume () {
454
517
this .resumeSupported = true ;
455
518
return this ;
@@ -500,7 +563,7 @@ public <T extends Closeable> Start<T> transport(Supplier<ServerTransport<T>> tra
500
563
501
564
private Mono <Void > acceptor (ServerSetup serverSetup , DuplexConnection connection ) {
502
565
ClientServerInputMultiplexer multiplexer =
503
- new ClientServerInputMultiplexer (connection , plugins );
566
+ new ClientServerInputMultiplexer (connection , plugins , false );
504
567
505
568
return multiplexer
506
569
.asSetupConnection ()
@@ -541,20 +604,45 @@ private Mono<Void> acceptSetup(
541
604
multiplexer .dispose ();
542
605
});
543
606
}
607
+
608
+ boolean isLeaseEnabled = leaseEnabled ;
609
+
610
+ if (SetupFrameFlyweight .honorLease (setupFrame ) && !isLeaseEnabled ) {
611
+ return sendError (multiplexer , new InvalidSetupException ("lease is not supported" ))
612
+ .doFinally (
613
+ signalType -> {
614
+ setupFrame .release ();
615
+ multiplexer .dispose ();
616
+ });
617
+ }
618
+
544
619
return serverSetup .acceptRSocketSetup (
545
620
setupFrame ,
546
621
multiplexer ,
547
622
(keepAliveHandler , wrappedMultiplexer ) -> {
548
623
ConnectionSetupPayload setupPayload = ConnectionSetupPayload .create (setupFrame );
549
624
550
- RSocketRequester rSocketRequester =
625
+ Leases <?> leases = leasesSupplier .get ();
626
+ RequesterLeaseHandler requesterLeaseHandler =
627
+ isLeaseEnabled
628
+ ? new RequesterLeaseHandler .Impl (SERVER_TAG , leases .receiver ())
629
+ : RequesterLeaseHandler .None ;
630
+
631
+ RSocket rSocketRequester =
551
632
new RSocketRequester (
552
633
allocator ,
553
634
wrappedMultiplexer .asServerConnection (),
554
635
payloadDecoder ,
555
636
errorConsumer ,
556
- StreamIdSupplier .serverSupplier ());
557
-
637
+ StreamIdSupplier .serverSupplier (),
638
+ setupPayload .keepAliveInterval (),
639
+ setupPayload .keepAliveMaxLifetime (),
640
+ keepAliveHandler ,
641
+ requesterLeaseHandler );
642
+
643
+ if (multiSubscriberRequester ) {
644
+ rSocketRequester = new MultiSubscriberRSocket (rSocketRequester );
645
+ }
558
646
RSocket wrappedRSocketRequester = plugins .applyRequester (rSocketRequester );
559
647
560
648
return acceptor
@@ -565,16 +653,24 @@ private Mono<Void> acceptSetup(
565
653
rSocketHandler -> {
566
654
RSocket wrappedRSocketHandler = plugins .applyResponder (rSocketHandler );
567
655
568
- RSocketResponder rSocketResponder =
656
+ ResponderLeaseHandler responderLeaseHandler =
657
+ isLeaseEnabled
658
+ ? new ResponderLeaseHandler .Impl <>(
659
+ SERVER_TAG ,
660
+ allocator ,
661
+ leases .sender (),
662
+ errorConsumer ,
663
+ leases .stats ())
664
+ : ResponderLeaseHandler .None ;
665
+
666
+ RSocket rSocketResponder =
569
667
new RSocketResponder (
570
668
allocator ,
571
669
wrappedMultiplexer .asClientConnection (),
572
670
wrappedRSocketHandler ,
573
671
payloadDecoder ,
574
672
errorConsumer ,
575
- setupPayload .keepAliveInterval (),
576
- setupPayload .keepAliveMaxLifetime (),
577
- keepAliveHandler );
673
+ responderLeaseHandler );
578
674
})
579
675
.doFinally (signalType -> setupPayload .release ())
580
676
.then ();
0 commit comments