Skip to content

Commit 627f590

Browse files
committed
ensures InMemoryResumableFramesStore does not retain not resumable frames (#1009)
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 2816a79 commit 627f590

File tree

2 files changed

+149
-105
lines changed

2 files changed

+149
-105
lines changed

rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -234,10 +234,12 @@ public void onComplete() {
234234
public void onNext(ByteBuf frame) {
235235
final int state;
236236
final boolean isResumable = isResumableFrame(frame);
237+
boolean canBeStore = isResumable;
237238
if (isResumable) {
238239
final ArrayList<ByteBuf> frames = cachedFrames;
239-
int incomingFrameSize = frame.readableBytes();
240+
final int incomingFrameSize = frame.readableBytes();
240241
final int cacheLimit = this.cacheLimit;
242+
241243
if (cacheLimit != Integer.MAX_VALUE) {
242244
long availableSize = cacheLimit - cacheSize;
243245
if (availableSize < incomingFrameSize) {
@@ -256,26 +258,36 @@ public void onNext(ByteBuf frame) {
256258
}
257259
}
258260
CACHE_SIZE.addAndGet(this, -removedBytes);
259-
POSITION.addAndGet(this, removedBytes);
261+
262+
canBeStore = availableSize >= incomingFrameSize;
263+
POSITION.addAndGet(this, removedBytes + (canBeStore ? 0 : incomingFrameSize));
264+
} else {
265+
canBeStore = true;
260266
}
267+
} else {
268+
canBeStore = true;
261269
}
262-
synchronized (this) {
263-
state = this.state;
264-
if (state != 2) {
265-
frames.add(frame);
270+
271+
state = this.state;
272+
if (canBeStore) {
273+
synchronized (this) {
274+
if (state != 2) {
275+
frames.add(frame);
276+
}
277+
}
278+
279+
if (cacheLimit != Integer.MAX_VALUE) {
280+
CACHE_SIZE.addAndGet(this, incomingFrameSize);
266281
}
267-
}
268-
if (cacheLimit != Integer.MAX_VALUE) {
269-
CACHE_SIZE.addAndGet(this, incomingFrameSize);
270282
}
271283
} else {
272284
state = this.state;
273285
}
274286

275287
final CoreSubscriber<? super ByteBuf> actual = this.actual;
276288
if (state == 1) {
277-
actual.onNext(frame.retain());
278-
} else if (!isResumable || state == 2) {
289+
actual.onNext(isResumable && canBeStore ? frame.retainedSlice() : frame);
290+
} else if (!isResumable || !canBeStore || state == 2) {
279291
frame.release();
280292
}
281293
}
@@ -302,7 +314,7 @@ public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
302314
actual.onSubscribe(this);
303315
synchronized (this) {
304316
for (final ByteBuf frame : cachedFrames) {
305-
actual.onNext(frame.retain());
317+
actual.onNext(frame.retainedSlice());
306318
}
307319
}
308320

Lines changed: 125 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,93 +1,125 @@
1-
// package io.rsocket.resume;
2-
//
3-
// import io.netty.buffer.ByteBuf;
4-
// import io.netty.buffer.Unpooled;
5-
// import java.util.Arrays;
6-
// import org.junit.Assert;
7-
// import org.junit.jupiter.api.Test;
8-
// import reactor.core.publisher.Flux;
9-
//
10-
// public class InMemoryResumeStoreTest {
11-
//
12-
// @Test
13-
// void saveWithoutTailRemoval() {
14-
// InMemoryResumableFramesStore store = inMemoryStore(25);
15-
// ByteBuf frame = frameMock(10);
16-
// store.saveFrames(Flux.just(frame)).block();
17-
// Assert.assertEquals(1, store.cachedFrames.size());
18-
// Assert.assertEquals(frame.readableBytes(), store.cacheSize);
19-
// Assert.assertEquals(0, store.position);
20-
// }
21-
//
22-
// @Test
23-
// void saveRemoveOneFromTail() {
24-
// InMemoryResumableFramesStore store = inMemoryStore(25);
25-
// ByteBuf frame1 = frameMock(20);
26-
// ByteBuf frame2 = frameMock(10);
27-
// store.saveFrames(Flux.just(frame1, frame2)).block();
28-
// Assert.assertEquals(1, store.cachedFrames.size());
29-
// Assert.assertEquals(frame2.readableBytes(), store.cacheSize);
30-
// Assert.assertEquals(frame1.readableBytes(), store.position);
31-
// }
32-
//
33-
// @Test
34-
// void saveRemoveTwoFromTail() {
35-
// InMemoryResumableFramesStore store = inMemoryStore(25);
36-
// ByteBuf frame1 = frameMock(10);
37-
// ByteBuf frame2 = frameMock(10);
38-
// ByteBuf frame3 = frameMock(20);
39-
// store.saveFrames(Flux.just(frame1, frame2, frame3)).block();
40-
// Assert.assertEquals(1, store.cachedFrames.size());
41-
// Assert.assertEquals(frame3.readableBytes(), store.cacheSize);
42-
// Assert.assertEquals(size(frame1, frame2), store.position);
43-
// }
44-
//
45-
// @Test
46-
// void saveBiggerThanStore() {
47-
// InMemoryResumableFramesStore store = inMemoryStore(25);
48-
// ByteBuf frame1 = frameMock(10);
49-
// ByteBuf frame2 = frameMock(10);
50-
// ByteBuf frame3 = frameMock(30);
51-
// store.saveFrames(Flux.just(frame1, frame2, frame3)).block();
52-
// Assert.assertEquals(0, store.cachedFrames.size());
53-
// Assert.assertEquals(0, store.cacheSize);
54-
// Assert.assertEquals(size(frame1, frame2, frame3), store.position);
55-
// }
56-
//
57-
// @Test
58-
// void releaseFrames() {
59-
// InMemoryResumableFramesStore store = inMemoryStore(100);
60-
// ByteBuf frame1 = frameMock(10);
61-
// ByteBuf frame2 = frameMock(10);
62-
// ByteBuf frame3 = frameMock(30);
63-
// store.saveFrames(Flux.just(frame1, frame2, frame3)).block();
64-
// store.releaseFrames(20);
65-
// Assert.assertEquals(1, store.cachedFrames.size());
66-
// Assert.assertEquals(frame3.readableBytes(), store.cacheSize);
67-
// Assert.assertEquals(size(frame1, frame2), store.position);
68-
// }
69-
//
70-
// @Test
71-
// void receiveImpliedPosition() {
72-
// InMemoryResumableFramesStore store = inMemoryStore(100);
73-
// ByteBuf frame1 = frameMock(10);
74-
// ByteBuf frame2 = frameMock(30);
75-
// store.resumableFrameReceived(frame1);
76-
// store.resumableFrameReceived(frame2);
77-
// Assert.assertEquals(size(frame1, frame2), store.frameImpliedPosition());
78-
// }
79-
//
80-
// private int size(ByteBuf... byteBufs) {
81-
// return Arrays.stream(byteBufs).mapToInt(ByteBuf::readableBytes).sum();
82-
// }
83-
//
84-
// private static InMemoryResumableFramesStore inMemoryStore(int size) {
85-
// return new InMemoryResumableFramesStore("test", size);
86-
// }
87-
//
88-
// private static ByteBuf frameMock(int size) {
89-
// byte[] bytes = new byte[size];
90-
// Arrays.fill(bytes, (byte) 7);
91-
// return Unpooled.wrappedBuffer(bytes);
92-
// }
93-
// }
1+
package io.rsocket.resume;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import io.netty.buffer.ByteBuf;
6+
import io.netty.buffer.Unpooled;
7+
import java.util.Arrays;
8+
import org.junit.jupiter.api.Test;
9+
import reactor.core.publisher.Flux;
10+
11+
public class InMemoryResumeStoreTest {
12+
13+
@Test
14+
void saveNonResumableFrame() {
15+
InMemoryResumableFramesStore store = inMemoryStore(25);
16+
ByteBuf frame1 = fakeConnectionFrame(10);
17+
ByteBuf frame2 = fakeConnectionFrame(35);
18+
store.saveFrames(Flux.just(frame1, frame2)).block();
19+
assertThat(store.cachedFrames.size()).isZero();
20+
assertThat(store.cacheSize).isZero();
21+
assertThat(store.position).isZero();
22+
assertThat(frame1.refCnt()).isZero();
23+
assertThat(frame2.refCnt()).isZero();
24+
}
25+
26+
@Test
27+
void saveWithoutTailRemoval() {
28+
InMemoryResumableFramesStore store = inMemoryStore(25);
29+
ByteBuf frame = fakeResumableFrame(10);
30+
store.saveFrames(Flux.just(frame)).block();
31+
assertThat(store.cachedFrames.size()).isEqualTo(1);
32+
assertThat(store.cacheSize).isEqualTo(frame.readableBytes());
33+
assertThat(store.position).isZero();
34+
assertThat(frame.refCnt()).isOne();
35+
}
36+
37+
@Test
38+
void saveRemoveOneFromTail() {
39+
InMemoryResumableFramesStore store = inMemoryStore(25);
40+
ByteBuf frame1 = fakeResumableFrame(20);
41+
ByteBuf frame2 = fakeResumableFrame(10);
42+
store.saveFrames(Flux.just(frame1, frame2)).block();
43+
assertThat(store.cachedFrames.size()).isOne();
44+
assertThat(store.cacheSize).isEqualTo(frame2.readableBytes());
45+
assertThat(store.position).isEqualTo(frame1.readableBytes());
46+
assertThat(frame1.refCnt()).isZero();
47+
assertThat(frame2.refCnt()).isOne();
48+
}
49+
50+
@Test
51+
void saveRemoveTwoFromTail() {
52+
InMemoryResumableFramesStore store = inMemoryStore(25);
53+
ByteBuf frame1 = fakeResumableFrame(10);
54+
ByteBuf frame2 = fakeResumableFrame(10);
55+
ByteBuf frame3 = fakeResumableFrame(20);
56+
store.saveFrames(Flux.just(frame1, frame2, frame3)).block();
57+
assertThat(store.cachedFrames.size()).isOne();
58+
assertThat(store.cacheSize).isEqualTo(frame3.readableBytes());
59+
assertThat(store.position).isEqualTo(size(frame1, frame2));
60+
assertThat(frame1.refCnt()).isZero();
61+
assertThat(frame2.refCnt()).isZero();
62+
assertThat(frame3.refCnt()).isOne();
63+
}
64+
65+
@Test
66+
void saveBiggerThanStore() {
67+
InMemoryResumableFramesStore store = inMemoryStore(25);
68+
ByteBuf frame1 = fakeResumableFrame(10);
69+
ByteBuf frame2 = fakeResumableFrame(10);
70+
ByteBuf frame3 = fakeResumableFrame(30);
71+
store.saveFrames(Flux.just(frame1, frame2, frame3)).block();
72+
assertThat(store.cachedFrames.size()).isZero();
73+
assertThat(store.cacheSize).isZero();
74+
assertThat(store.position).isEqualTo(size(frame1, frame2, frame3));
75+
assertThat(frame1.refCnt()).isZero();
76+
assertThat(frame2.refCnt()).isZero();
77+
assertThat(frame3.refCnt()).isZero();
78+
}
79+
80+
@Test
81+
void releaseFrames() {
82+
InMemoryResumableFramesStore store = inMemoryStore(100);
83+
ByteBuf frame1 = fakeResumableFrame(10);
84+
ByteBuf frame2 = fakeResumableFrame(10);
85+
ByteBuf frame3 = fakeResumableFrame(30);
86+
store.saveFrames(Flux.just(frame1, frame2, frame3)).block();
87+
store.releaseFrames(20);
88+
assertThat(store.cachedFrames.size()).isOne();
89+
assertThat(store.cacheSize).isEqualTo(frame3.readableBytes());
90+
assertThat(store.position).isEqualTo(size(frame1, frame2));
91+
assertThat(frame1.refCnt()).isZero();
92+
assertThat(frame2.refCnt()).isZero();
93+
assertThat(frame3.refCnt()).isOne();
94+
}
95+
96+
@Test
97+
void receiveImpliedPosition() {
98+
InMemoryResumableFramesStore store = inMemoryStore(100);
99+
ByteBuf frame1 = fakeResumableFrame(10);
100+
ByteBuf frame2 = fakeResumableFrame(30);
101+
store.resumableFrameReceived(frame1);
102+
store.resumableFrameReceived(frame2);
103+
assertThat(store.frameImpliedPosition()).isEqualTo(size(frame1, frame2));
104+
}
105+
106+
private int size(ByteBuf... byteBufs) {
107+
return Arrays.stream(byteBufs).mapToInt(ByteBuf::readableBytes).sum();
108+
}
109+
110+
private static InMemoryResumableFramesStore inMemoryStore(int size) {
111+
return new InMemoryResumableFramesStore("test", size);
112+
}
113+
114+
private static ByteBuf fakeResumableFrame(int size) {
115+
byte[] bytes = new byte[size];
116+
Arrays.fill(bytes, (byte) 7);
117+
return Unpooled.wrappedBuffer(bytes);
118+
}
119+
120+
private static ByteBuf fakeConnectionFrame(int size) {
121+
byte[] bytes = new byte[size];
122+
Arrays.fill(bytes, (byte) 0);
123+
return Unpooled.wrappedBuffer(bytes);
124+
}
125+
}

0 commit comments

Comments
 (0)