Skip to content

Fix a few potential leaks #504

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ subprojects {

test {
useJUnitPlatform()

systemProperty "io.netty.leakDetection.level", "ADVANCED"
}
}

Expand Down
74 changes: 14 additions & 60 deletions rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.AbstractReferenceCounted;
import io.rsocket.Frame.Setup;
import io.rsocket.frame.SetupFrameFlyweight;
Expand All @@ -30,38 +29,9 @@
*/
public abstract class ConnectionSetupPayload extends AbstractReferenceCounted implements Payload {

public static final int NO_FLAGS = 0;
public static final int HONOR_LEASE = SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE;

public static ConnectionSetupPayload create(String metadataMimeType, String dataMimeType) {
return new DefaultConnectionSetupPayload(
metadataMimeType, dataMimeType, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER, NO_FLAGS);
}

public static ConnectionSetupPayload create(
String metadataMimeType, String dataMimeType, Payload payload) {
return new DefaultConnectionSetupPayload(
metadataMimeType,
dataMimeType,
payload.sliceData(),
payload.sliceMetadata(),
payload.hasMetadata() ? FLAGS_M : 0);
}

public static ConnectionSetupPayload create(
String metadataMimeType, String dataMimeType, int flags) {
return new DefaultConnectionSetupPayload(
metadataMimeType, dataMimeType, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER, flags);
}

public static ConnectionSetupPayload create(final Frame setupFrame) {
Frame.ensureFrameType(FrameType.SETUP, setupFrame);
return new DefaultConnectionSetupPayload(
Setup.metadataMimeType(setupFrame),
Setup.dataMimeType(setupFrame),
setupFrame.sliceData(),
setupFrame.sliceMetadata(),
Setup.getFlags(setupFrame));
return new DefaultConnectionSetupPayload(setupFrame);
}

public abstract String metadataMimeType();
Expand All @@ -71,7 +41,7 @@ public static ConnectionSetupPayload create(final Frame setupFrame) {
public abstract int getFlags();

public boolean willClientHonorLease() {
return Frame.isFlagSet(getFlags(), HONOR_LEASE);
return Frame.isFlagSet(getFlags(), SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE);
}

@Override
Expand All @@ -97,68 +67,52 @@ public ConnectionSetupPayload retain(int increment) {

private static final class DefaultConnectionSetupPayload extends ConnectionSetupPayload {

private final String metadataMimeType;
private final String dataMimeType;
private final ByteBuf data;
private final ByteBuf metadata;
private final int flags;

public DefaultConnectionSetupPayload(
String metadataMimeType, String dataMimeType, ByteBuf data, ByteBuf metadata, int flags) {
this.metadataMimeType = metadataMimeType;
this.dataMimeType = dataMimeType;
this.data = data;
this.metadata = metadata;
this.flags = flags;

if (!hasMetadata() && metadata.readableBytes() > 0) {
throw new IllegalArgumentException("metadata flag incorrect");
}
private final Frame setupFrame;

public DefaultConnectionSetupPayload(final Frame setupFrame) {
this.setupFrame = setupFrame;
}

@Override
public String metadataMimeType() {
return metadataMimeType;
return Setup.metadataMimeType(setupFrame);
}

@Override
public String dataMimeType() {
return dataMimeType;
return Setup.dataMimeType(setupFrame);
}

@Override
public ByteBuf sliceData() {
return data;
return setupFrame.sliceData();
}

@Override
public ByteBuf sliceMetadata() {
return metadata;
return setupFrame.sliceMetadata();
}

@Override
public int getFlags() {
return flags;
return Setup.getFlags(setupFrame);
}

@Override
public ConnectionSetupPayload touch() {
data.touch();
metadata.touch();
setupFrame.touch();
return this;
}

@Override
public ConnectionSetupPayload touch(Object hint) {
data.touch(hint);
metadata.touch(hint);
setupFrame.touch(hint);
return this;
}

@Override
protected void deallocate() {
data.release();
metadata.release();
setupFrame.release();
}
}
}
176 changes: 77 additions & 99 deletions rsocket-core/src/main/java/io/rsocket/RSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;

import org.jctools.maps.NonBlockingHashMapLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -185,7 +183,7 @@ public Flux<Payload> requestStream(Payload payload) {

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return handleChannel(Flux.from(payloads), FrameType.REQUEST_CHANNEL);
return handleChannel(Flux.from(payloads));
}

@Override
Expand Down Expand Up @@ -289,108 +287,88 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
}));
}

