Skip to content

Commit 8a15df0

Browse files
committed
remove ResumptionState to reduce allocations
Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent ae771f8 commit 8a15df0

File tree

6 files changed

+112
-75
lines changed

6 files changed

+112
-75
lines changed

rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,20 @@ public ClientRSocketSession(
8181
multiplexer -> {
8282
/*reconnect resumable connection*/
8383
reconnect(multiplexer.asClientServerConnection());
84-
85-
ResumptionState state = resumableConnection.state();
84+
long impliedPosition = resumableConnection.impliedPosition();
85+
long position = resumableConnection.position();
8686
logger.debug(
87-
"Client ResumableConnection reconnected. Sending RESUME frame with state: {}",
88-
state);
87+
"Client ResumableConnection reconnected. Sending RESUME frame with state: [impliedPos: {}, pos: {}]",
88+
impliedPosition,
89+
position);
8990
/*Connection is established again: send RESUME frame to server, listen for RESUME_OK*/
9091
sendFrame(
9192
ResumeFrameFlyweight.encode(
9293
allocator,
9394
/*retain so token is not released once sent as part of resume frame*/
9495
resumeToken.retain(),
95-
state.impliedPosition(),
96-
state.position()))
96+
impliedPosition,
97+
position))
9798
.then(multiplexer.asSetupConnection().receive().next())
9899
.subscribe(this::resumeWith);
99100
},
@@ -112,19 +113,21 @@ public ClientRSocketSession continueWith(Mono<? extends ResumeAwareConnection> n
112113
@Override
113114
public ClientRSocketSession resumeWith(ByteBuf resumeOkFrame) {
114115
logger.debug("ResumeOK FRAME received");
115-
ResumptionState resumptionState = stateFromFrame(resumeOkFrame);
116+
long remotePos = remotePos(resumeOkFrame);
117+
long remoteImpliedPos = remoteImpliedPos(resumeOkFrame);
118+
resumeOkFrame.release();
119+
116120
resumableConnection.resume(
117-
resumptionState,
121+
remotePos,
122+
remoteImpliedPos,
118123
pos ->
119124
pos.then()
120125
/*Resumption is impossible: send CONNECTION_ERROR*/
121126
.onErrorResume(
122127
err ->
123128
sendFrame(
124129
ErrorFrameFlyweight.encode(
125-
allocator,
126-
0,
127-
errorFrameThrowable(resumptionState.impliedPosition())))
130+
allocator, 0, errorFrameThrowable(remoteImpliedPos)))
128131
.then(Mono.fromRunnable(resumableConnection::dispose))
129132
/*Resumption is impossible: no need to return control to ResumableConnection*/
130133
.then(Mono.never())));
@@ -156,10 +159,12 @@ private Mono<Void> sendFrame(ByteBuf frame) {
156159
return resumableConnection.sendOne(frame).onErrorResume(err -> Mono.empty());
157160
}
158161

