Skip to content

Commit fce1823

Browse files
committed
reduce allocations
Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent 88b7ca9 commit fce1823

File tree

9 files changed

+117
-202
lines changed

9 files changed

+117
-202
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -344,8 +344,7 @@ private Mono<KeepAliveConnection> newConnection() {
344344
KeepAliveConnection.ofClient(
345345
allocator,
346346
connection,
347-
notUsed ->
348-
Mono.just(new KeepAliveData(keepAliveTickPeriod(), keepAliveTimeout())),
347+
notUsed -> new KeepAliveData(keepAliveTickPeriod(), keepAliveTimeout()),
349348
errorConsumer));
350349
}
351350
}
@@ -454,7 +453,7 @@ public <T extends Closeable> Start<T> transport(Supplier<ServerTransport<T>> tra
454453
private Mono<Void> acceptor(ServerSetup serverSetup, DuplexConnection connection) {
455454
connection =
456455
KeepAliveConnection.ofServer(
457-
allocator, connection, serverSetup.keepAliveData(), errorConsumer);
456+
allocator, connection, serverSetup::keepAliveData, errorConsumer);
458457
ClientServerInputMultiplexer multiplexer =
459458
new ClientServerInputMultiplexer(connection, plugins);
460459

rsocket-core/src/main/java/io/rsocket/internal/ClientSetup.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import io.netty.buffer.Unpooled;
2222
import io.rsocket.DuplexConnection;
2323
import io.rsocket.keepalive.KeepAliveConnection;
24-
import io.rsocket.resume.*;
24+
import io.rsocket.resume.ClientRSocketSession;
25+
import io.rsocket.resume.ResumableFramesStore;
26+
import io.rsocket.resume.ResumeStrategy;
2527
import java.time.Duration;
2628
import java.util.function.Supplier;
2729
import reactor.core.publisher.Mono;
@@ -48,34 +50,41 @@ public ByteBuf resumeToken() {
4850

4951
class ResumableClientSetup implements ClientSetup {
5052
private final ByteBuf resumeToken;
51-
private final ClientResumeConfiguration config;
5253
private final ByteBufAllocator allocator;
53-
private final Mono<KeepAliveConnection> newConnection;
54+
private final Mono<KeepAliveConnection> newConnectionFactory;
55+
private final Duration resumeSessionDuration;
56+
private final Supplier<ResumeStrategy> resumeStrategySupplier;
57+
private final ResumableFramesStore resumableFramesStore;
58+
private final Duration resumeStreamTimeout;
5459

5560
public ResumableClientSetup(
5661
ByteBufAllocator allocator,
57-
Mono<KeepAliveConnection> newConnection,
62+
Mono<KeepAliveConnection> newConnectionFactory,
5863
ByteBuf resumeToken,
5964
ResumableFramesStore resumableFramesStore,
6065
Duration resumeSessionDuration,
6166
Duration resumeStreamTimeout,
6267
Supplier<ResumeStrategy> resumeStrategySupplier) {
6368
this.allocator = allocator;
64-
this.newConnection = newConnection;
69+
this.newConnectionFactory = newConnectionFactory;
6570
this.resumeToken = resumeToken;
66-
this.config =
67-
new ClientResumeConfiguration(
68-
resumeSessionDuration,
69-
resumeStrategySupplier,
70-
resumableFramesStore,
71-
resumeStreamTimeout);
71+
this.resumeSessionDuration = resumeSessionDuration;
72+
this.resumeStrategySupplier = resumeStrategySupplier;
73+
this.resumableFramesStore = resumableFramesStore;
74+
this.resumeStreamTimeout = resumeStreamTimeout;
7275
}
7376

7477
@Override
7578
public DuplexConnection wrappedConnection(KeepAliveConnection connection) {
7679
ClientRSocketSession rSocketSession =
77-
new ClientRSocketSession(allocator, connection, config)
78-
.continueWith(newConnection)
80+
new ClientRSocketSession(
81+
connection,
82+
allocator,
83+
resumeSessionDuration,
84+
resumeStrategySupplier,
85+
resumableFramesStore,
86+
resumeStreamTimeout)
87+
.continueWith(newConnectionFactory)
7988
.resumeToken(resumeToken);
8089

8190
return rSocketSession.resumableConnection();

rsocket-core/src/main/java/io/rsocket/internal/ServerSetup.java

Lines changed: 49 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.rsocket.util.ConnectionUtils;
3030
import java.time.Duration;
3131
import java.util.function.Function;
32+
import javax.annotation.Nullable;
3233
import reactor.core.publisher.Mono;
3334

3435
public interface ServerSetup {
@@ -42,7 +43,8 @@ Mono<Void> acceptRSocketSetup(
4243
Mono<Void> acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexer multiplexer);
4344

4445
/*get KEEP-ALIVE timings based on start frame: SETUP (directly) /RESUME (lookup by resume token)*/
45-
Function<ByteBuf, Mono<KeepAliveData>> keepAliveData();
46+
@Nullable
47+
KeepAliveData keepAliveData(ByteBuf frame);
4648

4749
default void dispose() {}
4850

@@ -83,17 +85,14 @@ public Mono<Void> acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexe
8385
}
8486

8587
@Override
86-
public Function<ByteBuf, Mono<KeepAliveData>> keepAliveData() {
87-
return frame -> {
88-
if (FrameHeaderFlyweight.frameType(frame) == FrameType.SETUP) {
89-
return Mono.just(
90-
new KeepAliveData(
91-
SetupFrameFlyweight.keepAliveInterval(frame),
92-
SetupFrameFlyweight.keepAliveMaxLifetime(frame)));
93-
} else {
94-
return Mono.never();
95-
}
96-
};
88+
public KeepAliveData keepAliveData(ByteBuf frame) {
89+
if (FrameHeaderFlyweight.frameType(frame) == FrameType.SETUP) {
90+
return new KeepAliveData(
91+
SetupFrameFlyweight.keepAliveInterval(frame),
92+
SetupFrameFlyweight.keepAliveMaxLifetime(frame));
93+
} else {
94+
return null;
95+
}
9796
}
9897

9998
private Mono<Void> sendError(ClientServerInputMultiplexer multiplexer, Exception exception) {
@@ -104,7 +103,9 @@ private Mono<Void> sendError(ClientServerInputMultiplexer multiplexer, Exception
104103
class ResumableServerSetup implements ServerSetup {
105104
private final ByteBufAllocator allocator;
106105
private final SessionManager sessionManager;
107-
private final ServerResumeConfiguration resumeConfig;
106+
private final Duration resumeSessionDuration;
107+
private final Duration resumeStreamTimeout;
108+
private final Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory;
108109

109110
public ResumableServerSetup(
110111
ByteBufAllocator allocator,
@@ -114,9 +115,9 @@ public ResumableServerSetup(
114115
Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory) {
115116
this.allocator = allocator;
116117
this.sessionManager = sessionManager;
117-
this.resumeConfig =
118-
new ServerResumeConfiguration(
119-
resumeSessionDuration, resumeStreamTimeout, resumeStoreFactory);
118+
this.resumeSessionDuration = resumeSessionDuration;
119+
this.resumeStreamTimeout = resumeStreamTimeout;
120+
this.resumeStoreFactory = resumeStoreFactory;
120121
}
121122

122123
@Override
@@ -137,11 +138,13 @@ public Mono<Void> acceptRSocketSetup(
137138
sessionManager
138139
.save(
139140
new ServerRSocketSession(
140-
allocator,
141141
multiplexer.asClientServerConnection(),
142-
resumeConfig,
143-
keepAliveData,
144-
resumeToken))
142+
allocator,
143+
resumeSessionDuration,
144+
resumeStreamTimeout,
145+
resumeStoreFactory,
146+
resumeToken,
147+
keepAliveData))
145148
.resumableConnection();
146149
return then.apply(new ClientServerInputMultiplexer(resumableConnection));
147150
} else {
@@ -151,41 +154,37 @@ public Mono<Void> acceptRSocketSetup(
151154

152155
@Override
153156
public Mono<Void> acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexer multiplexer) {
154-
return sessionManager
155-
.get(ResumeFrameFlyweight.token(frame))
156-
.map(
157-
session ->
158-
session
159-
.continueWith(multiplexer.asClientServerConnection())
160-
.resumeWith(frame)
161-
.onClose()
162-
.then())
163-
.orElseGet(
164-
() ->
165-
sendError(multiplexer, new RejectedResumeException("unknown resume token"))
166-
.doFinally(
167-
s -> {
168-
frame.release();
169-
multiplexer.dispose();
170-
}));
157+
ServerRSocketSession session = sessionManager.get(ResumeFrameFlyweight.token(frame));
158+
if (session != null) {
159+
return session
160+
.continueWith(multiplexer.asClientServerConnection())
161+
.resumeWith(frame)
162+
.onClose()
163+
.then();
164+
} else {
165+
return sendError(multiplexer, new RejectedResumeException("unknown resume token"))
166+
.doFinally(
167+
s -> {
168+
frame.release();
169+
multiplexer.dispose();
170+
});
171+
}
171172
}
172173

173174
@Override
174-
public Function<ByteBuf, Mono<KeepAliveData>> keepAliveData() {
175-
return frame -> {
176-
if (FrameHeaderFlyweight.frameType(frame) == FrameType.SETUP) {
177-
return Mono.just(
178-
new KeepAliveData(
179-
SetupFrameFlyweight.keepAliveInterval(frame),
180-
SetupFrameFlyweight.keepAliveMaxLifetime(frame)));
175+
public KeepAliveData keepAliveData(ByteBuf frame) {
176+
if (FrameHeaderFlyweight.frameType(frame) == FrameType.SETUP) {
177+
return new KeepAliveData(
178+
SetupFrameFlyweight.keepAliveInterval(frame),
179+
SetupFrameFlyweight.keepAliveMaxLifetime(frame));
180+
} else {
181+
ServerRSocketSession session = sessionManager.get(ResumeFrameFlyweight.token(frame));
182+
if (session != null) {
183+
return session.keepAliveData();
181184
} else {
182-
return sessionManager
183-
.get(ResumeFrameFlyweight.token(frame))
184-
.map(ServerRSocketSession::keepAliveData)
185-
.map(Mono::just)
186-
.orElseGet(Mono::never);
185+
return null;
187186
}
188-
};
187+
}
189188
}
190189

191190
private Mono<Void> sendError(ClientServerInputMultiplexer multiplexer, Exception exception) {

rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveConnection.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.time.Duration;
3131
import java.util.function.Consumer;
3232
import java.util.function.Function;
33+
import javax.annotation.Nullable;
3334
import org.reactivestreams.Publisher;
3435
import reactor.core.publisher.Flux;
3536
import reactor.core.publisher.Mono;
@@ -40,7 +41,7 @@ public class KeepAliveConnection extends DuplexConnectionProxy
4041

4142
private final MonoProcessor<KeepAliveHandler> keepAliveHandlerReady = MonoProcessor.create();
4243
private final ByteBufAllocator allocator;
43-
private final Function<ByteBuf, Mono<KeepAliveData>> keepAliveData;
44+
private final Function<ByteBuf, KeepAliveData> keepAliveData;
4445
private final Function3<ByteBufAllocator, Duration, Duration, KeepAliveHandler>
4546
keepAliveHandlerFactory;
4647
private final Consumer<Throwable> errorConsumer;
@@ -50,7 +51,7 @@ public class KeepAliveConnection extends DuplexConnectionProxy
5051
public static KeepAliveConnection ofClient(
5152
ByteBufAllocator allocator,
5253
DuplexConnection duplexConnection,
53-
Function<ByteBuf, Mono<KeepAliveData>> keepAliveData,
54+
Function<ByteBuf, KeepAliveData> keepAliveData,
5455
Consumer<Throwable> errorConsumer) {
5556

5657
return new KeepAliveConnection(
@@ -60,7 +61,7 @@ public static KeepAliveConnection ofClient(
6061
public static KeepAliveConnection ofServer(
6162
ByteBufAllocator allocator,
6263
DuplexConnection duplexConnection,
63-
Function<ByteBuf, Mono<KeepAliveData>> keepAliveData,
64+
Function<ByteBuf, KeepAliveData> keepAliveData,
6465
Consumer<Throwable> errorConsumer) {
6566

6667
return new KeepAliveConnection(
@@ -70,7 +71,7 @@ public static KeepAliveConnection ofServer(
7071
private KeepAliveConnection(
7172
ByteBufAllocator allocator,
7273
DuplexConnection duplexConnection,
73-
Function<ByteBuf, Mono<KeepAliveData>> keepAliveData,
74+
Function<ByteBuf, KeepAliveData> keepAliveData,
7475
Function3<ByteBufAllocator, Duration, Duration, KeepAliveHandler> keepAliveHandlerFactory,
7576
Consumer<Throwable> errorConsumer) {
7677
super(duplexConnection);
@@ -106,7 +107,7 @@ public Mono<Void> send(Publisher<ByteBuf> frames) {
106107
.doOnNext(
107108
f -> {
108109
if (isStartFrame(f)) {
109-
keepAliveHandler(keepAliveData.apply(f)).subscribe(keepAliveHandlerReady);
110+
startKeepAliveHandler(keepAliveData.apply(f));
110111
}
111112
}));
112113
}
@@ -122,7 +123,7 @@ public Flux<ByteBuf> receive() {
122123
resumeStateHolder.onImpliedPosition(receivedPos);
123124
}
124125
} else if (isStartFrame(f)) {
125-
keepAliveHandler(keepAliveData.apply(f)).subscribe(keepAliveHandlerReady);
126+
startKeepAliveHandler(keepAliveData.apply(f));
126127
}
127128
});
128129
}
@@ -158,8 +159,11 @@ private static boolean isKeepAliveFrame(ByteBuf frame) {
158159
return FrameHeaderFlyweight.frameType(frame) == FrameType.KEEPALIVE;
159160
}
160161

161-
private Mono<KeepAliveHandler> keepAliveHandler(Mono<KeepAliveData> keepAliveData) {
162-
return keepAliveData.map(
163-
kad -> keepAliveHandlerFactory.apply(allocator, kad.getTickPeriod(), kad.getTimeout()));
162+
private void startKeepAliveHandler(@Nullable KeepAliveData kad) {
163+
if (kad != null) {
164+
KeepAliveHandler handler =
165+
keepAliveHandlerFactory.apply(allocator, kad.getTickPeriod(), kad.getTimeout());
166+
keepAliveHandlerReady.onNext(handler);
167+
}
164168
}
165169
}

rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@
2424
import io.rsocket.frame.ResumeFrameFlyweight;
2525
import io.rsocket.frame.ResumeOkFrameFlyweight;
2626
import io.rsocket.internal.ClientServerInputMultiplexer;
27-
import java.util.Objects;
27+
import java.time.Duration;
2828
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.function.Supplier;
2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
3132
import reactor.core.publisher.Mono;
@@ -40,13 +41,16 @@ public class ClientRSocketSession
4041
private final ByteBufAllocator allocator;
4142

4243
public ClientRSocketSession(
43-
ByteBufAllocator allocator,
4444
ResumePositionsConnection duplexConnection,
45-
ClientResumeConfiguration config) {
46-
this.allocator = Objects.requireNonNull(allocator);
45+
ByteBufAllocator allocator,
46+
Duration resumeSessionDuration,
47+
Supplier<ResumeStrategy> resumeStrategy,
48+
ResumableFramesStore resumableFramesStore,
49+
Duration resumeStreamTimeout) {
50+
this.allocator = allocator;
4751
this.resumableConnection =
4852
new ResumableDuplexConnection(
49-
"client", duplexConnection, config.resumeStore(), config.resumeStreamTimeout());
53+
"client", duplexConnection, resumableFramesStore, resumeStreamTimeout);
5054

5155
/*session completed: release token initially retained in resumeToken(ByteBuf)*/
5256
onClose().doFinally(s -> resumeToken.release()).subscribe();
@@ -56,8 +60,8 @@ public ClientRSocketSession(
5660
.flatMap(
5761
err -> {
5862
logger.debug("Client session connection error. Starting new connection");
59-
ResumeStrategy reconnectOnError = config.resumptionStrategy().get();
60-
ClientResume clientResume = new ClientResume(config.sessionDuration(), resumeToken);
63+
ResumeStrategy reconnectOnError = resumeStrategy.get();
64+
ClientResume clientResume = new ClientResume(resumeSessionDuration, resumeToken);
6165
AtomicBoolean once = new AtomicBoolean();
6266
return newConnection
6367
.delaySubscription(
@@ -74,7 +78,7 @@ public ClientRSocketSession(
7478
retryErr ->
7579
Mono.from(reconnectOnError.apply(clientResume, retryErr))
7680
.doOnNext(v -> logger.debug("Retrying with: {}", v))))
77-
.timeout(config.sessionDuration());
81+
.timeout(resumeSessionDuration);
7882
})
7983
.map(ClientServerInputMultiplexer::new)
8084
.subscribe(

0 commit comments

Comments
 (0)