Skip to content

Commit 0c7f15d

Browse files
Ryland Degnanrobertroeser
Ryland Degnan
authored andcommitted
Fix a few potential leaks (#504)
1 parent bf14aa3 commit 0c7f15d

File tree

15 files changed

+177
-217
lines changed

15 files changed

+177
-217
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ subprojects {
8989

9090
test {
9191
useJUnitPlatform()
92+
93+
systemProperty "io.netty.leakDetection.level", "ADVANCED"
9294
}
9395
}
9496

rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java

Lines changed: 14 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;
2020

2121
import io.netty.buffer.ByteBuf;
22-
import io.netty.buffer.Unpooled;
2322
import io.netty.util.AbstractReferenceCounted;
2423
import io.rsocket.Frame.Setup;
2524
import io.rsocket.frame.SetupFrameFlyweight;
@@ -30,38 +29,9 @@
3029
*/
3130
public abstract class ConnectionSetupPayload extends AbstractReferenceCounted implements Payload {
3231

33-
public static final int NO_FLAGS = 0;
34-
public static final int HONOR_LEASE = SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE;
35-
36-
public static ConnectionSetupPayload create(String metadataMimeType, String dataMimeType) {
37-
return new DefaultConnectionSetupPayload(
38-
metadataMimeType, dataMimeType, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER, NO_FLAGS);
39-
}
40-
41-
public static ConnectionSetupPayload create(
42-
String metadataMimeType, String dataMimeType, Payload payload) {
43-
return new DefaultConnectionSetupPayload(
44-
metadataMimeType,
45-
dataMimeType,
46-
payload.sliceData(),
47-
payload.sliceMetadata(),
48-
payload.hasMetadata() ? FLAGS_M : 0);
49-
}
50-
51-
public static ConnectionSetupPayload create(
52-
String metadataMimeType, String dataMimeType, int flags) {
53-
return new DefaultConnectionSetupPayload(
54-
metadataMimeType, dataMimeType, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER, flags);
55-
}
56-
5732
public static ConnectionSetupPayload create(final Frame setupFrame) {
5833
Frame.ensureFrameType(FrameType.SETUP, setupFrame);
59-
return new DefaultConnectionSetupPayload(
60-
Setup.metadataMimeType(setupFrame),
61-
Setup.dataMimeType(setupFrame),
62-
setupFrame.sliceData(),
63-
setupFrame.sliceMetadata(),
64-
Setup.getFlags(setupFrame));
34+
return new DefaultConnectionSetupPayload(setupFrame);
6535
}
6636

6737
public abstract String metadataMimeType();
@@ -71,7 +41,7 @@ public static ConnectionSetupPayload create(final Frame setupFrame) {
7141
public abstract int getFlags();
7242

7343
public boolean willClientHonorLease() {
74-
return Frame.isFlagSet(getFlags(), HONOR_LEASE);
44+
return Frame.isFlagSet(getFlags(), SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE);
7545
}
7646

7747
@Override
@@ -97,68 +67,52 @@ public ConnectionSetupPayload retain(int increment) {
9767

9868
private static final class DefaultConnectionSetupPayload extends ConnectionSetupPayload {
9969

100-
private final String metadataMimeType;
101-
private final String dataMimeType;
102-
private final ByteBuf data;
103-
private final ByteBuf metadata;
104-
private final int flags;
105-
106-
public DefaultConnectionSetupPayload(
107-
String metadataMimeType, String dataMimeType, ByteBuf data, ByteBuf metadata, int flags) {
108-
this.metadataMimeType = metadataMimeType;
109-
this.dataMimeType = dataMimeType;
110-
this.data = data;
111-
this.metadata = metadata;
112-
this.flags = flags;
113-
114-
if (!hasMetadata() && metadata.readableBytes() > 0) {
115-
throw new IllegalArgumentException("metadata flag incorrect");
116-
}
70+
private final Frame setupFrame;
71+
72+
public DefaultConnectionSetupPayload(final Frame setupFrame) {
73+
this.setupFrame = setupFrame;
11774
}
11875

11976
@Override
12077
public String metadataMimeType() {
121-
return metadataMimeType;
78+
return Setup.metadataMimeType(setupFrame);
12279
}
12380

12481
@Override
12582
public String dataMimeType() {
126-
return dataMimeType;
83+
return Setup.dataMimeType(setupFrame);
12784
}
12885

12986
@Override
13087
public ByteBuf sliceData() {
131-
return data;
88+
return setupFrame.sliceData();
13289
}
13390

13491
@Override
13592
public ByteBuf sliceMetadata() {
136-
return metadata;
93+
return setupFrame.sliceMetadata();
13794
}
13895

13996
@Override
14097
public int getFlags() {
141-
return flags;
98+
return Setup.getFlags(setupFrame);
14299
}
143100

144101
@Override
145102
public ConnectionSetupPayload touch() {
146-
data.touch();
147-
metadata.touch();
103+
setupFrame.touch();
148104
return this;
149105
}
150106

151107
@Override
152108
public ConnectionSetupPayload touch(Object hint) {
153-
data.touch(hint);
154-
metadata.touch(hint);
109+
setupFrame.touch(hint);
155110
return this;
156111
}
157112

158113
@Override
159114
protected void deallocate() {
160-
data.release();
161-
metadata.release();
115+
setupFrame.release();
162116
}
163117
}
164118
}

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 77 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727
import java.util.concurrent.atomic.AtomicInteger;
2828
import java.util.function.Consumer;
2929
import java.util.function.Function;
30-
import java.util.function.Supplier;
3130
import javax.annotation.Nullable;
32-
3331
import org.jctools.maps.NonBlockingHashMapLong;
3432
import org.reactivestreams.Publisher;
3533
import org.reactivestreams.Subscriber;
@@ -185,7 +183,7 @@ public Flux<Payload> requestStream(Payload payload) {
185183

186184
@Override
187185
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
188-
return handleChannel(Flux.from(payloads), FrameType.REQUEST_CHANNEL);
186+
return handleChannel(Flux.from(payloads));
189187
}
190188

191189
@Override
@@ -289,108 +287,88 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
289287
}));
290288
}
291289

