Skip to content

Commit ffd00c6

Browse files
committed
Fix a few potential leaks
1 parent bf14aa3 commit ffd00c6

File tree

13 files changed

+162
-159
lines changed

13 files changed

+162
-159
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 77 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727
import java.util.concurrent.atomic.AtomicInteger;
2828
import java.util.function.Consumer;
2929
import java.util.function.Function;
30-
import java.util.function.Supplier;
3130
import javax.annotation.Nullable;
32-
3331
import org.jctools.maps.NonBlockingHashMapLong;
3432
import org.reactivestreams.Publisher;
3533
import org.reactivestreams.Subscriber;
@@ -185,7 +183,7 @@ public Flux<Payload> requestStream(Payload payload) {
185183

186184
@Override
187185
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
188-
return handleChannel(Flux.from(payloads), FrameType.REQUEST_CHANNEL);
186+
return handleChannel(Flux.from(payloads));
189187
}
190188

191189
@Override
@@ -289,108 +287,88 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
289287
}));
290288
}
291289

292-
private Flux<Payload> handleChannel(Flux<Payload> request, FrameType requestType) {
290+
private Flux<Payload> handleChannel(Flux<Payload> request) {
293291
return started.thenMany(
294292
Flux.defer(
295-
new Supplier<Flux<Payload>>() {
293+
() -> {
296294
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
297295
final int streamId = streamIdSupplier.nextStreamId();
298-
volatile @Nullable MonoProcessor<Void> subscribedRequests;
299-
boolean firstRequest = true;
300-
301-
boolean isValidToSendFrame() {
302-
return contains(streamId) && !receiver.isDisposed();
303-
}
304-
305-
void sendOneFrame(Frame frame) {
306-
if (isValidToSendFrame()) {
307-
sendProcessor.onNext(frame);
308-
}
309-
}
310-
311-
@Override
312-
public Flux<Payload> get() {
313-
return receiver
314-
.doOnRequest(
315-
l -> {
316-
boolean _firstRequest = false;
317-
synchronized (RSocketClient.this) {
318-
if (firstRequest) {
319-
_firstRequest = true;
320-
firstRequest = false;
321-
}
322-
}
296+
final AtomicBoolean firstRequest = new AtomicBoolean(true);
323297

324-
if (_firstRequest) {
325-
AtomicBoolean firstPayload = new AtomicBoolean(true);
326-
Flux<Frame> requestFrames =
327-
request
328-
.transform(
329-
f -> {
330-
LimitableRequestPublisher<Payload> wrapped =
331-
LimitableRequestPublisher.wrap(f);
332-
// Need to set this to one for first the frame
333-
wrapped.increaseRequestLimit(1);
334-
senders.put(streamId, wrapped);
335-
receivers.put(streamId, receiver);
336-
337-
return wrapped;
338-
})
339-
.map(
340-
new Function<Payload, Frame>() {
341-
342-
@Override
343-
public Frame apply(Payload payload) {
344-
final Frame requestFrame;
345-
if (firstPayload.compareAndSet(true, false)) {
346-
requestFrame =
347-
Frame.Request.from(
348-
streamId, requestType, payload, l);
349-
} else {
350-
requestFrame =
351-
Frame.PayloadFrame.from(
352-
streamId, FrameType.NEXT, payload);
353-
}
354-
payload.release();
355-
return requestFrame;
356-
}
357-
})
358-
.doOnComplete(
359-
() -> {
360-
if (FrameType.REQUEST_CHANNEL == requestType) {
361-
sendOneFrame(
362-
Frame.PayloadFrame.from(
363-
streamId, FrameType.COMPLETE));
364-
if (firstPayload.get()) {
365-
receiver.onComplete();
366-
}
367-
}
368-
});
369-
370-
requestFrames.subscribe(
371-
sendProcessor::onNext,
372-
t -> {
373-
errorConsumer.accept(t);
374-
receiver.dispose();
375-
});
376-
} else {
377-
sendOneFrame(Frame.RequestN.from(streamId, l));
378-
}
379-
})
380-
.doOnError(t -> sendOneFrame(Frame.Error.from(streamId, t)))
381-
.doOnCancel(
382-
() -> {
383-
sendOneFrame(Frame.Cancel.from(streamId));
384-
if (subscribedRequests != null) {
385-
subscribedRequests.cancel();
298+
return receiver
299+
.doOnRequest(
300+
n -> {
301+
if (firstRequest.compareAndSet(true, false)) {
302+
final AtomicBoolean firstPayload = new AtomicBoolean(true);
303+
final Flux<Frame> requestFrames =
304+
request
305+
.transform(
306+
f -> {
307+
LimitableRequestPublisher<Payload> wrapped =
308+
LimitableRequestPublisher.wrap(f);
309+
// Need to set this to one for first the frame
310+
wrapped.increaseRequestLimit(1);
311+
senders.put(streamId, wrapped);
312+
receivers.put(streamId, receiver);
313+
314+
return wrapped;
315+
})
316+
.map(
317+
payload -> {
318+
final Frame requestFrame;
319+
if (firstPayload.compareAndSet(true, false)) {
320+
requestFrame =
321+
Frame.Request.from(
322+
streamId, FrameType.REQUEST_CHANNEL, payload, n);
323+
} else {
324+
requestFrame =
325+
Frame.PayloadFrame.from(
326+
streamId, FrameType.NEXT, payload);
327+
}
328+
payload.release();
329+
return requestFrame;
330+
})
331+
.doOnComplete(
332+
() -> {
333+
if (contains(streamId) && !receiver.isDisposed()) {
334+
sendProcessor.onNext(
335+
Frame.PayloadFrame.from(
336+
streamId, FrameType.COMPLETE));
337+
}
338+
if (firstPayload.get()) {
339+
receiver.onComplete();
340+
}
341+
});
342+
343+
requestFrames.subscribe(
344+
sendProcessor::onNext,
345+
t -> {
346+
errorConsumer.accept(t);
347+
receiver.dispose();
348+
});
349+
} else {
350+
if (contains(streamId) && !receiver.isDisposed()) {
351+
sendProcessor.onNext(Frame.RequestN.from(streamId, n));
386352
}
387-
})
388-
.doFinally(
389-
s -> {
390-
receivers.remove(streamId);
391-
senders.remove(streamId);
392-
});
393-
}
353+
}
354+
})
355+
.doOnError(
356+
t -> {
357+
if (contains(streamId) && !receiver.isDisposed()) {
358+
sendProcessor.onNext(Frame.Error.from(streamId, t));
359+
}
360+
})
361+
.doOnCancel(
362+
() -> {
363+
if (contains(streamId) && !receiver.isDisposed()) {
364+
sendProcessor.onNext(Frame.Cancel.from(streamId));
365+
}
366+
})
367+
.doFinally(
368+
s -> {
369+
receivers.remove(streamId);
370+
senders.remove(streamId);
371+
});
394372
}));
395373
}
396374

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -234,15 +234,14 @@ public Mono<RSocket> start() {
234234

235235
RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket);
236236

237-
RSocketServer rSocketServer = new RSocketServer(
238-
multiplexer.asServerConnection(),
239-
wrappedRSocketServer,
240-
frameDecoder,
241-
errorConsumer);
242-
243-
return connection
244-
.sendOne(setupFrame)
245-
.thenReturn(wrappedRSocketClient);
237+
RSocketServer rSocketServer =
238+
new RSocketServer(
239+
multiplexer.asServerConnection(),
240+
wrappedRSocketServer,
241+
frameDecoder,
242+
errorConsumer);
243+
244+
return connection.sendOne(setupFrame).thenReturn(wrappedRSocketClient);
246245
});
247246
}
248247
}
@@ -353,15 +352,17 @@ private Mono<Void> processSetupFrame(
353352
return acceptor
354353
.get()
355354
.accept(setupPayload, wrappedRSocketClient)
356-
.doOnNext(unwrappedServerSocket -> {
357-
RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket);
358-
359-
RSocketServer rSocketServer = new RSocketServer(
360-
multiplexer.asClientConnection(),
361-
wrappedRSocketServer,
362-
frameDecoder,
363-
errorConsumer);
364-
})
355+
.doOnNext(
356+
unwrappedServerSocket -> {
357+
RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket);
358+
359+
RSocketServer rSocketServer =
360+
new RSocketServer(
361+
multiplexer.asClientConnection(),
362+
wrappedRSocketServer,
363+
frameDecoder,
364+
errorConsumer);
365+
})
365366
.then();
366367
}
367368
}

