Skip to content

Commit eb30398

Browse files
committed
improves dispose impl
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 0265b79 commit eb30398

File tree

2 files changed

+42
-11
lines changed

2 files changed

+42
-11
lines changed

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.rsocket.internal.jctools.queues.MpscUnboundedArrayQueue;
2121
import java.util.Objects;
2222
import java.util.Queue;
23+
import java.util.concurrent.CancellationException;
2324
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2425
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2526
import org.reactivestreams.Subscriber;
@@ -162,7 +163,6 @@ void drainFused(Subscriber<? super T> a) {
162163
for (; ; ) {
163164

164165
if (cancelled) {
165-
this.clear();
166166
hasDownstream = false;
167167
return;
168168
}
@@ -176,6 +176,7 @@ void drainFused(Subscriber<? super T> a) {
176176

177177
Throwable ex = error;
178178
if (ex != null) {
179+
System.out.println("Send Error");
179180
a.onError(ex);
180181
} else {
181182
a.onComplete();
@@ -352,12 +353,10 @@ public void cancel() {
352353
}
353354
cancelled = true;
354355

355-
if (outputFused) {
356-
return;
357-
}
358-
359356
if (WIP.getAndIncrement(this) == 0) {
360-
this.clear();
357+
if (!outputFused) {
358+
this.clear();
359+
}
361360
hasDownstream = false;
362361
}
363362
}
@@ -422,11 +421,43 @@ public int requestFusion(int requestedMode) {
422421

423422
@Override
424423
public void dispose() {
425-
try {
426-
super.dispose();
427-
} catch (Throwable ignored) {
424+
if (cancelled) {
425+
return;
426+
}
427+
428+
error = new CancellationException("Disposed");
429+
done = true;
430+
431+
boolean once = true;
432+
if (WIP.getAndIncrement(this) == 0) {
433+
cancelled = true;
434+
int m = 1;
435+
for (; ; ) {
436+
final CoreSubscriber<? super T> a = this.actual;
437+
438+
if (!outputFused) {
439+
clear();
440+
}
441+
442+
if (a != null && once) {
443+
try {
444+
a.onError(error);
445+
} catch (Throwable ignored) {
446+
}
447+
}
448+
449+
cancelled = true;
450+
once = false;
451+
452+
int wip = this.wip;
453+
if (wip == m) {
454+
break;
455+
}
456+
m = wip;
457+
}
458+
459+
hasDownstream = false;
428460
}
429-
cancel();
430461
}
431462

432463
@Override

rsocket-core/src/test/java/io/rsocket/internal/subscriber/AssertSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -933,7 +933,7 @@ void drain() {
933933
}
934934

935935
T t;
936-
int m = 0;
936+
int m = 1;
937937
for (; ; ) {
938938
if (isCancelled()) {
939939
qs.clear();

0 commit comments

Comments
 (0)