File tree Expand file tree Collapse file tree 2 files changed +42
-11
lines changed
main/java/io/rsocket/internal
test/java/io/rsocket/internal/subscriber Expand file tree Collapse file tree 2 files changed +42
-11
lines changed Original file line number Diff line number Diff line change 20
20
import io .rsocket .internal .jctools .queues .MpscUnboundedArrayQueue ;
21
21
import java .util .Objects ;
22
22
import java .util .Queue ;
23
+ import java .util .concurrent .CancellationException ;
23
24
import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
24
25
import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
25
26
import org .reactivestreams .Subscriber ;
@@ -162,7 +163,6 @@ void drainFused(Subscriber<? super T> a) {
162
163
for (; ; ) {
163
164
164
165
if (cancelled ) {
165
- this .clear ();
166
166
hasDownstream = false ;
167
167
return ;
168
168
}
@@ -176,6 +176,7 @@ void drainFused(Subscriber<? super T> a) {
176
176
177
177
Throwable ex = error ;
178
178
if (ex != null ) {
179
+ System .out .println ("Send Error" );
179
180
a .onError (ex );
180
181
} else {
181
182
a .onComplete ();
@@ -352,12 +353,10 @@ public void cancel() {
352
353
}
353
354
cancelled = true ;
354
355
355
- if (outputFused ) {
356
- return ;
357
- }
358
-
359
356
if (WIP .getAndIncrement (this ) == 0 ) {
360
- this .clear ();
357
+ if (!outputFused ) {
358
+ this .clear ();
359
+ }
361
360
hasDownstream = false ;
362
361
}
363
362
}
@@ -422,11 +421,43 @@ public int requestFusion(int requestedMode) {
422
421
423
422
@ Override
424
423
public void dispose () {
425
- try {
426
- super .dispose ();
427
- } catch (Throwable ignored ) {
424
+ if (cancelled ) {
425
+ return ;
426
+ }
427
+
428
+ error = new CancellationException ("Disposed" );
429
+ done = true ;
430
+
431
+ boolean once = true ;
432
+ if (WIP .getAndIncrement (this ) == 0 ) {
433
+ cancelled = true ;
434
+ int m = 1 ;
435
+ for (; ; ) {
436
+ final CoreSubscriber <? super T > a = this .actual ;
437
+
438
+ if (!outputFused ) {
439
+ clear ();
440
+ }
441
+
442
+ if (a != null && once ) {
443
+ try {
444
+ a .onError (error );
445
+ } catch (Throwable ignored ) {
446
+ }
447
+ }
448
+
449
+ cancelled = true ;
450
+ once = false ;
451
+
452
+ int wip = this .wip ;
453
+ if (wip == m ) {
454
+ break ;
455
+ }
456
+ m = wip ;
457
+ }
458
+
459
+ hasDownstream = false ;
428
460
}
429
- cancel ();
430
461
}
431
462
432
463
@ Override
Original file line number Diff line number Diff line change @@ -933,7 +933,7 @@ void drain() {
933
933
}
934
934
935
935
T t ;
936
- int m = 0 ;
936
+ int m = 1 ;
937
937
for (; ; ) {
938
938
if (isCancelled ()) {
939
939
qs .clear ();
You can’t perform that action at this time.
0 commit comments