Skip to content

Commit 8c55fe7

Browse files
committed
fixes AssertSubscriber
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 813a78b commit 8c55fe7

File tree

2 files changed

+67
-30
lines changed

2 files changed

+67
-30
lines changed

rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,29 @@
2525
import io.netty.util.ReferenceCountUtil;
2626
import io.rsocket.buffer.LeaksTrackingByteBufAllocator;
2727
import io.rsocket.internal.subscriber.AssertSubscriber;
28+
import org.junit.jupiter.api.BeforeAll;
2829
import org.junit.jupiter.api.RepeatedTest;
2930
import org.junit.jupiter.api.Timeout;
3031
import org.junit.jupiter.params.ParameterizedTest;
3132
import org.junit.jupiter.params.provider.ValueSource;
3233
import reactor.core.Fuseable;
34+
import reactor.core.publisher.Hooks;
3335
import reactor.core.publisher.Operators;
3436
import reactor.core.scheduler.Schedulers;
3537
import reactor.test.StepVerifier;
3638
import reactor.test.util.RaceTestUtils;
3739

3840
public class UnboundedProcessorTest {
3941

42+
@BeforeAll
43+
public static void setup() {
44+
Hooks.onErrorDropped(__ -> {});
45+
}
46+
47+
public static void teardown() {
48+
Hooks.resetOnErrorDropped();
49+
}
50+
4051
@ParameterizedTest(
4152
name =
4253
"Test that emitting {0} onNext before subscribe and requestN should deliver all the signals once the subscriber is available")
@@ -96,7 +107,7 @@ public void testPrioritizedSending(boolean fusedCase) {
96107
public void ensureUnboundedProcessorDisposesQueueProperly(boolean withFusionEnabled) {
97108
final LeaksTrackingByteBufAllocator allocator =
98109
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
99-
for (int i = 0; i < 10000000; i++) {
110+
for (int i = 0; i < 100000; i++) {
100111
final UnboundedProcessor<ByteBuf> unboundedProcessor = new UnboundedProcessor<>();
101112

102113
final ByteBuf buffer1 = allocator.buffer(1);
@@ -135,7 +146,7 @@ public void ensureUnboundedProcessorDisposesQueueProperly(boolean withFusionEnab
135146

136147
@RepeatedTest(
137148
name =
138-
"Ensures that racing between onNext | dispose | cancel | request(n) will not cause any issues and leaks",
149+
"Ensures that racing between onNext | dispose | cancel | request(n) will not cause any issues and leaks in async backFused mode",
139150
value = 100000)
140151
@Timeout(10)
141152
public void ensureUnboundedProcessorDisposesQueueProperlyAsyncMode() {

rsocket-core/src/test/java/io/rsocket/internal/subscriber/AssertSubscriber.java

Lines changed: 54 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ public class AssertSubscriber<T> implements CoreSubscriber<T>, Subscription {
105105

106106
private final CountDownLatch cdl = new CountDownLatch(1);
107107

108+
volatile boolean done;
109+
108110
volatile Subscription s;
109111

110112
volatile long requested;
@@ -879,53 +881,77 @@ public final boolean isTerminated() {
879881

880882
@Override
881883
public void onComplete() {
884+
done = true;
882885
completionCount++;
886+
887+
if (establishedFusionMode == Fuseable.ASYNC) {
888+
drain();
889+
return;
890+
}
891+
883892
cdl.countDown();
884893
}
885894

886895
@Override
887896
public void onError(Throwable t) {
897+
done = true;
888898
errors.add(t);
899+
900+
if (establishedFusionMode == Fuseable.ASYNC) {
901+
drain();
902+
return;
903+
}
904+
889905
cdl.countDown();
890906
}
891907

892908
@Override
893909
public void onNext(T t) {
894910
if (establishedFusionMode == Fuseable.ASYNC) {
895-
if (this.wip != 0 || WIP.getAndIncrement(this) != 0) {
896-
if (isCancelled()) {
897-
qs.clear();
911+
drain();
912+
} else {
913+
valueCount++;
914+
if (valuesStorage) {
915+
List<T> nextValuesSnapshot;
916+
for (; ; ) {
917+
nextValuesSnapshot = values;
918+
nextValuesSnapshot.add(t);
919+
if (NEXT_VALUES.compareAndSet(this, nextValuesSnapshot, nextValuesSnapshot)) {
920+
break;
921+
}
898922
}
899-
return;
900923
}
924+
}
925+
}
901926

902-
int m = 0;
903-
for (; ; ) {
904-
if (isCancelled()) {
905-
qs.clear();
906-
break;
907-
}
908-
t = qs.poll();
909-
if (t == null) {
910-
m = WIP.addAndGet(this, -m);
911-
if (m == 0) {
912-
break;
913-
}
914-
continue;
927+
void drain() {
928+
if (this.wip != 0 || WIP.getAndIncrement(this) != 0) {
929+
if (isCancelled()) {
930+
qs.clear();
931+
}
932+
return;
933+
}
934+
935+
T t;
936+
int m = 0;
937+
for (; ; ) {
938+
if (isCancelled()) {
939+
qs.clear();
940+
break;
941+
}
942+
boolean done = this.done;
943+
t = qs.poll();
944+
if (t == null) {
945+
if (done) {
946+
cdl.countDown();
947+
return;
915948
}
916-
valueCount++;
917-
if (valuesStorage) {
918-
List<T> nextValuesSnapshot;
919-
for (; ; ) {
920-
nextValuesSnapshot = values;
921-
nextValuesSnapshot.add(t);
922-
if (NEXT_VALUES.compareAndSet(this, nextValuesSnapshot, nextValuesSnapshot)) {
923-
break;
924-
}
925-
}
949+
m = WIP.addAndGet(this, -m);
950+
if (m == 0) {
951+
break;
926952
}
953+
continue;
927954
}
928-
} else {
929955
valueCount++;
930956
if (valuesStorage) {
931957
List<T> nextValuesSnapshot;

0 commit comments

Comments
 (0)