Skip to content

Commit 03e0e4b

Browse files
committed
Allow Frame to be recycled when content is retained
1 parent 1a4844f commit 03e0e4b

File tree

5 files changed

+48
-72
lines changed

5 files changed

+48
-72
lines changed

rsocket-core/src/main/java/io/rsocket/Frame.java

Lines changed: 22 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;
2020

2121
import io.netty.buffer.*;
22+
import io.netty.util.AbstractReferenceCounted;
2223
import io.netty.util.IllegalReferenceCountException;
2324
import io.netty.util.Recycler;
2425
import io.netty.util.Recycler.Handle;
@@ -33,7 +34,6 @@
3334
import io.rsocket.frame.VersionFlyweight;
3435
import io.rsocket.framing.FrameType;
3536
import java.nio.charset.StandardCharsets;
36-
import java.util.Objects;
3737
import javax.annotation.Nullable;
3838
import org.slf4j.Logger;
3939
import org.slf4j.LoggerFactory;
@@ -43,7 +43,7 @@
4343
*
4444
* <p>This provides encoding, decoding and field accessors.
4545
*/
46-
public class Frame implements Payload, ByteBufHolder {
46+
public class Frame extends AbstractReferenceCounted implements Payload, ByteBufHolder {
4747
private static final Recycler<Frame> RECYCLER =
4848
new Recycler<Frame>() {
4949
protected Frame newObject(Handle<Frame> handle) {
@@ -58,12 +58,6 @@ private Frame(final Handle<Frame> handle) {
5858
this.handle = handle;
5959
}
6060

61-
/** Clear and recycle this instance. */
62-
private void recycle() {
63-
content = null;
64-
handle.recycle(this);
65-
}
66-
6761
/** Return the content which is held by this {@link Frame}. */
6862
@Override
6963
public ByteBuf content() {
@@ -105,26 +99,17 @@ public Frame replace(ByteBuf content) {
10599
return from(content);
106100
}
107101

108-
/**
109-
* Returns the reference count of this object. If {@code 0}, it means this object has been
110-
* deallocated.
111-
*/
112-
@Override
113-
public int refCnt() {
114-
return content.refCnt();
115-
}
116-
117102
/** Increases the reference count by {@code 1}. */
118103
@Override
119104
public Frame retain() {
120-
content.retain();
105+
super.retain();
121106
return this;
122107
}
123108

124109
/** Increases the reference count by the specified {@code increment}. */
125110
@Override
126111
public Frame retain(int increment) {
127-
content.retain(increment);
112+
super.retain(increment);
128113
return this;
129114
}
130115

@@ -151,35 +136,13 @@ public Frame touch(@Nullable Object hint) {
151136
}
152137

153138
/**
154-
* Decreases the reference count by {@code 1} and deallocates this object if the reference count
155-
* reaches at {@code 0}.
156-
*
157-
* @return {@code true} if and only if the reference count became {@code 0} and this object has
158-
* been deallocated
159-
*/
160-
@Override
161-
public boolean release() {
162-
if (content != null && content.release()) {
163-
recycle();
164-
return true;
165-
}
166-
return false;
167-
}
168-
169-
/**
170-
* Decreases the reference count by the specified {@code decrement} and deallocates this object if
171-
* the reference count reaches at {@code 0}.
172-
*
173-
* @return {@code true} if and only if the reference count became {@code 0} and this object has
174-
* been deallocated
139+
* Called once {@link #refCnt()} is equals 0.
175140
*/
176141
@Override
177-
public boolean release(int decrement) {
178-
if (content != null && content.release(decrement)) {
179-
recycle();
180-
return true;
181-
}
182-
return false;
142+
protected void deallocate() {
143+
content.release();
144+
content = null;
145+
handle.recycle(this);
183146
}
184147

185148
/**
@@ -239,6 +202,7 @@ public int flags() {
239202
*/
240203
public static Frame from(final ByteBuf content) {
241204
final Frame frame = RECYCLER.get();
205+
frame.setRefCnt(1);
242206
frame.content = content;
243207

244208
return frame;
@@ -281,6 +245,7 @@ public static Frame from(
281245
final ByteBuf data = payload.sliceData();
282246

283247
final Frame frame = RECYCLER.get();
248+
frame.setRefCnt(1);
284249
frame.content =
285250
ByteBufAllocator.DEFAULT.buffer(
286251
SetupFrameFlyweight.computeFrameLength(
@@ -347,6 +312,7 @@ public static Frame from(int streamId, final Throwable throwable, ByteBuf dataBu
347312

348313
final int code = ErrorFrameFlyweight.errorCodeFromException(throwable);
349314
final Frame frame = RECYCLER.get();
315+
frame.setRefCnt(1);
350316
frame.content =
351317
ByteBufAllocator.DEFAULT.buffer(
352318
ErrorFrameFlyweight.computeFrameLength(dataBuffer.readableBytes()));
@@ -378,6 +344,7 @@ private Lease() {}
378344

379345
public static Frame from(int ttl, int numberOfRequests, ByteBuf metadata) {
380346
final Frame frame = RECYCLER.get();
347+
frame.setRefCnt(1);
381348
frame.content =
382349
ByteBufAllocator.DEFAULT.buffer(
383350
LeaseFrameFlyweight.computeFrameLength(metadata.readableBytes()));
@@ -411,6 +378,7 @@ public static Frame from(int streamId, int requestN) {
411378
}
412379

413380
final Frame frame = RECYCLER.get();
381+
frame.setRefCnt(1);
414382
frame.content = ByteBufAllocator.DEFAULT.buffer(RequestNFrameFlyweight.computeFrameLength());
415383
frame.content.writerIndex(RequestNFrameFlyweight.encode(frame.content, streamId, requestN));
416384
return frame;
@@ -438,6 +406,7 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init
438406
final ByteBuf data = payload.sliceData();
439407

440408
final Frame frame = RECYCLER.get();
409+
frame.setRefCnt(1);
441410
frame.content =
442411
ByteBufAllocator.DEFAULT.buffer(
443412
RequestFrameFlyweight.computeFrameLength(
@@ -464,6 +433,7 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init
464433

465434
public static Frame from(int streamId, FrameType type, int flags) {
466435
final Frame frame = RECYCLER.get();
436+
frame.setRefCnt(1);
467437
frame.content =
468438
ByteBufAllocator.DEFAULT.buffer(RequestFrameFlyweight.computeFrameLength(type, null, 0));
469439
frame.content.writerIndex(
@@ -480,6 +450,7 @@ public static Frame from(
480450
int initialRequestN,
481451
int flags) {
482452
final Frame frame = RECYCLER.get();
453+
frame.setRefCnt(1);
483454
frame.content =
484455
ByteBufAllocator.DEFAULT.buffer(
485456
RequestFrameFlyweight.computeFrameLength(
@@ -543,6 +514,7 @@ public static Frame from(int streamId, FrameType type, Payload payload, int flag
543514
public static Frame from(
544515
int streamId, FrameType type, @Nullable ByteBuf metadata, ByteBuf data, int flags) {
545516
final Frame frame = RECYCLER.get();
517+
frame.setRefCnt(1);
546518
frame.content =
547519
ByteBufAllocator.DEFAULT.buffer(
548520
FrameHeaderFlyweight.computeFrameHeaderLength(
@@ -559,6 +531,7 @@ private Cancel() {}
559531

560532
public static Frame from(int streamId) {
561533
final Frame frame = RECYCLER.get();
534+
frame.setRefCnt(1);
562535
frame.content =
563536
ByteBufAllocator.DEFAULT.buffer(
564537
FrameHeaderFlyweight.computeFrameHeaderLength(FrameType.CANCEL, null, 0));
@@ -575,6 +548,7 @@ private Keepalive() {}
575548

576549
public static Frame from(ByteBuf data, boolean respond) {
577550
final Frame frame = RECYCLER.get();
551+
frame.setRefCnt(1);
578552
frame.content =
579553
ByteBufAllocator.DEFAULT.buffer(
580554
KeepaliveFrameFlyweight.computeFrameLength(data.readableBytes()));
@@ -611,12 +585,12 @@ public boolean equals(Object o) {
611585
return false;
612586
}
613587
final Frame frame = (Frame) o;
614-
return Objects.equals(content, frame.content);
588+
return content.equals(frame.content());
615589
}
616590

617591
@Override
618592
public int hashCode() {
619-
return Objects.hash(content);
593+
return content.hashCode();
620594
}
621595

622596
@Override

rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.netty.buffer.Unpooled;
2323
import io.netty.util.AbstractReferenceCounted;
2424
import io.netty.util.Recycler;
25+
import io.netty.util.Recycler.Handle;
2526
import io.rsocket.Payload;
2627
import java.nio.ByteBuffer;
2728
import java.nio.CharBuffer;
@@ -36,11 +37,11 @@ protected ByteBufPayload newObject(Handle<ByteBufPayload> handle) {
3637
}
3738
};
3839

39-
private final Recycler.Handle<ByteBufPayload> handle;
40+
private final Handle<ByteBufPayload> handle;
4041
private ByteBuf data;
4142
private ByteBuf metadata;
4243

43-
private ByteBufPayload(final Recycler.Handle<ByteBufPayload> handle) {
44+
private ByteBufPayload(final Handle<ByteBufPayload> handle) {
4445
this.handle = handle;
4546
}
4647

@@ -168,12 +169,12 @@ public static Payload create(ByteBuf data) {
168169
public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) {
169170
ByteBufPayload payload = RECYCLER.get();
170171
payload.setRefCnt(1);
171-
payload.data = data.retain();
172-
payload.metadata = metadata == null ? Unpooled.EMPTY_BUFFER : metadata.retain();
172+
payload.data = data;
173+
payload.metadata = metadata;
173174
return payload;
174175
}
175176

176177
public static Payload create(Payload payload) {
177-
return create(payload.sliceData(), payload.hasMetadata() ? payload.sliceMetadata() : null);
178+
return create(payload.sliceData().retain(), payload.hasMetadata() ? payload.sliceMetadata().retain() : null);
178179
}
179180
}

rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -154,14 +154,15 @@ public static Payload create(ByteBuffer data, @Nullable ByteBuffer metadata) {
154154
return new DefaultPayload(data, metadata);
155155
}
156156

157-
public static Payload create(Payload payload) {
158-
return create(
159-
copy(payload.sliceData()), payload.hasMetadata() ? copy(payload.sliceMetadata()) : null);
157+
public static Payload create(ByteBuf data) {
158+
return create(data, null);
160159
}
161160

162-
private static ByteBuffer copy(ByteBuf byteBuf) {
163-
byte[] contents = new byte[byteBuf.readableBytes()];
164-
byteBuf.readBytes(contents);
165-
return ByteBuffer.wrap(contents);
161+
public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) {
162+
return create(data.nioBuffer(), metadata == null ? null : metadata.nioBuffer());
163+
}
164+
165+
public static Payload create(Payload payload) {
166+
return create(Unpooled.copiedBuffer(payload.sliceData()), payload.hasMetadata() ? Unpooled.copiedBuffer(payload.sliceMetadata()) : null);
166167
}
167168
}

rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ void reassembleNonFragment() {
184184
toAbstractionLeakingFrame(
185185
DEFAULT, 1, createPayloadFrame(DEFAULT, false, true, (ByteBuf) null, null));
186186

187-
when(delegate.receive()).thenReturn(Flux.just(frame));
187+
when(delegate.receive()).thenReturn(Flux.just(frame.retain()));
188188
when(delegate.onClose()).thenReturn(Mono.never());
189189

190190
new FragmentationDuplexConnection(DEFAULT, delegate, 2)
@@ -199,7 +199,7 @@ void reassembleNonFragment() {
199199
void reassembleNonFragmentableFrame() {
200200
Frame frame = toAbstractionLeakingFrame(DEFAULT, 1, createTestCancelFrame());
201201

202-
when(delegate.receive()).thenReturn(Flux.just(frame));
202+
when(delegate.receive()).thenReturn(Flux.just(frame.retain()));
203203
when(delegate.onClose()).thenReturn(Mono.never());
204204

205205
new FragmentationDuplexConnection(DEFAULT, delegate, 2)
@@ -232,7 +232,7 @@ void sendData() {
232232

233233
when(delegate.onClose()).thenReturn(Mono.never());
234234

235-
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
235+
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame.retain());
236236
verify(delegate).send(publishers.capture());
237237

238238
StepVerifier.create(Flux.from(publishers.getValue()))
@@ -251,7 +251,7 @@ void sendEqualToMaxFragmentLength() {
251251

252252
when(delegate.onClose()).thenReturn(Mono.never());
253253

254-
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
254+
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame.retain());
255255
verify(delegate).send(publishers.capture());
256256

257257
StepVerifier.create(Flux.from(publishers.getValue())).expectNext(frame).verifyComplete();
@@ -266,7 +266,7 @@ void sendFragment() {
266266

267267
when(delegate.onClose()).thenReturn(Mono.never());
268268

269-
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
269+
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame.retain());
270270
verify(delegate).send(publishers.capture());
271271

272272
StepVerifier.create(Flux.from(publishers.getValue())).expectNext(frame).verifyComplete();
@@ -281,7 +281,7 @@ void sendLessThanMaxFragmentLength() {
281281

282282
when(delegate.onClose()).thenReturn(Mono.never());
283283

284-
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
284+
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame.retain());
285285
verify(delegate).send(publishers.capture());
286286

287287
StepVerifier.create(Flux.from(publishers.getValue())).expectNext(frame).verifyComplete();
@@ -310,7 +310,7 @@ void sendMetadata() {
310310

311311
when(delegate.onClose()).thenReturn(Mono.never());
312312

313-
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
313+
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame.retain());
314314
verify(delegate).send(publishers.capture());
315315

316316
StepVerifier.create(Flux.from(publishers.getValue()))
@@ -354,7 +354,7 @@ void sendMetadataAndData() {
354354

355355
when(delegate.onClose()).thenReturn(Mono.never());
356356

357-
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
357+
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame.retain());
358358
verify(delegate).send(publishers.capture());
359359

360360
StepVerifier.create(Flux.from(publishers.getValue()))
@@ -373,7 +373,7 @@ void sendNonFragmentable() {
373373

374374
when(delegate.onClose()).thenReturn(Mono.never());
375375

376-
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
376+
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame.retain());
377377
verify(delegate).send(publishers.capture());
378378

379379
StepVerifier.create(Flux.from(publishers.getValue())).expectNext(frame).verifyComplete();
@@ -398,7 +398,7 @@ void sendZeroMaxFragmentLength() {
398398

399399
when(delegate.onClose()).thenReturn(Mono.never());
400400

401-
new FragmentationDuplexConnection(DEFAULT, delegate, 0).sendOne(frame);
401+
new FragmentationDuplexConnection(DEFAULT, delegate, 0).sendOne(frame.retain());
402402
verify(delegate).send(publishers.capture());
403403

404404
StepVerifier.create(Flux.from(publishers.getValue())).expectNext(frame).verifyComplete();

rsocket-examples/src/test/java/io/rsocket/integration/InteractionsLoadTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
public class InteractionsLoadTest {
2020

2121
@Test
22-
@SlowTest
22+
//@SlowTest
2323
public void channel() {
2424
TcpServerTransport serverTransport = TcpServerTransport.create(0);
2525

@@ -80,7 +80,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
8080
if (!data.equals("foo")) {
8181
throw new IllegalStateException("Channel Server Bad message: " + data);
8282
}
83-
return DefaultPayload.create(DefaultPayload.create("bar"));
83+
return DefaultPayload.create("bar");
8484
});
8585
}
8686

0 commit comments

Comments
 (0)