Skip to content

Commit 08705ea

Browse files
authored
Merge pull request #508 from rdegnan/fix-leaks
Ensure sendProcessor is disposed
2 parents 1a4844f + 73b9fd9 commit 08705ea

File tree

11 files changed

+281
-325
lines changed

11 files changed

+281
-325
lines changed

rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,19 @@ public abstract class AbstractRSocket implements RSocket {
3131

3232
@Override
3333
public Mono<Void> fireAndForget(Payload payload) {
34+
payload.release();
3435
return Mono.error(new UnsupportedOperationException("Fire and forget not implemented."));
3536
}
3637

3738
@Override
3839
public Mono<Payload> requestResponse(Payload payload) {
40+
payload.release();
3941
return Mono.error(new UnsupportedOperationException("Request-Response not implemented."));
4042
}
4143

4244
@Override
4345
public Flux<Payload> requestStream(Payload payload) {
46+
payload.release();
4447
return Flux.error(new UnsupportedOperationException("Request-Stream not implemented."));
4548
}
4649

@@ -51,6 +54,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
5154

5255
@Override
5356
public Mono<Void> metadataPush(Payload payload) {
57+
payload.release();
5458
return Mono.error(new UnsupportedOperationException("Metadata-Push not implemented."));
5559
}
5660

rsocket-core/src/main/java/io/rsocket/Frame.java

Lines changed: 22 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;
2020

2121
import io.netty.buffer.*;
22+
import io.netty.util.AbstractReferenceCounted;
2223
import io.netty.util.IllegalReferenceCountException;
2324
import io.netty.util.Recycler;
2425
import io.netty.util.Recycler.Handle;
@@ -33,7 +34,6 @@
3334
import io.rsocket.frame.VersionFlyweight;
3435
import io.rsocket.framing.FrameType;
3536
import java.nio.charset.StandardCharsets;
36-
import java.util.Objects;
3737
import javax.annotation.Nullable;
3838
import org.slf4j.Logger;
3939
import org.slf4j.LoggerFactory;
@@ -43,7 +43,7 @@
4343
*
4444
* <p>This provides encoding, decoding and field accessors.
4545
*/
46-
public class Frame implements Payload, ByteBufHolder {
46+
public class Frame extends AbstractReferenceCounted implements Payload, ByteBufHolder {
4747
private static final Recycler<Frame> RECYCLER =
4848
new Recycler<Frame>() {
4949
protected Frame newObject(Handle<Frame> handle) {
@@ -58,12 +58,6 @@ private Frame(final Handle<Frame> handle) {
5858
this.handle = handle;
5959
}
6060

61-
/** Clear and recycle this instance. */
62-
private void recycle() {
63-
content = null;
64-
handle.recycle(this);
65-
}
66-
6761
/** Return the content which is held by this {@link Frame}. */
6862
@Override
6963
public ByteBuf content() {
@@ -105,26 +99,17 @@ public Frame replace(ByteBuf content) {
10599
return from(content);
106100
}
107101

108-
/**
109-
* Returns the reference count of this object. If {@code 0}, it means this object has been
110-
* deallocated.
111-
*/
112-
@Override
113-
public int refCnt() {
114-
return content.refCnt();
115-
}
116-
117102
/** Increases the reference count by {@code 1}. */
118103
@Override
119104
public Frame retain() {
120-
content.retain();
105+
super.retain();
121106
return this;
122107
}
123108

124109
/** Increases the reference count by the specified {@code increment}. */
125110
@Override
126111
public Frame retain(int increment) {
127-
content.retain(increment);
112+
super.retain(increment);
128113
return this;
129114
}
130115

@@ -150,36 +135,12 @@ public Frame touch(@Nullable Object hint) {
150135
return this;
151136
}
152137

153-
/**
154-
* Decreases the reference count by {@code 1} and deallocates this object if the reference count
155-
* reaches at {@code 0}.
156-
*
157-
* @return {@code true} if and only if the reference count became {@code 0} and this object has
158-
* been deallocated
159-
*/
160-
@Override
161-
public boolean release() {
162-
if (content != null && content.release()) {
163-
recycle();
164-
return true;
165-
}
166-
return false;
167-
}
168-
169-
/**
170-
* Decreases the reference count by the specified {@code decrement} and deallocates this object if
171-
* the reference count reaches at {@code 0}.
172-
*
173-
* @return {@code true} if and only if the reference count became {@code 0} and this object has
174-
* been deallocated
175-
*/
138+
/** Called once {@link #refCnt()} is equals 0. */
176139
@Override
177-
public boolean release(int decrement) {
178-
if (content != null && content.release(decrement)) {
179-
recycle();
180-
return true;
181-
}
182-
return false;
140+
protected void deallocate() {
141+
content.release();
142+
content = null;
143+
handle.recycle(this);
183144
}
184145

185146
/**
@@ -239,6 +200,7 @@ public int flags() {
239200
*/
240201
public static Frame from(final ByteBuf content) {
241202
final Frame frame = RECYCLER.get();
203+
frame.setRefCnt(1);
242204
frame.content = content;
243205

244206
return frame;
@@ -281,6 +243,7 @@ public static Frame from(
281243
final ByteBuf data = payload.sliceData();
282244

283245
final Frame frame = RECYCLER.get();
246+
frame.setRefCnt(1);
284247
frame.content =
285248
ByteBufAllocator.DEFAULT.buffer(
286249
SetupFrameFlyweight.computeFrameLength(
@@ -347,6 +310,7 @@ public static Frame from(int streamId, final Throwable throwable, ByteBuf dataBu
347310

348311
final int code = ErrorFrameFlyweight.errorCodeFromException(throwable);
349312
final Frame frame = RECYCLER.get();
313+
frame.setRefCnt(1);
350314
frame.content =
351315
ByteBufAllocator.DEFAULT.buffer(
352316
ErrorFrameFlyweight.computeFrameLength(dataBuffer.readableBytes()));
@@ -378,6 +342,7 @@ private Lease() {}
378342

379343
public static Frame from(int ttl, int numberOfRequests, ByteBuf metadata) {
380344
final Frame frame = RECYCLER.get();
345+
frame.setRefCnt(1);
381346
frame.content =
382347
ByteBufAllocator.DEFAULT.buffer(
383348
LeaseFrameFlyweight.computeFrameLength(metadata.readableBytes()));
@@ -411,6 +376,7 @@ public static Frame from(int streamId, int requestN) {
411376
}
412377

413378
final Frame frame = RECYCLER.get();
379+
frame.setRefCnt(1);
414380
frame.content = ByteBufAllocator.DEFAULT.buffer(RequestNFrameFlyweight.computeFrameLength());
415381
frame.content.writerIndex(RequestNFrameFlyweight.encode(frame.content, streamId, requestN));
416382
return frame;
@@ -438,6 +404,7 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init
438404
final ByteBuf data = payload.sliceData();
439405

440406
final Frame frame = RECYCLER.get();
407+
frame.setRefCnt(1);
441408
frame.content =
442409
ByteBufAllocator.DEFAULT.buffer(
443410
RequestFrameFlyweight.computeFrameLength(
@@ -464,6 +431,7 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init
464431

465432
public static Frame from(int streamId, FrameType type, int flags) {
466433
final Frame frame = RECYCLER.get();
434+
frame.setRefCnt(1);
467435
frame.content =
468436
ByteBufAllocator.DEFAULT.buffer(RequestFrameFlyweight.computeFrameLength(type, null, 0));
469437
frame.content.writerIndex(
@@ -480,6 +448,7 @@ public static Frame from(
480448
int initialRequestN,
481449
int flags) {
482450
final Frame frame = RECYCLER.get();
451+
frame.setRefCnt(1);
483452
frame.content =
484453
ByteBufAllocator.DEFAULT.buffer(
485454
RequestFrameFlyweight.computeFrameLength(
@@ -543,6 +512,7 @@ public static Frame from(int streamId, FrameType type, Payload payload, int flag
543512
public static Frame from(
544513
int streamId, FrameType type, @Nullable ByteBuf metadata, ByteBuf data, int flags) {
545514
final Frame frame = RECYCLER.get();
515+
frame.setRefCnt(1);
546516
frame.content =
547517
ByteBufAllocator.DEFAULT.buffer(
548518
FrameHeaderFlyweight.computeFrameHeaderLength(
@@ -559,6 +529,7 @@ private Cancel() {}
559529

560530
public static Frame from(int streamId) {
561531
final Frame frame = RECYCLER.get();
532+
frame.setRefCnt(1);
562533
frame.content =
563534
ByteBufAllocator.DEFAULT.buffer(
564535
FrameHeaderFlyweight.computeFrameHeaderLength(FrameType.CANCEL, null, 0));
@@ -575,6 +546,7 @@ private Keepalive() {}
575546

576547
public static Frame from(ByteBuf data, boolean respond) {
577548
final Frame frame = RECYCLER.get();
549+
frame.setRefCnt(1);
578550
frame.content =
579551
ByteBufAllocator.DEFAULT.buffer(
580552
KeepaliveFrameFlyweight.computeFrameLength(data.readableBytes()));
@@ -611,12 +583,12 @@ public boolean equals(Object o) {
611583
return false;
612584
}
613585
final Frame frame = (Frame) o;
614-
return Objects.equals(content, frame.content);
586+
return content.equals(frame.content());
615587
}
616588

617589
@Override
618590
public int hashCode() {
619-
return Objects.hash(content);
591+
return content.hashCode();
620592
}
621593

622594
@Override

rsocket-core/src/main/java/io/rsocket/KeepAliveHandler.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@
88
import reactor.core.publisher.MonoProcessor;
99
import reactor.core.publisher.UnicastProcessor;
1010

11-
abstract class KeepAliveHandler {
11+
abstract class KeepAliveHandler implements Disposable {
1212
private final KeepAlive keepAlive;
1313
private final UnicastProcessor<Frame> sent = UnicastProcessor.create();
1414
private final MonoProcessor<KeepAlive> timeout = MonoProcessor.create();
15-
private final Flux<Long> interval;
1615
private Disposable intervalDisposable;
1716
private volatile long lastReceivedMillis;
1817

@@ -26,20 +25,17 @@ static KeepAliveHandler ofClient(KeepAlive keepAlive) {
2625

2726
private KeepAliveHandler(KeepAlive keepAlive) {
2827
this.keepAlive = keepAlive;
29-
this.interval = Flux.interval(Duration.ofMillis(keepAlive.getTickPeriod()));
30-
}
31-
32-
public void start() {
3328
this.lastReceivedMillis = System.currentTimeMillis();
34-
intervalDisposable = interval.subscribe(v -> onIntervalTick());
29+
this.intervalDisposable =
30+
Flux.interval(Duration.ofMillis(keepAlive.getTickPeriod()))
31+
.subscribe(v -> onIntervalTick());
3532
}
3633

37-
public void stop() {
34+
@Override
35+
public void dispose() {
3836
sent.onComplete();
3937
timeout.onComplete();
40-
if (intervalDisposable != null) {
41-
intervalDisposable.dispose();
42-
}
38+
intervalDisposable.dispose();
4339
}
4440

4541
public void receive(Frame keepAliveFrame) {

0 commit comments

Comments
 (0)