Skip to content

Commit 645c1f6

Browse files
authored
fixes deadlock on multiconsumer clear/poll in UnboundedProcessor (#990)
1 parent 0601f88 commit 645c1f6

File tree

3 files changed

+263
-110
lines changed

3 files changed

+263
-110
lines changed

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.rsocket.internal.jctools.queues.MpscUnboundedArrayQueue;
2121
import java.util.Objects;
2222
import java.util.Queue;
23+
import java.util.concurrent.CancellationException;
2324
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2425
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2526
import org.reactivestreams.Subscriber;
@@ -55,6 +56,8 @@ public final class UnboundedProcessor<T> extends FluxProcessor<T, T>
5556

5657
volatile boolean cancelled;
5758

59+
volatile boolean terminated;
60+
5861
volatile int once;
5962

6063
@SuppressWarnings("rawtypes")
@@ -124,6 +127,9 @@ void drainRegular(Subscriber<? super T> a) {
124127
}
125128

126129
if (checkTerminated(d, empty, a)) {
130+
if (!empty) {
131+
release(t);
132+
}
127133
return;
128134
}
129135

@@ -159,7 +165,9 @@ void drainFused(Subscriber<? super T> a) {
159165
for (; ; ) {
160166

161167
if (cancelled) {
162-
this.clear();
168+
if (terminated) {
169+
this.clear();
170+
}
163171
hasDownstream = false;
164172
return;
165173
}
@@ -189,7 +197,7 @@ void drainFused(Subscriber<? super T> a) {
189197

190198
public void drain() {
191199
if (WIP.getAndIncrement(this) != 0) {
192-
if (cancelled) {
200+
if ((!outputFused && cancelled) || terminated) {
193201
this.clear();
194202
}
195203
return;
@@ -350,7 +358,9 @@ public void cancel() {
350358
cancelled = true;
351359

352360
if (WIP.getAndIncrement(this) == 0) {
353-
this.clear();
361+
if (!outputFused || terminated) {
362+
this.clear();
363+
}
354364
hasDownstream = false;
355365
}
356366
}
@@ -377,24 +387,20 @@ public boolean isEmpty() {
377387

378388
@Override
379389
public void clear() {
390+
terminated = true;
380391
if (DISCARD_GUARD.getAndIncrement(this) != 0) {
381392
return;
382393
}
383394

384395
int missed = 1;
385396

386397
for (; ; ) {
387-
while (!queue.isEmpty()) {
388-
T t = queue.poll();
389-
if (t != null) {
390-
release(t);
391-
}
398+
T t;
399+
while ((t = queue.poll()) != null) {
400+
release(t);
392401
}
393-
while (!priorityQueue.isEmpty()) {
394-
T t = priorityQueue.poll();
395-
if (t != null) {
396-
release(t);
397-
}
402+
while ((t = priorityQueue.poll()) != null) {
403+
release(t);
398404
}
399405

400406
missed = DISCARD_GUARD.addAndGet(this, -missed);
@@ -415,7 +421,43 @@ public int requestFusion(int requestedMode) {
415421

416422
@Override
417423
public void dispose() {
418-
cancel();
424+
if (cancelled) {
425+
return;
426+
}
427+
428+
error = new CancellationException("Disposed");
429+
done = true;
430+
431+
boolean once = true;
432+
if (WIP.getAndIncrement(this) == 0) {
433+
cancelled = true;
434+
int m = 1;
435+
for (; ; ) {
436+
final CoreSubscriber<? super T> a = this.actual;
437+
438+
if (!outputFused || terminated) {
439+
clear();
440+
}
441+
442+
if (a != null && once) {
443+
try {
444+
a.onError(error);
445+
} catch (Throwable ignored) {
446+
}
447+
}
448+
449+
cancelled = true;
450+
once = false;
451+
452+
int wip = this.wip;
453+
if (wip == m) {
454+
break;
455+
}
456+
m = wip;
457+
}
458+
459+
hasDownstream = false;
460+
}
419461
}
420462

421463
@Override
Lines changed: 138 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-present the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,115 +16,173 @@
1616

1717
package io.rsocket.internal;
1818

19-
import io.rsocket.Payload;
20-
import io.rsocket.util.ByteBufPayload;
21-
import io.rsocket.util.EmptyPayload;
22-
import java.util.concurrent.CountDownLatch;
23-
import org.junit.Assert;
24-
import org.junit.Test;
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import io.netty.buffer.ByteBuf;
22+
import io.netty.buffer.ByteBufAllocator;
23+
import io.netty.buffer.Unpooled;
24+
import io.netty.util.CharsetUtil;
25+
import io.netty.util.ReferenceCountUtil;
26+
import io.rsocket.buffer.LeaksTrackingByteBufAllocator;
27+
import io.rsocket.internal.subscriber.AssertSubscriber;
28+
import java.time.Duration;
29+
import org.junit.jupiter.api.BeforeAll;
30+
import org.junit.jupiter.api.RepeatedTest;
31+
import org.junit.jupiter.api.Timeout;
32+
import org.junit.jupiter.params.ParameterizedTest;
33+
import org.junit.jupiter.params.provider.ValueSource;
34+
import reactor.core.Fuseable;
35+
import reactor.core.publisher.Hooks;
36+
import reactor.core.publisher.Operators;
37+
import reactor.core.scheduler.Schedulers;
38+
import reactor.test.StepVerifier;
39+
import reactor.test.util.RaceTestUtils;
2540

2641
public class UnboundedProcessorTest {
27-
@Test
28-
public void testOnNextBeforeSubscribe_10() {
29-
testOnNextBeforeSubscribeN(10);
30-
}
31-
32-
@Test
33-
public void testOnNextBeforeSubscribe_100() {
34-
testOnNextBeforeSubscribeN(100);
35-
}
3642

37-
@Test
38-
public void testOnNextBeforeSubscribe_10_000() {
39-
testOnNextBeforeSubscribeN(10_000);
43+
@BeforeAll
44+
public static void setup() {
45+
Hooks.onErrorDropped(__ -> {});
4046
}
4147

42-
@Test
43-
public void testOnNextBeforeSubscribe_100_000() {
44-
testOnNextBeforeSubscribeN(100_000);
45-
}
46-
47-
@Test
48-
public void testOnNextBeforeSubscribe_1_000_000() {
49-
testOnNextBeforeSubscribeN(1_000_000);
50-
}
51-
52-
@Test
53-
public void testOnNextBeforeSubscribe_10_000_000() {
54-
testOnNextBeforeSubscribeN(10_000_000);
48+
public static void teardown() {
49+
Hooks.resetOnErrorDropped();
5550
}
5651

52+
@ParameterizedTest(
53+
name =
54+
"Test that emitting {0} onNext before subscribe and requestN should deliver all the signals once the subscriber is available")
55+
@ValueSource(ints = {10, 100, 10_000, 100_000, 1_000_000, 10_000_000})
5756
public void testOnNextBeforeSubscribeN(int n) {
58-
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();
57+
UnboundedProcessor<ByteBuf> processor = new UnboundedProcessor<>();
5958

6059
for (int i = 0; i < n; i++) {
61-
processor.onNext(EmptyPayload.INSTANCE);
60+
processor.onNext(Unpooled.EMPTY_BUFFER);
6261
}
6362

6463
processor.onComplete();
6564

66-
long count = processor.count().block();
67-
68-
Assert.assertEquals(n, count);
69-
}
70-
71-
@Test
72-
public void testOnNextAfterSubscribe_10() throws Exception {
73-
testOnNextAfterSubscribeN(10);
74-
}
75-
76-
@Test
77-
public void testOnNextAfterSubscribe_100() throws Exception {
78-
testOnNextAfterSubscribeN(100);
65+
StepVerifier.create(processor.count()).expectNext(Long.valueOf(n)).verifyComplete();
7966
}
8067

81-
@Test
82-
public void testOnNextAfterSubscribe_1000() throws Exception {
83-
testOnNextAfterSubscribeN(1000);
84-
}
68+
@ParameterizedTest(
69+
name =
70+
"Test that emitting {0} onNext after subscribe and requestN should deliver all the signals")
71+
@ValueSource(ints = {10, 100, 10_000})
72+
public void testOnNextAfterSubscribeN(int n) {
73+
UnboundedProcessor<ByteBuf> processor = new UnboundedProcessor<>();
74+
AssertSubscriber<ByteBuf> assertSubscriber = AssertSubscriber.create();
8575

86-
@Test
87-
public void testPrioritizedSending() {
88-
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();
76+
processor.subscribe(assertSubscriber);
8977

90-
for (int i = 0; i < 1000; i++) {
91-
processor.onNext(EmptyPayload.INSTANCE);
78+
for (int i = 0; i < n; i++) {
79+
processor.onNext(Unpooled.EMPTY_BUFFER);
9280
}
9381

94-
processor.onNextPrioritized(ByteBufPayload.create("test"));
95-
96-
Payload closestPayload = processor.next().block();
97-
98-
Assert.assertEquals(closestPayload.getDataUtf8(), "test");
82+
assertSubscriber.awaitAndAssertNextValueCount(n);
9983
}
10084

101-
@Test
102-
public void testPrioritizedFused() {
103-
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();
85+
@ParameterizedTest(
86+
name =
87+
"Test that prioritized value sending deliver prioritized signals before the others mode[fusionEnabled={0}]")
88+
@ValueSource(booleans = {true, false})
89+
public void testPrioritizedSending(boolean fusedCase) {
90+
UnboundedProcessor<ByteBuf> processor = new UnboundedProcessor<>();
10491

10592
for (int i = 0; i < 1000; i++) {
106-
processor.onNext(EmptyPayload.INSTANCE);
93+
processor.onNext(Unpooled.EMPTY_BUFFER);
10794
}
10895

109-
processor.onNextPrioritized(ByteBufPayload.create("test"));
96+
processor.onNextPrioritized(Unpooled.copiedBuffer("test", CharsetUtil.UTF_8));
11097

111-
Payload closestPayload = processor.poll();
112-
113-
Assert.assertEquals(closestPayload.getDataUtf8(), "test");
98+
assertThat(fusedCase ? processor.poll() : processor.next().block())
99+
.isNotNull()
100+
.extracting(bb -> bb.toString(CharsetUtil.UTF_8))
101+
.isEqualTo("test");
114102
}
115103

116-
public void testOnNextAfterSubscribeN(int n) throws Exception {
117-
CountDownLatch latch = new CountDownLatch(n);
118-
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();
119-
processor.log().doOnNext(integer -> latch.countDown()).subscribe();
120-
121-
for (int i = 0; i < n; i++) {
122-
System.out.println("onNexting -> " + i);
123-
processor.onNext(EmptyPayload.INSTANCE);
104+
@ParameterizedTest(
105+
name =
106+
"Ensures that racing between onNext | dispose | cancel | request(n) will not cause any issues and leaks; mode[fusionEnabled={0}]")
107+
@ValueSource(booleans = {true, false})
108+
public void ensureUnboundedProcessorDisposesQueueProperly(boolean withFusionEnabled) {
109+
final LeaksTrackingByteBufAllocator allocator =
110+
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
111+
for (int i = 0; i < 100000; i++) {
112+
final UnboundedProcessor<ByteBuf> unboundedProcessor = new UnboundedProcessor<>();
113+
114+
final ByteBuf buffer1 = allocator.buffer(1);
115+
final ByteBuf buffer2 = allocator.buffer(2);
116+
117+
final AssertSubscriber<ByteBuf> assertSubscriber =
118+
new AssertSubscriber<ByteBuf>(0)
119+
.requestedFusionMode(withFusionEnabled ? Fuseable.ANY : Fuseable.NONE);
120+
121+
unboundedProcessor.subscribe(assertSubscriber);
122+
123+
RaceTestUtils.race(
124+
() ->
125+
RaceTestUtils.race(
126+
() ->
127+
RaceTestUtils.race(
128+
() -> {
129+
unboundedProcessor.onNext(buffer1);
130+
unboundedProcessor.onNext(buffer2);
131+
},
132+
unboundedProcessor::dispose,
133+
Schedulers.elastic()),
134+
assertSubscriber::cancel,
135+
Schedulers.elastic()),
136+
() -> {
137+
assertSubscriber.request(1);
138+
assertSubscriber.request(1);
139+
},
140+
Schedulers.elastic());
141+
142+
assertSubscriber.values().forEach(ReferenceCountUtil::safeRelease);
143+
144+
allocator.assertHasNoLeaks();
124145
}
146+
}
125147

126-
processor.drain();
127-
128-
latch.await();
148+
@RepeatedTest(
149+
name =
150+
"Ensures that racing between onNext + dispose | downstream async drain should not cause any issues and leaks",
151+
value = 100000)
152+
@Timeout(60)
153+
public void ensuresAsyncFusionAndDisposureHasNoDeadlock() {
154+
// TODO: enable leaks tracking
155+
// final LeaksTrackingByteBufAllocator allocator =
156+
// LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
157+
final UnboundedProcessor<ByteBuf> unboundedProcessor = new UnboundedProcessor<>();
158+
159+
// final ByteBuf buffer1 = allocator.buffer(1);
160+
// final ByteBuf buffer2 = allocator.buffer(2);
161+
162+
final AssertSubscriber<ByteBuf> assertSubscriber =
163+
new AssertSubscriber<>(Operators.enableOnDiscard(null, ReferenceCountUtil::safeRelease));
164+
165+
unboundedProcessor.publishOn(Schedulers.parallel()).subscribe(assertSubscriber);
166+
167+
RaceTestUtils.race(
168+
() -> {
169+
// unboundedProcessor.onNext(buffer1);
170+
// unboundedProcessor.onNext(buffer2);
171+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
172+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
173+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
174+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
175+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
176+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
177+
unboundedProcessor.dispose();
178+
},
179+
unboundedProcessor::dispose);
180+
181+
assertSubscriber
182+
.await(Duration.ofSeconds(50))
183+
.values()
184+
.forEach(ReferenceCountUtil::safeRelease);
185+
186+
// allocator.assertHasNoLeaks();
129187
}
130188
}

0 commit comments

Comments
 (0)