Skip to content

Commit b3087ef

Browse files
authored
Refactors how ByteBufAllocator is used and setup (#796)
1 parent 3a5a4c2 commit b3087ef

File tree

41 files changed

+305
-212
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+305
-212
lines changed

rsocket-core/src/main/java/io/rsocket/DuplexConnection.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.rsocket;
1818

1919
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.ByteBufAllocator;
2021
import java.nio.channels.ClosedChannelException;
2122
import org.reactivestreams.Publisher;
2223
import org.reactivestreams.Subscriber;
@@ -78,6 +79,13 @@ default Mono<Void> sendOne(ByteBuf frame) {
7879
*/
7980
Flux<ByteBuf> receive();
8081

82+
/**
83+
* Returns the assigned {@link ByteBufAllocator}.
84+
*
85+
* @return the {@link ByteBufAllocator}
86+
*/
87+
ByteBufAllocator alloc();
88+
8189
@Override
8290
default double availability() {
8391
return isDisposed() ? 0.0 : 1.0;

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,29 @@ public ClientRSocketFactory(RSocketConnector connector) {
113113
this.connector = connector;
114114
}
115115

116+
/**
117+
* @deprecated this method is deprecated and deliberately has no effect anymore. Right now, in
118+
* order configure the custom {@link ByteBufAllocator} it is recommended to use the
119+
* following setup for Reactor Netty based transport: <br>
120+
* 1. For Client: <br>
121+
* <pre>{@code
122+
* TcpClient.create()
123+
* ...
124+
* .bootstrap(bootstrap -> bootstrap.option(ChannelOption.ALLOCATOR, clientAllocator))
125+
* }</pre>
126+
* <br>
127+
* 2. For server: <br>
128+
* <pre>{@code
129+
* TcpServer.create()
130+
* ...
131+
* .bootstrap(serverBootstrap -> serverBootstrap.childOption(ChannelOption.ALLOCATOR, serverAllocator))
132+
* }</pre>
133+
* Or in case of local transport, to use corresponding factory method {@code
134+
* LocalClientTransport.creat(String, ByteBufAllocator)}
135+
* @param allocator instance of {@link ByteBufAllocator}
136+
* @return this factory instance
137+
*/
116138
public ClientRSocketFactory byteBufAllocator(ByteBufAllocator allocator) {
117-
connector.byteBufAllocator(allocator);
118139
return this;
119140
}
120141

@@ -395,8 +416,30 @@ public ServerRSocketFactory(RSocketServer server) {
395416
this.server = server;
396417
}
397418

419+
/**
420+
* @deprecated this method is deprecated and deliberately has no effect anymore. Right now, in
421+
* order configure the custom {@link ByteBufAllocator} it is recommended to use the
422+
* following setup for Reactor Netty based transport: <br>
423+
* 1. For Client: <br>
424+
* <pre>{@code
425+
* TcpClient.create()
426+
* ...
427+
* .bootstrap(bootstrap -> bootstrap.option(ChannelOption.ALLOCATOR, clientAllocator))
428+
* }</pre>
429+
* <br>
430+
* 2. For server: <br>
431+
* <pre>{@code
432+
* TcpServer.create()
433+
* ...
434+
* .bootstrap(serverBootstrap -> serverBootstrap.childOption(ChannelOption.ALLOCATOR, serverAllocator))
435+
* }</pre>
436+
* Or in case of local transport, to use corresponding factory method {@code
437+
* LocalClientTransport.creat(String, ByteBufAllocator)}
438+
* @param allocator instance of {@link ByteBufAllocator}
439+
* @return this factory instance
440+
*/
441+
@Deprecated
398442
public ServerRSocketFactory byteBufAllocator(ByteBufAllocator allocator) {
399-
server.byteBufAllocator(allocator);
400443
return this;
401444
}
402445

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package io.rsocket.core;
1717

1818
import io.netty.buffer.ByteBuf;
19-
import io.netty.buffer.ByteBufAllocator;
2019
import io.netty.buffer.Unpooled;
2120
import io.rsocket.AbstractRSocket;
2221
import io.rsocket.ConnectionSetupPayload;
@@ -71,7 +70,6 @@ public class RSocketConnector {
7170
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
7271

7372
private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
74-
private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
7573

7674
private RSocketConnector() {}
7775

@@ -241,16 +239,6 @@ public RSocketConnector errorConsumer(Consumer<Throwable> errorConsumer) {
241239
return this;
242240
}
243241

244-
/**
245-
* @deprecated this is deprecated with no replacement and will be removed after {@link
246-
* io.rsocket.RSocketFactory} is removed.
247-
*/
248-
public RSocketConnector byteBufAllocator(ByteBufAllocator allocator) {
249-
Objects.requireNonNull(allocator);
250-
this.allocator = allocator;
251-
return this;
252-
}
253-
254242
public Mono<RSocket> connect(ClientTransport transport) {
255243
return connect(() -> transport);
256244
}
@@ -289,7 +277,6 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
289277

290278
RSocket rSocketRequester =
291279
new RSocketRequester(
292-
allocator,
293280
multiplexer.asClientConnection(),
294281
payloadDecoder,
295282
errorConsumer,
@@ -304,7 +291,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
304291

305292
ByteBuf setupFrame =
306293
SetupFrameFlyweight.encode(
307-
allocator,
294+
wrappedConnection.alloc(),
308295
leaseEnabled,
309296
(int) keepAliveInterval.toMillis(),
310297
(int) keepAliveMaxLifeTime.toMillis(),
@@ -326,15 +313,14 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
326313
leaseEnabled
327314
? new ResponderLeaseHandler.Impl<>(
328315
CLIENT_TAG,
329-
allocator,
316+
wrappedConnection.alloc(),
330317
leases.sender(),
331318
errorConsumer,
332319
leases.stats())
333320
: ResponderLeaseHandler.None;
334321

335322
RSocket rSocketResponder =
336323
new RSocketResponder(
337-
allocator,
338324
multiplexer.asServerConnection(),
339325
wrappedRSocketHandler,
340326
payloadDecoder,
@@ -364,7 +350,6 @@ private ClientRSocketSession createSession(
364350
ClientRSocketSession session =
365351
new ClientRSocketSession(
366352
connection,
367-
allocator,
368353
resume.getSessionDuration(),
369354
resume.getResumeStrategySupplier(),
370355
resume.getStoreFactory(CLIENT_TAG).apply(resumeToken),

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ class RSocketRequester implements RSocket {
108108
private volatile Throwable terminationError;
109109

110110
RSocketRequester(
111-
ByteBufAllocator allocator,
112111
DuplexConnection connection,
113112
PayloadDecoder payloadDecoder,
114113
Consumer<Throwable> errorConsumer,
@@ -118,8 +117,8 @@ class RSocketRequester implements RSocket {
118117
int keepAliveAckTimeout,
119118
@Nullable KeepAliveHandler keepAliveHandler,
120119
RequesterLeaseHandler leaseHandler) {
121-
this.allocator = allocator;
122120
this.connection = connection;
121+
this.allocator = connection.alloc();
123122
this.payloadDecoder = payloadDecoder;
124123
this.errorConsumer = errorConsumer;
125124
this.streamIdSupplier = streamIdSupplier;
@@ -141,7 +140,7 @@ class RSocketRequester implements RSocket {
141140

142141
if (keepAliveTickPeriod != 0 && keepAliveHandler != null) {
143142
KeepAliveSupport keepAliveSupport =
144-
new ClientKeepAliveSupport(allocator, keepAliveTickPeriod, keepAliveAckTimeout);
143+
new ClientKeepAliveSupport(this.allocator, keepAliveTickPeriod, keepAliveAckTimeout);
145144
this.keepAliveFramesAcceptor =
146145
keepAliveHandler.start(
147146
keepAliveSupport, sendProcessor::onNextPrioritized, this::tryTerminateOnKeepAlive);

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,14 @@ class RSocketResponder implements ResponderRSocket {
7575
private final ByteBufAllocator allocator;
7676

7777
RSocketResponder(
78-
ByteBufAllocator allocator,
7978
DuplexConnection connection,
8079
RSocket requestHandler,
8180
PayloadDecoder payloadDecoder,
8281
Consumer<Throwable> errorConsumer,
8382
ResponderLeaseHandler leaseHandler,
8483
int mtu) {
85-
this.allocator = allocator;
8684
this.connection = connection;
85+
this.allocator = connection.alloc();
8786
this.mtu = mtu;
8887

8988
this.requestHandler = requestHandler;

rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.rsocket.core;
1818

1919
import io.netty.buffer.ByteBuf;
20-
import io.netty.buffer.ByteBufAllocator;
2120
import io.rsocket.AbstractRSocket;
2221
import io.rsocket.Closeable;
2322
import io.rsocket.ConnectionSetupPayload;
@@ -55,7 +54,6 @@ public final class RSocketServer {
5554

5655
private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
5756
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
58-
private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
5957

6058
private RSocketServer() {}
6159

@@ -114,17 +112,6 @@ public RSocketServer errorConsumer(Consumer<Throwable> errorConsumer) {
114112
return this;
115113
}
116114

117-
/**
118-
* @deprecated this is deprecated with no replacement and will be removed after {@link
119-
* io.rsocket.RSocketFactory} is removed.
120-
*/
121-
@Deprecated
122-
public RSocketServer byteBufAllocator(ByteBufAllocator allocator) {
123-
Objects.requireNonNull(allocator);
124-
this.allocator = allocator;
125-
return this;
126-
}
127-
128115
public ServerTransport.ConnectionAcceptor asConnectionAcceptor() {
129116
return new ServerTransport.ConnectionAcceptor() {
130117
private final ServerSetup serverSetup = serverSetup();
@@ -228,7 +215,6 @@ private Mono<Void> acceptSetup(
228215

229216
RSocket rSocketRequester =
230217
new RSocketRequester(
231-
allocator,
232218
wrappedMultiplexer.asServerConnection(),
233219
payloadDecoder,
234220
errorConsumer,
@@ -252,21 +238,21 @@ private Mono<Void> acceptSetup(
252238
.doOnNext(
253239
rSocketHandler -> {
254240
RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler);
241+
DuplexConnection connection = wrappedMultiplexer.asClientConnection();
255242

256243
ResponderLeaseHandler responderLeaseHandler =
257244
leaseEnabled
258245
? new ResponderLeaseHandler.Impl<>(
259246
SERVER_TAG,
260-
allocator,
247+
connection.alloc(),
261248
leases.sender(),
262249
errorConsumer,
263250
leases.stats())
264251
: ResponderLeaseHandler.None;
265252

266253
RSocket rSocketResponder =
267254
new RSocketResponder(
268-
allocator,
269-
wrappedMultiplexer.asClientConnection(),
255+
connection,
270256
wrappedRSocketHandler,
271257
payloadDecoder,
272258
errorConsumer,
@@ -279,12 +265,11 @@ private Mono<Void> acceptSetup(
279265
}
280266

281267
private ServerSetup serverSetup() {
282-
return resume != null ? createSetup() : new ServerSetup.DefaultServerSetup(allocator);
268+
return resume != null ? createSetup() : new ServerSetup.DefaultServerSetup();
283269
}
284270

285271
ServerSetup createSetup() {
286272
return new ServerSetup.ResumableServerSetup(
287-
allocator,
288273
new SessionManager(),
289274
resume.getSessionDuration(),
290275
resume.getStreamTimeout(),

rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import static io.rsocket.keepalive.KeepAliveHandler.*;
2020

2121
import io.netty.buffer.ByteBuf;
22-
import io.netty.buffer.ByteBufAllocator;
22+
import io.rsocket.DuplexConnection;
2323
import io.rsocket.exceptions.RejectedResumeException;
2424
import io.rsocket.exceptions.UnsupportedSetupException;
2525
import io.rsocket.frame.ErrorFrameFlyweight;
@@ -35,12 +35,6 @@
3535

3636
abstract class ServerSetup {
3737

38-
final ByteBufAllocator allocator;
39-
40-
public ServerSetup(ByteBufAllocator allocator) {
41-
this.allocator = allocator;
42-
}
43-
4438
abstract Mono<Void> acceptRSocketSetup(
4539
ByteBuf frame,
4640
ClientServerInputMultiplexer multiplexer,
@@ -51,18 +45,14 @@ abstract Mono<Void> acceptRSocketSetup(
5145
void dispose() {}
5246

5347
Mono<Void> sendError(ClientServerInputMultiplexer multiplexer, Exception exception) {
54-
return multiplexer
55-
.asSetupConnection()
56-
.sendOne(ErrorFrameFlyweight.encode(allocator, 0, exception))
48+
DuplexConnection duplexConnection = multiplexer.asSetupConnection();
49+
return duplexConnection
50+
.sendOne(ErrorFrameFlyweight.encode(duplexConnection.alloc(), 0, exception))
5751
.onErrorResume(err -> Mono.empty());
5852
}
5953

6054
static class DefaultServerSetup extends ServerSetup {
6155

62-
DefaultServerSetup(ByteBufAllocator allocator) {
63-
super(allocator);
64-
}
65-
6656
@Override
6757
public Mono<Void> acceptRSocketSetup(
6858
ByteBuf frame,
@@ -94,22 +84,18 @@ public Mono<Void> acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexe
9484
}
9585

9686
static class ResumableServerSetup extends ServerSetup {
97-
private final ByteBufAllocator allocator;
9887
private final SessionManager sessionManager;
9988
private final Duration resumeSessionDuration;
10089
private final Duration resumeStreamTimeout;
10190
private final Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory;
10291
private final boolean cleanupStoreOnKeepAlive;
10392

10493
ResumableServerSetup(
105-
ByteBufAllocator allocator,
10694
SessionManager sessionManager,
10795
Duration resumeSessionDuration,
10896
Duration resumeStreamTimeout,
10997
Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory,
11098
boolean cleanupStoreOnKeepAlive) {
111-
super(allocator);
112-
this.allocator = allocator;
11399
this.sessionManager = sessionManager;
114100
this.resumeSessionDuration = resumeSessionDuration;
115101
this.resumeStreamTimeout = resumeStreamTimeout;
@@ -131,7 +117,6 @@ public Mono<Void> acceptRSocketSetup(
131117
.save(
132118
new ServerRSocketSession(
133119
multiplexer.asClientServerConnection(),
134-
allocator,
135120
resumeSessionDuration,
136121
resumeStreamTimeout,
137122
resumeStoreFactory,

0 commit comments

Comments
 (0)