Skip to content

Commit 4a627d0

Browse files
committed
remove ResumedFramesCalculator
Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent 8a15df0 commit 4a627d0

File tree

5 files changed

+39
-106
lines changed

5 files changed

+39
-106
lines changed

rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,7 @@ public ClientRSocketSession(
4545
this.allocator = Objects.requireNonNull(allocator);
4646
this.resumableConnection =
4747
new ResumableDuplexConnection(
48-
"client",
49-
duplexConnection,
50-
ResumedFramesCalculator.ofClient,
51-
config.resumeStore(),
52-
config.resumeStreamTimeout());
48+
"client", duplexConnection, config.resumeStore(), config.resumeStreamTimeout());
5349

5450
resumableConnection
5551
.connectionErrors()

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
3535
private static final Logger logger = LoggerFactory.getLogger(ResumableDuplexConnection.class);
3636

3737
private final String tag;
38-
private final ResumedFramesCalculator resumedFramesCalculator;
3938
private final ResumableFramesStore resumableFramesStore;
4039
private final Duration resumeStreamTimeout;
4140

@@ -66,11 +65,9 @@ class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
6665
ResumableDuplexConnection(
6766
String tag,
6867
ResumeAwareConnection duplexConnection,
69-
ResumedFramesCalculator resumedFramesCalculator,
7068
ResumableFramesStore resumableFramesStore,
7169
Duration resumeStreamTimeout) {
7270
this.tag = tag;
73-
this.resumedFramesCalculator = resumedFramesCalculator;
7471
this.resumableFramesStore = resumableFramesStore;
7572
this.resumeStreamTimeout = resumeStreamTimeout;
7673

@@ -242,27 +239,34 @@ private void doResume(
242239
long localPosition = position();
243240
long localImpliedPosition = impliedPosition();
244241

245-
logger.debug(
246-
"Resumption start. Calculating implied pos using: {}",
247-
resumedFramesCalculator.getClass().getSimpleName());
242+
logger.debug("Resumption start");
248243
logger.debug(
249244
"Resumption states. local: [pos: {}, impliedPos: {}], remote: [pos: {}, impliedPos: {}]",
250245
localPosition,
251246
localImpliedPosition,
252247
remotePosition,
253248
remoteImpliedPosition);
254249

255-
Mono<Long> res =
256-
resumedFramesCalculator.calculate(
250+
long remoteImpliedPos =
251+
calculateRemoteImpliedPos(
257252
localPosition, localImpliedPosition,
258253
remotePosition, remoteImpliedPosition);
259-
Mono<Long> localImpliedPos =
260-
res.doOnSuccess(notUsed -> state = State.RESUME)
261-
.doOnSuccess(this::releaseFramesToPosition)
262-
.map(remoteImpliedPos -> localImpliedPosition);
254+
255+
Mono<Long> impliedPositionOrError;
256+
if (remoteImpliedPos >= 0) {
257+
state = State.RESUME;
258+
releaseFramesToPosition(remoteImpliedPos);
259+
impliedPositionOrError = Mono.just(localImpliedPosition);
260+
} else {
261+
impliedPositionOrError =
262+
Mono.error(
263+
new ResumeStateException(
264+
localPosition, localImpliedPosition,
265+
remotePosition, remoteImpliedPos));
266+
}
263267

264268
sendResumeFrame
265-
.apply(localImpliedPos)
269+
.apply(impliedPositionOrError)
266270
.then(
267271
streamResumedFrames(
268272
resumableFramesStore
@@ -274,6 +278,15 @@ private void doResume(
274278
.subscribe();
275279
}
276280

281+
static long calculateRemoteImpliedPos(
282+
long pos, long impliedPos, long remotePos, long remoteImpliedPos) {
283+
if (remotePos <= impliedPos && pos <= remoteImpliedPos) {
284+
return remoteImpliedPos;
285+
} else {
286+
return -1L;
287+
}
288+
}
289+
277290
private void doResumeComplete() {
278291
logger.debug("Completing resumption");
279292
state = State.RESUME_COMPLETED;

rsocket-core/src/main/java/io/rsocket/resume/ResumedFramesCalculator.java

Lines changed: 0 additions & 61 deletions
This file was deleted.

rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ public ServerRSocketSession(
5555
new ResumableDuplexConnection(
5656
"server",
5757
duplexConnection,
58-
ResumedFramesCalculator.ofServer,
5958
config.resumeStoreFactory().apply(resumeToken),
6059
config.resumeStreamTimeout());
6160

rsocket-core/src/test/java/io/rsocket/resume/ResumeCalculatorTest.java

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,56 +16,42 @@
1616

1717
package io.rsocket.resume;
1818

19-
import java.time.Duration;
19+
import org.junit.jupiter.api.Assertions;
2020
import org.junit.jupiter.api.BeforeEach;
2121
import org.junit.jupiter.api.Test;
22-
import reactor.test.StepVerifier;
2322

2423
public class ResumeCalculatorTest {
2524

26-
private ResumedFramesCalculator clientResumeCalculator;
27-
private ResumedFramesCalculator serverResumeCalculator;
28-
2925
@BeforeEach
30-
void setUp() {
31-
clientResumeCalculator = ResumedFramesCalculator.ofClient;
32-
serverResumeCalculator = ResumedFramesCalculator.ofServer;
33-
}
26+
void setUp() {}
3427

3528
@Test
3629
void clientResumeSuccess() {
37-
StepVerifier.create(clientResumeCalculator.calculate(1, 42, -1, 3))
38-
.expectNext(3L)
39-
.expectComplete()
40-
.verify(Duration.ofSeconds(1));
30+
long position = ResumableDuplexConnection.calculateRemoteImpliedPos(1, 42, -1, 3);
31+
Assertions.assertEquals(3, position);
4132
}
4233

4334
@Test
4435
void clientResumeError() {
45-
StepVerifier.create(clientResumeCalculator.calculate(4, 42, -1, 3))
46-
.expectError(ResumeStateException.class)
47-
.verify(Duration.ofSeconds(1));
36+
long position = ResumableDuplexConnection.calculateRemoteImpliedPos(4, 42, -1, 3);
37+
Assertions.assertEquals(-1, position);
4838
}
4939

5040
@Test
5141
void serverResumeSuccess() {
52-
StepVerifier.create(serverResumeCalculator.calculate(1, 42, 4, 23))
53-
.expectNext(23L)
54-
.expectComplete()
55-
.verify(Duration.ofSeconds(1));
42+
long position = ResumableDuplexConnection.calculateRemoteImpliedPos(1, 42, 4, 23);
43+
Assertions.assertEquals(23, position);
5644
}
5745

5846
@Test
5947
void serverResumeErrorClientState() {
60-
StepVerifier.create(serverResumeCalculator.calculate(1, 3, 4, 23))
61-
.expectError(ResumeStateException.class)
62-
.verify(Duration.ofSeconds(1));
48+
long position = ResumableDuplexConnection.calculateRemoteImpliedPos(1, 3, 4, 23);
49+
Assertions.assertEquals(-1, position);
6350
}
6451

6552
@Test
6653
void serverResumeErrorServerState() {
67-
StepVerifier.create(serverResumeCalculator.calculate(4, 42, 4, 1))
68-
.expectError(ResumeStateException.class)
69-
.verify(Duration.ofSeconds(1));
54+
long position = ResumableDuplexConnection.calculateRemoteImpliedPos(4, 42, 4, 1);
55+
Assertions.assertEquals(-1, position);
7056
}
7157
}

0 commit comments

Comments
 (0)