rsocket-core/src/main/java/io/rsocket/RSocketServer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.rsocket.internal.UnboundedProcessor;
2929
import java.util.function.Consumer;
3030
import java.util.function.Function;
31-
3231
import org.jctools.maps.NonBlockingHashMapLong;
3332
import org.reactivestreams.Publisher;
3433
import org.reactivestreams.Subscriber;
@@ -315,14 +314,14 @@ private void handleStream(int streamId, Flux<Payload> response, int initialReque
315314
payload.release();
316315
return frame;
317316
})
317+
.concatWith(Mono.fromCallable(() -> Frame.PayloadFrame.from(streamId, FrameType.COMPLETE)))
318318
.transform(
319319
frameFlux -> {
320320
LimitableRequestPublisher<Frame> frames = LimitableRequestPublisher.wrap(frameFlux);
321321
sendingSubscriptions.put(streamId, frames);
322322
frames.increaseRequestLimit(initialRequestN);
323323
return frames;
324324
})
325-
.concatWith(Mono.just(Frame.PayloadFrame.from(streamId, FrameType.COMPLETE)))
326325
.doFinally(signalType -> sendingSubscriptions.remove(streamId))
327326
.subscribe(sendProcessor::onNext, t -> handleError(streamId, t));
328327
}

rsocket-core/src/main/java/io/rsocket/frame/ErrorFrameFlyweight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package io.rsocket.frame;
1818