private Flux<Payload> handleChannel(Flux<Payload> request, FrameType requestType) {
private Flux<Payload> handleChannel(Flux<Payload> request) {
return started.thenMany(
Flux.defer(
new Supplier<Flux<Payload>>() {
() -> {
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
final int streamId = streamIdSupplier.nextStreamId();
volatile @Nullable MonoProcessor<Void> subscribedRequests;
boolean firstRequest = true;

boolean isValidToSendFrame() {
return contains(streamId) && !receiver.isDisposed();
}

void sendOneFrame(Frame frame) {
if (isValidToSendFrame()) {
sendProcessor.onNext(frame);
}
}

@Override
public Flux<Payload> get() {
return receiver
.doOnRequest(
l -> {
boolean _firstRequest = false;
synchronized (RSocketClient.this) {
if (firstRequest) {
_firstRequest = true;
firstRequest = false;
}
}
final AtomicBoolean firstRequest = new AtomicBoolean(true);

if (_firstRequest) {
AtomicBoolean firstPayload = new AtomicBoolean(true);
Flux<Frame> requestFrames =
request
.transform(
f -> {
LimitableRequestPublisher<Payload> wrapped =
LimitableRequestPublisher.wrap(f);
// Need to set this to one for first the frame
wrapped.increaseRequestLimit(1);
senders.put(streamId, wrapped);
receivers.put(streamId, receiver);

return wrapped;
})
.map(
new Function<Payload, Frame>() {

@Override
public Frame apply(Payload payload) {
final Frame requestFrame;
if (firstPayload.compareAndSet(true, false)) {
requestFrame =
Frame.Request.from(
streamId, requestType, payload, l);
} else {
requestFrame =
Frame.PayloadFrame.from(
streamId, FrameType.NEXT, payload);
}
payload.release();
return requestFrame;
}
})
.doOnComplete(
() -> {
if (FrameType.REQUEST_CHANNEL == requestType) {
sendOneFrame(
Frame.PayloadFrame.from(
streamId, FrameType.COMPLETE));
if (firstPayload.get()) {
receiver.onComplete();
}
}
});

requestFrames.subscribe(
sendProcessor::onNext,
t -> {
errorConsumer.accept(t);
receiver.dispose();
});
} else {
sendOneFrame(Frame.RequestN.from(streamId, l));
}
})
.doOnError(t -> sendOneFrame(Frame.Error.from(streamId, t)))
.doOnCancel(
() -> {
sendOneFrame(Frame.Cancel.from(streamId));
if (subscribedRequests != null) {
subscribedRequests.cancel();
return receiver
.doOnRequest(
n -> {
if (firstRequest.compareAndSet(true, false)) {
final AtomicBoolean firstPayload = new AtomicBoolean(true);
final Flux<Frame> requestFrames =
request
.transform(
f -> {
LimitableRequestPublisher<Payload> wrapped =
LimitableRequestPublisher.wrap(f);
// Need to set this to one for first the frame
wrapped.increaseRequestLimit(1);
senders.put(streamId, wrapped);
receivers.put(streamId, receiver);

return wrapped;
})
.map(
payload -> {
final Frame requestFrame;
if (firstPayload.compareAndSet(true, false)) {
requestFrame =
Frame.Request.from(
streamId, FrameType.REQUEST_CHANNEL, payload, n);
} else {
requestFrame =
Frame.PayloadFrame.from(
streamId, FrameType.NEXT, payload);
}
payload.release();
return requestFrame;
})
.doOnComplete(
() -> {
if (contains(streamId) && !receiver.isDisposed()) {
sendProcessor.onNext(
Frame.PayloadFrame.from(
streamId, FrameType.COMPLETE));
}
if (firstPayload.get()) {
receiver.onComplete();
}
});

requestFrames.subscribe(
sendProcessor::onNext,
t -> {
errorConsumer.accept(t);
receiver.dispose();
});
} else {
if (contains(streamId) && !receiver.isDisposed()) {
sendProcessor.onNext(Frame.RequestN.from(streamId, n));
}
})
.doFinally(
s -> {
receivers.remove(streamId);
senders.remove(streamId);
});
}
}
})
.doOnError(
t -> {
if (contains(streamId) && !receiver.isDisposed()) {
sendProcessor.onNext(Frame.Error.from(streamId, t));
}
})
.doOnCancel(
() -> {
if (contains(streamId) && !receiver.isDisposed()) {
sendProcessor.onNext(Frame.Cancel.from(streamId));
}
})
.doFinally(
s -> {
receivers.remove(streamId);
senders.remove(streamId);
});
}));
}

Expand Down
Loading