Skip to content

fixes memory leak and adds test that reproduces live lock on queue.poll() #989

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ void drainRegular(long expectedState, Subscriber<? super ByteBuf> a) {

while (r != e) {
ByteBuf t;
boolean done = this.done;
boolean empty;

if (!pq.isEmpty()) {
Expand All @@ -226,7 +227,10 @@ void drainRegular(long expectedState, Subscriber<? super ByteBuf> a) {
empty = t == null;
}

if (checkTerminated(empty, a)) {
if (checkTerminated(done, empty, a)) {
if (!empty) {
release(t);
}
return;
}

Expand All @@ -240,7 +244,7 @@ void drainRegular(long expectedState, Subscriber<? super ByteBuf> a) {
}

if (r == e) {
if (checkTerminated(q.isEmpty() && pq.isEmpty(), a)) {
if (checkTerminated(this.done, q.isEmpty() && pq.isEmpty(), a)) {
return;
}
}
Expand All @@ -263,12 +267,12 @@ void drainRegular(long expectedState, Subscriber<? super ByteBuf> a) {

void drainFused(long expectedState, Subscriber<? super ByteBuf> a) {
for (; ; ) {
boolean d = done;
boolean d = this.done;

a.onNext(null);

if (d) {
Throwable ex = error;
Throwable ex = this.error;
if (ex != null) {
a.onError(ex);
} else {
Expand All @@ -288,15 +292,15 @@ void drainFused(long expectedState, Subscriber<? super ByteBuf> a) {
}
}

boolean checkTerminated(boolean empty, Subscriber<? super ByteBuf> a) {
boolean checkTerminated(boolean done, boolean empty, Subscriber<? super ByteBuf> a) {
final long state = this.state;
if (isCancelled(state)) {
clearAndTerminate(this);
return true;
}

if (done && empty) {
Throwable e = error;
Throwable e = this.error;
if (e != null) {
a.onError(e);
} else {
Expand All @@ -312,7 +316,7 @@ boolean checkTerminated(boolean empty, Subscriber<? super ByteBuf> a) {
@Override
public void onSubscribe(Subscription s) {
final long state = this.state;
if (done || isTerminated(state) || isCancelled(state)) {
if (this.done || isTerminated(state) || isCancelled(state)) {
s.cancel();
} else {
s.request(Long.MAX_VALUE);
Expand Down Expand Up @@ -349,7 +353,7 @@ public void cancel() {
return;
}

if (outputFused) {
if (this.outputFused) {
return;
}

Expand Down Expand Up @@ -408,18 +412,18 @@ void clearUnsafely() {

@Override
public int size() {
return priorityQueue.size() + queue.size();
return this.priorityQueue.size() + this.queue.size();
}

@Override
public boolean isEmpty() {
return priorityQueue.isEmpty() && queue.isEmpty();
return this.priorityQueue.isEmpty() && this.queue.isEmpty();
}

@Override
public int requestFusion(int requestedMode) {
if ((requestedMode & Fuseable.ASYNC) != 0) {
outputFused = true;
this.outputFused = true;
return Fuseable.ASYNC;
}
return Fuseable.NONE;
Expand Down Expand Up @@ -447,7 +451,7 @@ public boolean isTerminated() {
public Throwable getError() {
final long state = this.state;
if (isTerminated(state) || done) {
return error;
return this.error;
} else {
return null;
}
Expand All @@ -460,7 +464,7 @@ public long downstreamCount() {

@Override
public boolean hasDownstreams() {
return (state & FLAG_SUBSCRIBED_ONCE) == FLAG_SUBSCRIBED_ONCE && actual != null;
return (this.state & FLAG_SUBSCRIBED_ONCE) == FLAG_SUBSCRIBED_ONCE && this.actual != null;
}

static void release(ByteBuf byteBuf) {
Expand Down Expand Up @@ -551,10 +555,11 @@ static long wipRemoveMissing(UnboundedProcessor instance, long previousState) {
}

static void clearAndTerminate(UnboundedProcessor instance) {
final boolean outputFused = instance.outputFused;
for (; ; ) {
long state = instance.state;

if (instance.outputFused) {
if (outputFused) {
instance.clearSafely();
} else {
instance.clearUnsafely();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,28 @@

package io.rsocket.internal;

import static org.assertj.core.api.Assertions.assertThat;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
import org.junit.Test;
import io.netty.util.ReferenceCountUtil;
import io.rsocket.buffer.LeaksTrackingByteBufAllocator;
import io.rsocket.internal.subscriber.AssertSubscriber;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.Fuseable;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.util.RaceTestUtils;

public class UnboundedProcessorTest {
@Test
public void testOnNextBeforeSubscribe_10() {
testOnNextBeforeSubscribeN(10);
}

@Test
public void testOnNextBeforeSubscribe_100() {
testOnNextBeforeSubscribeN(100);
}

@Test
public void testOnNextBeforeSubscribe_10_000() {
testOnNextBeforeSubscribeN(10_000);
}

@Test
public void testOnNextBeforeSubscribe_100_000() {
testOnNextBeforeSubscribeN(100_000);
}

@Test
public void testOnNextBeforeSubscribe_1_000_000() {
testOnNextBeforeSubscribeN(1_000_000);
}

@Test
public void testOnNextBeforeSubscribe_10_000_000() {
testOnNextBeforeSubscribeN(10_000_000);
}

@ParameterizedTest(
name =
"Test that emitting {0} onNext before subscribe and requestN should deliver all the signals once the subscriber is available")
@ValueSource(ints = {10, 100, 10_000, 100_000, 1_000_000, 10_000_000})
public void testOnNextBeforeSubscribeN(int n) {
UnboundedProcessor processor = new UnboundedProcessor();

Expand All @@ -63,68 +47,86 @@ public void testOnNextBeforeSubscribeN(int n) {

processor.onComplete();

long count = processor.count().block();

Assert.assertEquals(n, count);
}

@Test
public void testOnNextAfterSubscribe_10() throws Exception {
testOnNextAfterSubscribeN(10);
StepVerifier.create(processor.count()).expectNext(Long.valueOf(n)).verifyComplete();
}

@Test
public void testOnNextAfterSubscribe_100() throws Exception {
testOnNextAfterSubscribeN(100);
}

@Test
public void testOnNextAfterSubscribe_1000() throws Exception {
testOnNextAfterSubscribeN(1000);
}

@Test
public void testPrioritizedSending() {
@ParameterizedTest(
name =
"Test that emitting {0} onNext after subscribe and requestN should deliver all the signals")
@ValueSource(ints = {10, 100, 10_000})
public void testOnNextAfterSubscribeN(int n) {
UnboundedProcessor processor = new UnboundedProcessor();
AssertSubscriber<ByteBuf> assertSubscriber = AssertSubscriber.create();

for (int i = 0; i < 1000; i++) {
processor.subscribe(assertSubscriber);

for (int i = 0; i < n; i++) {
processor.onNext(Unpooled.EMPTY_BUFFER);
}

processor.onNextPrioritized(Unpooled.wrappedBuffer("test".getBytes(CharsetUtil.UTF_8)));

ByteBuf byteBuf = processor.next().block();

Assert.assertEquals(byteBuf.toString(CharsetUtil.UTF_8), "test");
assertSubscriber.awaitAndAssertNextValueCount(n);
}

@Test
public void testPrioritizedFused() {
@ParameterizedTest(
name =
"Test that prioritized value sending deliver prioritized signals before the others mode[fusionEnabled={0}]")
@ValueSource(booleans = {true, false})
public void testPrioritizedSending(boolean fusedCase) {
UnboundedProcessor processor = new UnboundedProcessor();

for (int i = 0; i < 1000; i++) {
processor.onNext(Unpooled.EMPTY_BUFFER);
}

processor.onNextPrioritized(Unpooled.wrappedBuffer("test".getBytes(CharsetUtil.UTF_8)));
processor.onNextPrioritized(Unpooled.copiedBuffer("test", CharsetUtil.UTF_8));

ByteBuf byteBuf = processor.poll();

Assert.assertEquals(byteBuf.toString(CharsetUtil.UTF_8), "test");
assertThat(fusedCase ? processor.poll() : processor.next().block())
.isNotNull()
.extracting(bb -> bb.toString(CharsetUtil.UTF_8))
.isEqualTo("test");
}

public void testOnNextAfterSubscribeN(int n) throws Exception {
CountDownLatch latch = new CountDownLatch(n);
UnboundedProcessor processor = new UnboundedProcessor();
processor.log().doOnNext(integer -> latch.countDown()).subscribe();

for (int i = 0; i < n; i++) {
System.out.println("onNexting -> " + i);
processor.onNext(Unpooled.EMPTY_BUFFER);
@ParameterizedTest(
name =
"Ensures that racing between onNext | dispose | cancel | request(n) will not cause any issues and leaks; mode[fusionEnabled={0}]")
@ValueSource(booleans = {true, false})
public void ensureUnboundedProcessorDisposesQueueProperly(boolean withFusionEnabled) {
final LeaksTrackingByteBufAllocator allocator =
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
for (int i = 0; i < 10000; i++) {
final UnboundedProcessor unboundedProcessor = new UnboundedProcessor();

final ByteBuf buffer1 = allocator.buffer(1);
final ByteBuf buffer2 = allocator.buffer(2);

final AssertSubscriber<ByteBuf> assertSubscriber =
new AssertSubscriber<ByteBuf>(0)
.requestedFusionMode(withFusionEnabled ? Fuseable.ANY : Fuseable.NONE);

unboundedProcessor.subscribe(assertSubscriber);

RaceTestUtils.race(
() ->
RaceTestUtils.race(
() ->
RaceTestUtils.race(
() -> {
unboundedProcessor.onNext(buffer1);
unboundedProcessor.onNext(buffer2);
},
unboundedProcessor::dispose,
Schedulers.elastic()),
assertSubscriber::cancel,
Schedulers.elastic()),
() -> {
assertSubscriber.request(1);
assertSubscriber.request(1);
},
Schedulers.elastic());

assertSubscriber.values().forEach(ReferenceCountUtil::safeRelease);

allocator.assertHasNoLeaks();
}

processor.drain();

latch.await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BooleanSupplier;
Expand Down Expand Up @@ -86,6 +87,10 @@ public class AssertSubscriber<T> implements CoreSubscriber<T>, Subscription {
private static final AtomicLongFieldUpdater<AssertSubscriber> REQUESTED =
AtomicLongFieldUpdater.newUpdater(AssertSubscriber.class, "requested");

@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<AssertSubscriber> WIP =
AtomicIntegerFieldUpdater.newUpdater(AssertSubscriber.class, "wip");

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<AssertSubscriber, List> NEXT_VALUES =
AtomicReferenceFieldUpdater.newUpdater(AssertSubscriber.class, List.class, "values");
Expand All @@ -104,6 +109,8 @@ public class AssertSubscriber<T> implements CoreSubscriber<T>, Subscription {

volatile long requested;

volatile int wip;

volatile List<T> values = new LinkedList<>();

/** The fusion mode to request. */
Expand Down Expand Up @@ -854,6 +861,10 @@ public void cancel() {
a = S.getAndSet(this, Operators.cancelledSubscription());
if (a != null && a != Operators.cancelledSubscription()) {
a.cancel();

if (establishedFusionMode == Fuseable.ASYNC && WIP.getAndIncrement(this) == 0) {
qs.clear();
}
}
}
}
Expand Down Expand Up @@ -881,10 +892,26 @@ public void onError(Throwable t) {
@Override
public void onNext(T t) {
if (establishedFusionMode == Fuseable.ASYNC) {
if (this.wip != 0 || WIP.getAndIncrement(this) != 0) {
if (isCancelled()) {
qs.clear();
}
return;
}

int m = 0;
for (; ; ) {
if (isCancelled()) {
qs.clear();
break;
}
t = qs.poll();
if (t == null) {
break;
m = WIP.addAndGet(this, -m);
if (m == 0) {
break;
}
continue;
}
valueCount++;
if (valuesStorage) {
Expand Down