1919
import io.netty.buffer.ByteBuf;
20-
import io.rsocket.framing.FrameType;
2120
import io.rsocket.exceptions.*;
21+
import io.rsocket.framing.FrameType;
2222
import java.nio.charset.StandardCharsets;
2323

2424
public class ErrorFrameFlyweight {

rsocket-core/src/main/java/io/rsocket/framing/AbstractRecyclableFrame.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.netty.buffer.ByteBufAllocator;
2424
import io.netty.buffer.Unpooled;
2525
import io.netty.util.Recycler.Handle;
26-
import io.netty.util.ReferenceCounted;
2726
import java.util.Objects;
2827
import reactor.util.annotation.Nullable;
2928

rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.rsocket.internal;
1818

19+
import io.netty.util.ReferenceCounted;
1920
import java.util.Objects;
2021
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2122
import java.util.function.BiFunction;
@@ -25,7 +26,7 @@
2526
import reactor.core.publisher.Flux;
2627
import reactor.core.publisher.Operators;
2728

28-
public final class SwitchTransform<T, R> extends Flux<R> {
29+
public final class SwitchTransform<T extends ReferenceCounted, R> extends Flux<R> {
2930

3031
final Publisher<? extends T> source;
3132
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;
@@ -41,7 +42,8 @@ public void subscribe(CoreSubscriber<? super R> actual) {
4142
Flux.from(source).subscribe(new SwitchTransformSubscriber<>(actual, transformer));
4243
}
4344

44-
static final class SwitchTransformSubscriber<T, R> implements CoreSubscriber<T> {
45+
static final class SwitchTransformSubscriber<T extends ReferenceCounted, R>
46+
implements CoreSubscriber<T> {
4547
@SuppressWarnings("rawtypes")
4648
static final AtomicIntegerFieldUpdater<SwitchTransformSubscriber> ONCE =
4749
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformSubscriber.class, "once");
@@ -77,6 +79,7 @@ public void onNext(T t) {
7779
Flux.from(result).subscribe(actual);
7880
} catch (Throwable e) {
7981
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
82+
t.release();
8083
return;
8184
}
8285
}

0 commit comments

Comments
 (0)