Skip to content

Ensure sendProcessor is disposed #508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,19 @@ public abstract class AbstractRSocket implements RSocket {

@Override
public Mono<Void> fireAndForget(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Fire and forget not implemented."));
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Request-Response not implemented."));
}

@Override
public Flux<Payload> requestStream(Payload payload) {
payload.release();
return Flux.error(new UnsupportedOperationException("Request-Stream not implemented."));
}

Expand All @@ -51,6 +54,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {

@Override
public Mono<Void> metadataPush(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Metadata-Push not implemented."));
}

Expand Down
72 changes: 22 additions & 50 deletions rsocket-core/src/main/java/io/rsocket/Frame.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;

import io.netty.buffer.*;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
Expand All @@ -33,7 +34,6 @@
import io.rsocket.frame.VersionFlyweight;
import io.rsocket.framing.FrameType;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,7 +43,7 @@
*
* <p>This provides encoding, decoding and field accessors.
*/
public class Frame implements Payload, ByteBufHolder {
public class Frame extends AbstractReferenceCounted implements Payload, ByteBufHolder {
private static final Recycler<Frame> RECYCLER =
new Recycler<Frame>() {
protected Frame newObject(Handle<Frame> handle) {
Expand All @@ -58,12 +58,6 @@ private Frame(final Handle<Frame> handle) {
this.handle = handle;
}

/** Clear and recycle this instance. */
private void recycle() {
content = null;
handle.recycle(this);
}

/** Return the content which is held by this {@link Frame}. */
@Override
public ByteBuf content() {
Expand Down Expand Up @@ -105,26 +99,17 @@ public Frame replace(ByteBuf content) {
return from(content);
}

/**
* Returns the reference count of this object. If {@code 0}, it means this object has been
* deallocated.
*/
@Override
public int refCnt() {
return content.refCnt();
}

/** Increases the reference count by {@code 1}. */
@Override
public Frame retain() {
content.retain();
super.retain();
return this;
}

/** Increases the reference count by the specified {@code increment}. */
@Override
public Frame retain(int increment) {
content.retain(increment);
super.retain(increment);
return this;
}

Expand All @@ -150,36 +135,12 @@ public Frame touch(@Nullable Object hint) {
return this;
}

/**
* Decreases the reference count by {@code 1} and deallocates this object if the reference count
* reaches at {@code 0}.
*
* @return {@code true} if and only if the reference count became {@code 0} and this object has
* been deallocated
*/
@Override
public boolean release() {
if (content != null && content.release()) {
recycle();
return true;
}
return false;
}

/**
* Decreases the reference count by the specified {@code decrement} and deallocates this object if
* the reference count reaches at {@code 0}.
*
* @return {@code true} if and only if the reference count became {@code 0} and this object has
* been deallocated
*/
/** Called once {@link #refCnt()} is equals 0. */
@Override
public boolean release(int decrement) {
if (content != null && content.release(decrement)) {
recycle();
return true;
}
return false;
protected void deallocate() {
content.release();
content = null;
handle.recycle(this);
}

/**
Expand Down Expand Up @@ -239,6 +200,7 @@ public int flags() {
*/
public static Frame from(final ByteBuf content) {
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content = content;

return frame;
Expand Down Expand Up @@ -281,6 +243,7 @@ public static Frame from(
final ByteBuf data = payload.sliceData();

final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
SetupFrameFlyweight.computeFrameLength(
Expand Down Expand Up @@ -347,6 +310,7 @@ public static Frame from(int streamId, final Throwable throwable, ByteBuf dataBu

final int code = ErrorFrameFlyweight.errorCodeFromException(throwable);
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
ErrorFrameFlyweight.computeFrameLength(dataBuffer.readableBytes()));
Expand Down Expand Up @@ -378,6 +342,7 @@ private Lease() {}

public static Frame from(int ttl, int numberOfRequests, ByteBuf metadata) {
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
LeaseFrameFlyweight.computeFrameLength(metadata.readableBytes()));
Expand Down Expand Up @@ -411,6 +376,7 @@ public static Frame from(int streamId, int requestN) {
}

final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content = ByteBufAllocator.DEFAULT.buffer(RequestNFrameFlyweight.computeFrameLength());
frame.content.writerIndex(RequestNFrameFlyweight.encode(frame.content, streamId, requestN));
return frame;
Expand Down Expand Up @@ -438,6 +404,7 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init
final ByteBuf data = payload.sliceData();

final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
RequestFrameFlyweight.computeFrameLength(
Expand All @@ -464,6 +431,7 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init

public static Frame from(int streamId, FrameType type, int flags) {
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(RequestFrameFlyweight.computeFrameLength(type, null, 0));
frame.content.writerIndex(
Expand All @@ -480,6 +448,7 @@ public static Frame from(
int initialRequestN,
int flags) {
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
RequestFrameFlyweight.computeFrameLength(
Expand Down Expand Up @@ -543,6 +512,7 @@ public static Frame from(int streamId, FrameType type, Payload payload, int flag
public static Frame from(
int streamId, FrameType type, @Nullable ByteBuf metadata, ByteBuf data, int flags) {
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
FrameHeaderFlyweight.computeFrameHeaderLength(
Expand All @@ -559,6 +529,7 @@ private Cancel() {}

public static Frame from(int streamId) {
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
FrameHeaderFlyweight.computeFrameHeaderLength(FrameType.CANCEL, null, 0));
Expand All @@ -575,6 +546,7 @@ private Keepalive() {}

public static Frame from(ByteBuf data, boolean respond) {
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
KeepaliveFrameFlyweight.computeFrameLength(data.readableBytes()));
Expand Down Expand Up @@ -611,12 +583,12 @@ public boolean equals(Object o) {
return false;
}
final Frame frame = (Frame) o;
return Objects.equals(content, frame.content);
return content.equals(frame.content());
}

@Override
public int hashCode() {
return Objects.hash(content);
return content.hashCode();
}

@Override
Expand Down
18 changes: 7 additions & 11 deletions rsocket-core/src/main/java/io/rsocket/KeepAliveHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.UnicastProcessor;

abstract class KeepAliveHandler {
abstract class KeepAliveHandler implements Disposable {
private final KeepAlive keepAlive;
private final UnicastProcessor<Frame> sent = UnicastProcessor.create();
private final MonoProcessor<KeepAlive> timeout = MonoProcessor.create();
private final Flux<Long> interval;
private Disposable intervalDisposable;
private volatile long lastReceivedMillis;

Expand All @@ -26,20 +25,17 @@ static KeepAliveHandler ofClient(KeepAlive keepAlive) {

private KeepAliveHandler(KeepAlive keepAlive) {
this.keepAlive = keepAlive;
this.interval = Flux.interval(Duration.ofMillis(keepAlive.getTickPeriod()));
}

public void start() {
this.lastReceivedMillis = System.currentTimeMillis();
intervalDisposable = interval.subscribe(v -> onIntervalTick());
this.intervalDisposable =
Flux.interval(Duration.ofMillis(keepAlive.getTickPeriod()))
.subscribe(v -> onIntervalTick());
}

public void stop() {
@Override
public void dispose() {
sent.onComplete();
timeout.onComplete();
if (intervalDisposable != null) {
intervalDisposable.dispose();
}
intervalDisposable.dispose();
}

public void receive(Frame keepAliveFrame) {
Expand Down
Loading