Skip to content

Commit 83c41e9

Browse files
committed
more fixes
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent af7adf8 commit 83c41e9

File tree

2 files changed

+15
-17
lines changed

2 files changed

+15
-17
lines changed

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ public final class UnboundedProcessor<T> extends FluxProcessor<T, T>
5656

5757
volatile boolean cancelled;
5858

59+
volatile boolean terminated;
60+
5961
volatile int once;
6062

6163
@SuppressWarnings("rawtypes")
@@ -163,6 +165,9 @@ void drainFused(Subscriber<? super T> a) {
163165
for (; ; ) {
164166

165167
if (cancelled) {
168+
if (terminated) {
169+
this.clear();
170+
}
166171
hasDownstream = false;
167172
return;
168173
}
@@ -192,7 +197,7 @@ void drainFused(Subscriber<? super T> a) {
192197

193198
public void drain() {
194199
if (WIP.getAndIncrement(this) != 0) {
195-
if (cancelled) {
200+
if ((!outputFused && cancelled) || terminated) {
196201
this.clear();
197202
}
198203
return;
@@ -353,7 +358,7 @@ public void cancel() {
353358
cancelled = true;
354359

355360
if (WIP.getAndIncrement(this) == 0) {
356-
if (!outputFused) {
361+
if (!outputFused || terminated) {
357362
this.clear();
358363
}
359364
hasDownstream = false;
@@ -382,24 +387,20 @@ public boolean isEmpty() {
382387

383388
@Override
384389
public void clear() {
390+
terminated = true;
385391
if (DISCARD_GUARD.getAndIncrement(this) != 0) {
386392
return;
387393
}
388394

389395
int missed = 1;
390396

391397
for (; ; ) {
392-
while (!queue.isEmpty()) {
393-
T t = queue.poll();
394-
if (t != null) {
395-
release(t);
396-
}
398+
T t;
399+
while ((t = queue.poll()) != null) {
400+
release(t);
397401
}
398-
while (!priorityQueue.isEmpty()) {
399-
T t = priorityQueue.poll();
400-
if (t != null) {
401-
release(t);
402-
}
402+
while ((t = priorityQueue.poll()) != null) {
403+
release(t);
403404
}
404405

405406
missed = DISCARD_GUARD.addAndGet(this, -missed);
@@ -434,16 +435,14 @@ public void dispose() {
434435
for (; ; ) {
435436
final CoreSubscriber<? super T> a = this.actual;
436437

437-
if (!outputFused) {
438+
if (!outputFused || terminated) {
438439
clear();
439440
}
440441

441442
if (a != null && once) {
442443
try {
443-
System.out.println("sending error");
444444
a.onError(error);
445445
} catch (Throwable ignored) {
446-
System.out.println("bubbled");
447446
ignored.printStackTrace();
448447
}
449448
}

rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,7 @@ public void ensuresAsyncFusionAndDisposureHasNoDeadlock() {
173173
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
174174
unboundedProcessor.dispose();
175175
},
176-
unboundedProcessor::dispose,
177-
Schedulers.elastic());
176+
unboundedProcessor::dispose);
178177

179178
assertSubscriber
180179
.await(Duration.ofSeconds(50))

0 commit comments

Comments
 (0)