159-
private static ResumptionState stateFromFrame(ByteBuf resumeOkFrame) {
160-
long impliedPos = ResumeOkFrameFlyweight.lastReceivedClientPos(resumeOkFrame);
161-
resumeOkFrame.release();
162-
return ResumptionState.fromServer(impliedPos);
162+
private static long remoteImpliedPos(ByteBuf resumeOkFrame) {
163+
return ResumeOkFrameFlyweight.lastReceivedClientPos(resumeOkFrame);
164+
}
165+
166+
private static long remotePos(ByteBuf resumeOkFrame) {
167+
return -1;
163168
}
164169

165170
private static ConnectionErrorException errorFrameThrowable(long impliedPos) {

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,9 @@ public void reconnect(ResumeAwareConnection connection) {
119119
/*after receiving RESUME (Server) or RESUME_OK (Client)
120120
calculate and send resume frames */
121121
public void resume(
122-
ResumptionState peerResumptionState, Function<Mono<Long>, Mono<Void>> resumeFrameSent) {
122+
long remotePos, long remoteImpliedPos, Function<Mono<Long>, Mono<Void>> resumeFrameSent) {
123123
/*race between sendFrame and doResume may lead to duplicate frames on resume store*/
124-
actions.onNext(new Resume(peerResumptionState, resumeFrameSent));
124+
actions.onNext(new Resume(remotePos, remoteImpliedPos, resumeFrameSent));
125125
}
126126

127127
@Override
@@ -152,6 +152,10 @@ public Flux<ByteBuf> receive() {
152152
.onErrorResume(err -> Mono.never()));
153153
}
154154

155+
public long position() {
156+
return resumableFramesStore.framePosition();
157+
}
158+
155159
@Override
156160
public long impliedPosition() {
157161
return resumableFramesStore.frameImpliedPosition();
@@ -209,11 +213,6 @@ private void sendFrame(ByteBuf f) {
209213
}
210214
}
211215

212-
ResumptionState state() {
213-
return new ResumptionState(
214-
resumableFramesStore.framePosition(), resumableFramesStore.frameImpliedPosition());
215-
}
216-
217216
Flux<Throwable> connectionErrors() {
218217
return connectionErrors;
219218
}
@@ -237,20 +236,30 @@ private void doResumeStart(ResumeAwareConnection connection) {
237236
}
238237

239238
private void doResume(
240-
ResumptionState peerResumptionState, Function<Mono<Long>, Mono<Void>> sendResumeFrame) {
241-
ResumptionState localResumptionState = state();
239+
long remotePosition,
240+
long remoteImpliedPosition,
241+
Function<Mono<Long>, Mono<Void>> sendResumeFrame) {
242+
long localPosition = position();
243+
long localImpliedPosition = impliedPosition();
242244

243245
logger.debug(
244246
"Resumption start. Calculating implied pos using: {}",
245247
resumedFramesCalculator.getClass().getSimpleName());
246248
logger.debug(
247-
"Resumption states. local: {}, remote: {}", localResumptionState, peerResumptionState);
248-
249-
Mono<Long> res = resumedFramesCalculator.calculate(localResumptionState, peerResumptionState);
249+
"Resumption states. local: [pos: {}, impliedPos: {}], remote: [pos: {}, impliedPos: {}]",
250+
localPosition,
251+
localImpliedPosition,
252+
remotePosition,
253+
remoteImpliedPosition);
254+
255+
Mono<Long> res =
256+
resumedFramesCalculator.calculate(
257+
localPosition, localImpliedPosition,
258+
remotePosition, remoteImpliedPosition);
250259
Mono<Long> localImpliedPos =
251260
res.doOnSuccess(notUsed -> state = State.RESUME)
252261
.doOnSuccess(this::releaseFramesToPosition)
253-
.map(remoteImpliedPos -> localResumptionState.impliedPosition());
262+
.map(remoteImpliedPos -> localImpliedPosition);
254263

255264
sendResumeFrame
256265
.apply(localImpliedPos)
@@ -355,18 +364,20 @@ public void run() {
355364
}
356365

357366
class Resume implements Runnable {
358-
private final ResumptionState peerResumptionState;
367+
private final long remotePos;
368+
private final long remoteImpliedPos;
359369
private final Function<Mono<Long>, Mono<Void>> resumeFrameSent;
360370

361371
public Resume(
362-
ResumptionState peerResumptionState, Function<Mono<Long>, Mono<Void>> resumeFrameSent) {
363-
this.peerResumptionState = peerResumptionState;
372+
long remotePos, long remoteImpliedPos, Function<Mono<Long>, Mono<Void>> resumeFrameSent) {
373+
this.remotePos = remotePos;
374+
this.remoteImpliedPos = remoteImpliedPos;
364375
this.resumeFrameSent = resumeFrameSent;
365376
}
366377

367378
@Override
368379
public void run() {
369-
doResume(peerResumptionState, resumeFrameSent);
380+
doResume(remotePos, remoteImpliedPos, resumeFrameSent);
370381
}
371382
}
372383

rsocket-core/src/main/java/io/rsocket/resume/ResumeStateException.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,32 @@
1818

1919
class ResumeStateException extends RuntimeException {
2020
private static final long serialVersionUID = -5393753463377588732L;
21-
private final ResumptionState local;
22-
private final ResumptionState remote;
21+
private final long localPos;
22+
private final long localImpliedPos;
23+
private final long remotePos;
24+
private final long remoteImpliedPos;
2325

24-
public ResumeStateException(ResumptionState local, ResumptionState remote) {
25-
this.local = local;
26-
this.remote = remote;
26+
public ResumeStateException(
27+
long localPos, long localImpliedPos, long remotePos, long remoteImpliedPos) {
28+
this.localPos = localPos;
29+
this.localImpliedPos = localImpliedPos;
30+
this.remotePos = remotePos;
31+
this.remoteImpliedPos = remoteImpliedPos;
2732
}
2833

29-
public ResumptionState localState() {
30-
return local;
34+
public long getLocalPos() {
35+
return localPos;
3136
}
3237

33-
public ResumptionState remoteState() {
34-
return remote;
38+
public long getLocalImpliedPos() {
39+
return localImpliedPos;
40+
}
41+
42+
public long getRemotePos() {
43+
return remotePos;
44+
}
45+
46+
public long getRemoteImpliedPos() {
47+
return remoteImpliedPos;
3548
}
3649
}

rsocket-core/src/main/java/io/rsocket/resume/ResumedFramesCalculator.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,30 +23,39 @@ interface ResumedFramesCalculator {
2323
ResumedFramesCalculator ofClient = new ClientResumedFramesCalculator();
2424
ResumedFramesCalculator ofServer = new ServerResumedFramesCalculator();
2525

26-
Mono<Long> calculate(ResumptionState local, ResumptionState remote);
26+
Mono<Long> calculate(long localPos, long localImpliedPos, long remotePos, long remoteImpliedPos);
2727

2828
class ClientResumedFramesCalculator implements ResumedFramesCalculator {
2929

30+
/*ResumptionState clientState, ResumptionState serverState*/
3031
@Override
31-
public Mono<Long> calculate(ResumptionState clientState, ResumptionState serverState) {
32-
long serverImplied = serverState.impliedPosition();
33-
if (serverImplied >= clientState.position()) {
34-
return Mono.just(serverImplied);
32+
public Mono<Long> calculate(
33+
long clientPos, long clientImpliedPos, long serverPos, long serverImpliedPos) {
34+
if (serverImpliedPos >= clientPos) {
35+
return Mono.just(serverImpliedPos);
3536
} else {
36-
return Mono.error(new ResumeStateException(clientState, serverState));
37+
return Mono.error(
38+
new ResumeStateException(
39+
clientPos, clientImpliedPos,
40+
serverPos, serverImpliedPos));
3741
}
3842
}
3943
}
4044

4145
class ServerResumedFramesCalculator implements ResumedFramesCalculator {
4246

47+
/*ResumptionState serverState, ResumptionState clientState*/
4348
@Override
44-
public Mono<Long> calculate(ResumptionState serverState, ResumptionState clientState) {
45-
boolean clientStateValid = clientState.position() <= serverState.impliedPosition();
46-
boolean serverStateValid = serverState.position() <= clientState.impliedPosition();
49+
public Mono<Long> calculate(
50+
long serverPos, long serverImpliedPos, long clientPos, long clientImpliedPos) {
51+
boolean clientStateValid = clientPos <= serverImpliedPos;
52+
boolean serverStateValid = serverPos <= clientImpliedPos;
4753
return clientStateValid && serverStateValid
48-
? Mono.just(clientState.impliedPosition())
49-
: Mono.error(new ResumeStateException(serverState, clientState));
54+
? Mono.just(clientImpliedPos)
55+
: Mono.error(
56+
new ResumeStateException(
57+
serverPos, serverImpliedPos,
58+
clientPos, clientImpliedPos));
5059
}
5160
}
5261
}

rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,13 @@ public ServerRSocketSession continueWith(ResumeAwareConnection newConnection) {
9696
@Override
9797
public ServerRSocketSession resumeWith(ByteBuf resumeFrame) {
9898
logger.debug("Resume FRAME received");
99+
long remotePos = remotePos(resumeFrame);
100+
long remoteImpliedPos = remoteImpliedPos(resumeFrame);
101+
resumeFrame.release();
102+
99103
resumableConnection.resume(
100-
stateFromFrame(resumeFrame),
104+
remotePos,
105+
remoteImpliedPos,
101106
pos ->
102107
pos.flatMap(
103108
impliedPos -> sendFrame(ResumeOkFrameFlyweight.encode(allocator, impliedPos)))
@@ -135,11 +140,12 @@ private Mono<Void> sendFrame(ByteBuf frame) {
135140
return resumableConnection.sendOne(frame).onErrorResume(e -> Mono.empty());
136141
}
137142

138-
private static ResumptionState stateFromFrame(ByteBuf resumeFrame) {
139-
long peerPos = ResumeFrameFlyweight.firstAvailableClientPos(resumeFrame);
140-
long peerImpliedPos = ResumeFrameFlyweight.lastReceivedServerPos(resumeFrame);
141-
resumeFrame.release();
142-
return ResumptionState.fromClient(peerPos, peerImpliedPos);
143+
private static long remotePos(ByteBuf resumeFrame) {
144+
return ResumeFrameFlyweight.firstAvailableClientPos(resumeFrame);
145+
}
146+
147+
private static long remoteImpliedPos(ByteBuf resumeFrame) {
148+
return ResumeFrameFlyweight.lastReceivedServerPos(resumeFrame);
143149
}
144150

145151
private static RejectedResumeException errorFrameThrowable(Throwable err) {
@@ -148,8 +154,11 @@ private static RejectedResumeException errorFrameThrowable(Throwable err) {
148154
ResumeStateException resumeException = ((ResumeStateException) err);
149155
msg =
150156
String.format(
151-
"resumption_pos=[ remote: %s, local: %s]",
152-
resumeException.remoteState(), resumeException.localState());
157+
"resumption_pos=[ remote: { pos: %d, impliedPos: %d }, local: { pos: %d, impliedPos: %d }]",
158+
resumeException.getRemotePos(),
159+
resumeException.getRemoteImpliedPos(),
160+
resumeException.getLocalPos(),
161+
resumeException.getLocalImpliedPos());
153162
} else {
154163
msg = String.format("resume_internal_error: %s", err.getMessage());
155164
}

rsocket-core/src/test/java/io/rsocket/resume/ResumeCalculatorTest.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,47 +34,37 @@ void setUp() {
3434

3535
@Test
3636
void clientResumeSuccess() {
37-
ResumptionState local = ResumptionState.fromClient(1, 42);
38-
ResumptionState remote = ResumptionState.fromServer(3);
39-
StepVerifier.create(clientResumeCalculator.calculate(local, remote))
37+
StepVerifier.create(clientResumeCalculator.calculate(1, 42, -1, 3))
4038
.expectNext(3L)
4139
.expectComplete()
4240
.verify(Duration.ofSeconds(1));
4341
}
4442

4543
@Test
4644
void clientResumeError() {
47-
ResumptionState local = ResumptionState.fromClient(4, 42);
48-
ResumptionState remote = ResumptionState.fromServer(3);
49-
StepVerifier.create(clientResumeCalculator.calculate(local, remote))
45+
StepVerifier.create(clientResumeCalculator.calculate(4, 42, -1, 3))
5046
.expectError(ResumeStateException.class)
5147
.verify(Duration.ofSeconds(1));
5248
}
5349

5450
@Test
5551
void serverResumeSuccess() {
56-
ResumptionState local = ResumptionState.fromClient(1, 42);
57-
ResumptionState remote = ResumptionState.fromClient(4, 23);
58-
StepVerifier.create(serverResumeCalculator.calculate(local, remote))
52+
StepVerifier.create(serverResumeCalculator.calculate(1, 42, 4, 23))
5953
.expectNext(23L)
6054
.expectComplete()
6155
.verify(Duration.ofSeconds(1));
6256
}
6357

6458
@Test
6559
void serverResumeErrorClientState() {
66-
ResumptionState local = ResumptionState.fromClient(1, 3);
67-
ResumptionState remote = ResumptionState.fromClient(4, 23);
68-
StepVerifier.create(serverResumeCalculator.calculate(local, remote))
60+
StepVerifier.create(serverResumeCalculator.calculate(1, 3, 4, 23))
6961
.expectError(ResumeStateException.class)
7062
.verify(Duration.ofSeconds(1));
7163
}
7264

7365
@Test
7466
void serverResumeErrorServerState() {
75-
ResumptionState local = ResumptionState.fromClient(4, 42);
76-
ResumptionState remote = ResumptionState.fromClient(4, 1);
77-
StepVerifier.create(serverResumeCalculator.calculate(local, remote))
67+
StepVerifier.create(serverResumeCalculator.calculate(4, 42, 4, 1))
7868
.expectError(ResumeStateException.class)
7969
.verify(Duration.ofSeconds(1));
8070
}

0 commit comments

Comments
 (0)