Skip to content

Commit 0da0d80

Browse files
committed
tests
Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent 5c71082 commit 0da0d80

File tree

2 files changed

+100
-5
lines changed

2 files changed

+100
-5
lines changed

rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ public class InMemoryResumableFramesStore implements ResumableFramesStore {
3232
private static final long SAVE_REQUEST_SIZE = Long.MAX_VALUE;
3333

3434
private final MonoProcessor<Void> disposed = MonoProcessor.create();
35-
private volatile long position;
36-
private volatile long impliedPosition;
37-
private volatile int cacheSize;
38-
private final Queue<ByteBuf> cachedFrames;
35+
volatile long position;
36+
volatile long impliedPosition;
37+
volatile int cacheSize;
38+
final Queue<ByteBuf> cachedFrames;
3939
private final String tag;
4040
private final int cacheLimit;
4141
private volatile int upstreamFrameRefCnt;
@@ -154,7 +154,7 @@ private int releaseTailFrame(ByteBuf content) {
154154

155155
/*this method and releaseTailFrame() won't be called concurrently,
156156
* so non-atomic on volatile is safe*/
157-
private void saveFrame(ByteBuf frame) {
157+
void saveFrame(ByteBuf frame) {
158158
if (upstreamFrameRefCnt == 0) {
159159
upstreamFrameRefCnt = frame.refCnt();
160160
}
@@ -172,6 +172,8 @@ private void saveFrame(ByteBuf frame) {
172172
if (availableSize >= frameSize) {
173173
cachedFrames.offer(frame.retain());
174174
cacheSize += frameSize;
175+
} else {
176+
position += frameSize;
175177
}
176178
}
177179

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package io.rsocket.resume;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.buffer.Unpooled;
5+
import org.junit.Assert;
6+
import org.junit.jupiter.api.Test;
7+
import reactor.core.publisher.Flux;
8+
9+
import java.util.Arrays;
10+
11+
public class InMemoryResumeStoreTest {
12+
13+
@Test
14+
void saveWithoutTailRemoval() {
15+
InMemoryResumableFramesStore store = inMemoryStore(25);
16+
ByteBuf frame = frameMock(10);
17+
store.saveFrames(Flux.just(frame)).block();
18+
Assert.assertEquals(1, store.cachedFrames.size());
19+
Assert.assertEquals(frame.readableBytes(), store.cacheSize);
20+
Assert.assertEquals(0, store.position);
21+
}
22+
23+
@Test
24+
void saveRemoveOneFromTail() {
25+
InMemoryResumableFramesStore store = inMemoryStore(25);
26+
ByteBuf frame1 = frameMock(20);
27+
ByteBuf frame2 = frameMock(10);
28+
store.saveFrames(Flux.just(frame1, frame2)).block();
29+
Assert.assertEquals(1, store.cachedFrames.size());
30+
Assert.assertEquals(frame2.readableBytes(), store.cacheSize);
31+
Assert.assertEquals(frame1.readableBytes(), store.position);
32+
}
33+
34+
@Test
35+
void saveRemoveTwoFromTail() {
36+
InMemoryResumableFramesStore store = inMemoryStore(25);
37+
ByteBuf frame1 = frameMock(10);
38+
ByteBuf frame2 = frameMock(10);
39+
ByteBuf frame3 = frameMock(20);
40+
store.saveFrames(Flux.just(frame1, frame2, frame3)).block();
41+
Assert.assertEquals(1, store.cachedFrames.size());
42+
Assert.assertEquals(frame3.readableBytes(), store.cacheSize);
43+
Assert.assertEquals(size(frame1, frame2), store.position);
44+
}
45+
46+
@Test
47+
void saveBiggerThanStore() {
48+
InMemoryResumableFramesStore store = inMemoryStore(25);
49+
ByteBuf frame1 = frameMock(10);
50+
ByteBuf frame2 = frameMock(10);
51+
ByteBuf frame3 = frameMock(30);
52+
store.saveFrames(Flux.just(frame1, frame2, frame3)).block();
53+
Assert.assertEquals(0, store.cachedFrames.size());
54+
Assert.assertEquals(0, store.cacheSize);
55+
Assert.assertEquals(size(frame1, frame2, frame3), store.position);
56+
}
57+
58+
@Test
59+
void releaseFrames() {
60+
InMemoryResumableFramesStore store = inMemoryStore(100);
61+
ByteBuf frame1 = frameMock(10);
62+
ByteBuf frame2 = frameMock(10);
63+
ByteBuf frame3 = frameMock(30);
64+
store.saveFrames(Flux.just(frame1, frame2, frame3)).block();
65+
store.releaseFrames(20);
66+
Assert.assertEquals(1, store.cachedFrames.size());
67+
Assert.assertEquals(frame3.readableBytes(), store.cacheSize);
68+
Assert.assertEquals(size(frame1, frame2), store.position);
69+
}
70+
71+
@Test
72+
void receiveImpliedPosition() {
73+
InMemoryResumableFramesStore store = inMemoryStore(100);
74+
ByteBuf frame1 = frameMock(10);
75+
ByteBuf frame2 = frameMock(30);
76+
store.resumableFrameReceived(frame1);
77+
store.resumableFrameReceived(frame2);
78+
Assert.assertEquals(size(frame1, frame2), store.frameImpliedPosition());
79+
}
80+
81+
private int size(ByteBuf... byteBufs) {
82+
return Arrays.stream(byteBufs).mapToInt(ByteBuf::readableBytes).sum();
83+
}
84+
private static InMemoryResumableFramesStore inMemoryStore(int size) {
85+
return new InMemoryResumableFramesStore("test", size);
86+
}
87+
88+
private static ByteBuf frameMock(int size) {
89+
byte[] bytes = new byte[size];
90+
Arrays.fill(bytes,(byte)7);
91+
return Unpooled.wrappedBuffer(bytes);
92+
}
93+
}

0 commit comments

Comments
 (0)