292-
private Flux<Payload> handleChannel(Flux<Payload> request, FrameType requestType) {
290+
private Flux<Payload> handleChannel(Flux<Payload> request) {
293291
return started.thenMany(
294292
Flux.defer(
295-
new Supplier<Flux<Payload>>() {
293+
() -> {
296294
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
297295
final int streamId = streamIdSupplier.nextStreamId();
298-
volatile @Nullable MonoProcessor<Void> subscribedRequests;
299-
boolean firstRequest = true;
300-
301-
boolean isValidToSendFrame() {
302-
return contains(streamId) && !receiver.isDisposed();
303-
}
304-
305-
void sendOneFrame(Frame frame) {
306-
if (isValidToSendFrame()) {
307-
sendProcessor.onNext(frame);
308-
}
309-
}
310-
311-
@Override
312-
public Flux<Payload> get() {
313-
return receiver
314-
.doOnRequest(
315-
l -> {
316-
boolean _firstRequest = false;
317-
synchronized (RSocketClient.this) {
318-
if (firstRequest) {
319-
_firstRequest = true;
320-
firstRequest = false;
321-
}
322-
}
296+
final AtomicBoolean firstRequest = new AtomicBoolean(true);
323297

324-
if (_firstRequest) {
325-
AtomicBoolean firstPayload = new AtomicBoolean(true);
326-
Flux<Frame> requestFrames =
327-
request
328-
.transform(
329-
f -> {
330-
LimitableRequestPublisher<Payload> wrapped =
331-
LimitableRequestPublisher.wrap(f);
332-
// Need to set this to one for first the frame
333-
wrapped.increaseRequestLimit(1);
334-
senders.put(streamId, wrapped);
335-
receivers.put(streamId, receiver);
336-
337-
return wrapped;
338-
})
339-
.map(
340-
new Function<Payload, Frame>() {
341-
342-
@Override
343-
public Frame apply(Payload payload) {
344-
final Frame requestFrame;
345-
if (firstPayload.compareAndSet(true, false)) {
346-
requestFrame =
347-
Frame.Request.from(
348-
streamId, requestType, payload, l);
349-
} else {
350-
requestFrame =
351-
Frame.PayloadFrame.from(
352-
streamId, FrameType.NEXT, payload);
353-
}
354-
payload.release();
355-
return requestFrame;
356-
}
357-
})
358-
.doOnComplete(
359-
() -> {
360-
if (FrameType.REQUEST_CHANNEL == requestType) {
361-
sendOneFrame(
362-
Frame.PayloadFrame.from(
363-
streamId, FrameType.COMPLETE));
364-
if (firstPayload.get()) {
365-
receiver.onComplete();
366-
}
367-
}
368-
});
369-
370-
requestFrames.subscribe(
371-
sendProcessor::onNext,
372-
t -> {
373-
errorConsumer.accept(t);
374-
receiver.dispose();
375-
});
376-
} else {
377-
sendOneFrame(Frame.RequestN.from(streamId, l));
378-
}
379-
})
380-
.doOnError(t -> sendOneFrame(Frame.Error.from(streamId, t)))
381-
.doOnCancel(
382-
() -> {
383-
sendOneFrame(Frame.Cancel.from(streamId));
384-
if (subscribedRequests != null) {
385-
subscribedRequests.cancel();
298+
return receiver
299+
.doOnRequest(
300+
n -> {
301+
if (firstRequest.compareAndSet(true, false)) {
302+
final AtomicBoolean firstPayload = new AtomicBoolean(true);
303+
final Flux<Frame> requestFrames =
304+
request
305+
.transform(
306+
f -> {
307+
LimitableRequestPublisher<Payload> wrapped =
308+
LimitableRequestPublisher.wrap(f);
309+
// Need to set this to one for first the frame
310+
wrapped.increaseRequestLimit(1);
311+
senders.put(streamId, wrapped);
312+
receivers.put(streamId, receiver);
313+
314+
return wrapped;
315+
})
316+
.map(
317+
payload -> {
318+
final Frame requestFrame;
319+
if (firstPayload.compareAndSet(true, false)) {
320+
requestFrame =
321+
Frame.Request.from(
322+
streamId, FrameType.REQUEST_CHANNEL, payload, n);
323+
} else {
324+
requestFrame =
325+
Frame.PayloadFrame.from(
326+
streamId, FrameType.NEXT, payload);
327+
}
328+
payload.release();
329+
return requestFrame;
330+
})
331+
.doOnComplete(
332+
() -> {
333+
if (contains(streamId) && !receiver.isDisposed()) {
334+
sendProcessor.onNext(
335+
Frame.PayloadFrame.from(
336+
streamId, FrameType.COMPLETE));
337+
}
338+
if (firstPayload.get()) {
339+
receiver.onComplete();
340+
}
341+
});
342+
343+
requestFrames.subscribe(
344+
sendProcessor::onNext,
345+
t -> {
346+
errorConsumer.accept(t);
347+
receiver.dispose();
348+
});
349+
} else {
350+
if (contains(streamId) && !receiver.isDisposed()) {
351+
sendProcessor.onNext(Frame.RequestN.from(streamId, n));
386352
}
387-
})
388-
.doFinally(
389-
s -> {
390-
receivers.remove(streamId);
391-
senders.remove(streamId);
392-
});
393-
}
353+
}
354+
})
355+
.doOnError(
356+
t -> {
357+
if (contains(streamId) && !receiver.isDisposed()) {
358+
sendProcessor.onNext(Frame.Error.from(streamId, t));
359+
}
360+
})
361+
.doOnCancel(
362+
() -> {
363+
if (contains(streamId) && !receiver.isDisposed()) {
364+
sendProcessor.onNext(Frame.Cancel.from(streamId));
365+
}
366+
})
367+
.doFinally(
368+
s -> {
369+
receivers.remove(streamId);
370+
senders.remove(streamId);
371+
});
394372
}));
395373
}
396374

0 commit comments

Comments
 (0)