Skip to content

Commit 7bc4ed9

Browse files
committed
provides workaround for FluxPublishOn to ensure that all elements are released in case of racing
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent d50385e commit 7bc4ed9

File tree

7 files changed

+421
-319
lines changed

7 files changed

+421
-319
lines changed

rsocket-core/src/main/java/io/rsocket/core/CleanOnClearQueueDecorator.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@
77
import java.util.Queue;
88
import java.util.concurrent.atomic.AtomicBoolean;
99

10+
/**
11+
* This queue decorator is temporary solution in order to workaround leaks in UnicastProcessor and
12+
* FluxPublishOn. <br>
13+
* For more information about the root-cause of leaks, please see
14+
* https://github.com/reactor/reactor-core/pull/2114
15+
*/
1016
final class CleanOnClearQueueDecorator extends AtomicBoolean implements Queue<Payload> {
1117
final Queue<Payload> delegate;
1218

@@ -23,6 +29,23 @@ public void clear() {
2329
}
2430
}
2531

32+
@Override
33+
public boolean offer(Payload payload) {
34+
if (get()) {
35+
ReferenceCountUtil.safeRelease(payload);
36+
return true;
37+
}
38+
39+
boolean result = delegate.offer(payload);
40+
41+
// ensures in case of racing offered element is released for sure
42+
if (get()) {
43+
ReferenceCountUtil.safeRelease(payload);
44+
}
45+
46+
return result;
47+
}
48+
2649
@Override
2750
public int size() {
2851
return delegate.size();
@@ -83,15 +106,6 @@ public boolean retainAll(Collection<?> c) {
83106
return delegate.retainAll(c);
84107
}
85108

86-
@Override
87-
public boolean offer(Payload payload) {
88-
if (get()) {
89-
ReferenceCountUtil.safeRelease(payload);
90-
return true;
91-
}
92-
return delegate.offer(payload);
93-
}
94-
95109
@Override
96110
public Payload remove() {
97111
return delegate.remove();
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package io.rsocket.core;
2+
3+
import io.rsocket.Payload;
4+
import java.lang.reflect.Constructor;
5+
import java.lang.reflect.InvocationTargetException;
6+
import java.util.Queue;
7+
import java.util.function.Supplier;
8+
import reactor.core.publisher.Flux;
9+
import reactor.core.scheduler.Schedulers;
10+
import reactor.util.concurrent.Queues;
11+
12+
/**
13+
* This class is a temporary solution to workaround memory leaks related to bug in reactor. <br>
14+
* For more information, pleas see https://github.com/reactor/reactor-core/pull/2114
15+
*/
16+
@SuppressWarnings("unchecked")
17+
final class LimitRateOperator {
18+
19+
static final Constructor<Flux<Payload>> fluxPublishOnCtr;
20+
21+
static {
22+
try {
23+
Class<?> fluxPublishOnClass = Class.forName("reactor.core.publisher.FluxPublishOn");
24+
fluxPublishOnCtr =
25+
(Constructor<Flux<Payload>>) fluxPublishOnClass.getDeclaredConstructors()[0];
26+
fluxPublishOnCtr.setAccessible(true);
27+
} catch (ClassNotFoundException e) {
28+
throw new RuntimeException(e);
29+
}
30+
}
31+
32+
static Flux<Payload> applyLimitRateOperator(Flux<Payload> source, int limit) {
33+
Supplier<Queue<Payload>> queueSupplier =
34+
() -> {
35+
Queue<Payload> queue = Queues.<Payload>get(limit).get();
36+
return new CleanOnClearQueueDecorator(queue);
37+
};
38+
39+
try {
40+
return fluxPublishOnCtr.newInstance(
41+
source, Schedulers.immediate(), true, limit, limit, queueSupplier);
42+
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
43+
throw new RuntimeException(e);
44+
}
45+
}
46+
}

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.rsocket.core;
1818

19+
import static io.rsocket.core.LimitRateOperator.applyLimitRateOperator;
1920
import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE;
2021
import static io.rsocket.keepalive.KeepAliveSupport.ClientKeepAliveSupport;
2122
import static io.rsocket.keepalive.KeepAliveSupport.KeepAlive;
@@ -431,7 +432,7 @@ public void accept(long n) {
431432
receivers.put(streamId, receiver);
432433

433434
inboundFlux
434-
.limitRate(Queues.SMALL_BUFFER_SIZE)
435+
.transform(f -> applyLimitRateOperator(f, Queues.SMALL_BUFFER_SIZE))
435436
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
436437
.subscribe(upstreamSubscriber);
437438
if (!payloadReleasedFlag.getAndSet(true)) {

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.rsocket.core;
1818

19+
import static io.rsocket.core.LimitRateOperator.applyLimitRateOperator;
1920
import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE;
2021

2122
import io.netty.buffer.ByteBuf;
@@ -475,7 +476,7 @@ protected void hookFinally(SignalType type) {
475476

476477
sendingSubscriptions.put(streamId, subscriber);
477478
response
478-
.limitRate(Queues.SMALL_BUFFER_SIZE)
479+
.transform(f -> applyLimitRateOperator(f, Queues.SMALL_BUFFER_SIZE))
479480
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
480481
.subscribe(subscriber);
481482
}

0 commit comments

Comments
 (0)