43
43
public final class UnboundedProcessor <T > extends FluxProcessor <T , T >
44
44
implements Fuseable .QueueSubscription <T >, Fuseable {
45
45
46
+ final Queue <T > queue ;
47
+ final Queue <T > priorityQueue ;
48
+
49
+ volatile boolean done ;
50
+ Throwable error ;
51
+ // important to not loose the downstream too early and miss discard hook, while
52
+ // having relevant hasDownstreams()
53
+ boolean hasDownstream ;
54
+ volatile CoreSubscriber <? super T > actual ;
55
+
56
+ volatile boolean cancelled ;
57
+
58
+ volatile int once ;
59
+
46
60
@ SuppressWarnings ("rawtypes" )
47
61
static final AtomicIntegerFieldUpdater <UnboundedProcessor > ONCE =
48
62
AtomicIntegerFieldUpdater .newUpdater (UnboundedProcessor .class , "once" );
49
63
64
+ volatile int wip ;
65
+
50
66
@ SuppressWarnings ("rawtypes" )
51
67
static final AtomicIntegerFieldUpdater <UnboundedProcessor > WIP =
52
68
AtomicIntegerFieldUpdater .newUpdater (UnboundedProcessor .class , "wip" );
53
69
70
+ volatile int discardGuard ;
71
+
72
+ @ SuppressWarnings ("rawtypes" )
73
+ static final AtomicIntegerFieldUpdater <UnboundedProcessor > DISCARD_GUARD =
74
+ AtomicIntegerFieldUpdater .newUpdater (UnboundedProcessor .class , "discardGuard" );
75
+
76
+ volatile long requested ;
77
+
54
78
@ SuppressWarnings ("rawtypes" )
55
79
static final AtomicLongFieldUpdater <UnboundedProcessor > REQUESTED =
56
80
AtomicLongFieldUpdater .newUpdater (UnboundedProcessor .class , "requested" );
57
81
58
- final Queue <T > queue ;
59
- final Queue <T > priorityQueue ;
60
- volatile boolean done ;
61
- Throwable error ;
62
- volatile CoreSubscriber <? super T > actual ;
63
- volatile boolean cancelled ;
64
- volatile int once ;
65
- volatile int wip ;
66
- volatile long requested ;
67
- volatile boolean outputFused ;
82
+ boolean outputFused ;
68
83
69
84
public UnboundedProcessor () {
70
85
this .queue = new MpscUnboundedArrayQueue <>(Queues .SMALL_BUFFER_SIZE );
@@ -79,6 +94,7 @@ public int getBufferSize() {
79
94
@ Override
80
95
public Object scanUnsafe (Attr key ) {
81
96
if (Attr .BUFFERED == key ) return queue .size ();
97
+ if (Attr .PREFETCH == key ) return Integer .MAX_VALUE ;
82
98
return super .scanUnsafe (key );
83
99
}
84
100
@@ -143,8 +159,8 @@ void drainFused(Subscriber<? super T> a) {
143
159
for (; ; ) {
144
160
145
161
if (cancelled ) {
146
- clear ();
147
- actual = null ;
162
+ this . clear ();
163
+ hasDownstream = false ;
148
164
return ;
149
165
}
150
166
@@ -153,7 +169,7 @@ void drainFused(Subscriber<? super T> a) {
153
169
a .onNext (null );
154
170
155
171
if (d ) {
156
- actual = null ;
172
+ hasDownstream = false ;
157
173
158
174
Throwable ex = error ;
159
175
if (ex != null ) {
@@ -173,6 +189,9 @@ void drainFused(Subscriber<? super T> a) {
173
189
174
190
public void drain () {
175
191
if (WIP .getAndIncrement (this ) != 0 ) {
192
+ if (cancelled ) {
193
+ this .clear ();
194
+ }
176
195
return ;
177
196
}
178
197
@@ -199,13 +218,13 @@ public void drain() {
199
218
200
219
boolean checkTerminated (boolean d , boolean empty , Subscriber <? super T > a ) {
201
220
if (cancelled ) {
202
- clear ();
203
- actual = null ;
221
+ this . clear ();
222
+ hasDownstream = false ;
204
223
return true ;
205
224
}
206
225
if (d && empty ) {
207
226
Throwable e = error ;
208
- actual = null ;
227
+ hasDownstream = false ;
209
228
if (e != null ) {
210
229
a .onError (e );
211
230
} else {
@@ -226,10 +245,6 @@ public void onSubscribe(Subscription s) {
226
245
}
227
246
}
228
247
229
- public long available () {
230
- return requested ;
231
- }
232
-
233
248
@ Override
234
249
public int getPrefetch () {
235
250
return Integer .MAX_VALUE ;
@@ -308,7 +323,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
308
323
actual .onSubscribe (this );
309
324
this .actual = actual ;
310
325
if (cancelled ) {
311
- this .actual = null ;
326
+ this .hasDownstream = false ;
312
327
} else {
313
328
drain ();
314
329
}
@@ -335,8 +350,8 @@ public void cancel() {
335
350
cancelled = true ;
336
351
337
352
if (WIP .getAndIncrement (this ) == 0 ) {
338
- clear ();
339
- actual = null ;
353
+ this . clear ();
354
+ hasDownstream = false ;
340
355
}
341
356
}
342
357
@@ -362,16 +377,29 @@ public boolean isEmpty() {
362
377
363
378
@ Override
364
379
public void clear () {
365
- while (!queue .isEmpty ()) {
366
- T t = queue .poll ();
367
- if (t != null ) {
368
- release (t );
369
- }
380
+ if (DISCARD_GUARD .getAndIncrement (this ) != 0 ) {
381
+ return ;
370
382
}
371
- while (!priorityQueue .isEmpty ()) {
372
- T t = priorityQueue .poll ();
373
- if (t != null ) {
374
- release (t );
383
+
384
+ int missed = 1 ;
385
+
386
+ for (; ; ) {
387
+ while (!queue .isEmpty ()) {
388
+ T t = queue .poll ();
389
+ if (t != null ) {
390
+ release (t );
391
+ }
392
+ }
393
+ while (!priorityQueue .isEmpty ()) {
394
+ T t = priorityQueue .poll ();
395
+ if (t != null ) {
396
+ release (t );
397
+ }
398
+ }
399
+
400
+ missed = DISCARD_GUARD .addAndGet (this , -missed );
401
+ if (missed == 0 ) {
402
+ break ;
375
403
}
376
404
}
377
405
}
@@ -413,14 +441,18 @@ public long downstreamCount() {
413
441
414
442
@ Override
415
443
public boolean hasDownstreams () {
416
- return actual != null ;
444
+ return hasDownstream ;
417
445
}
418
446
419
447
void release (T t ) {
420
448
if (t instanceof ReferenceCounted ) {
421
449
ReferenceCounted refCounted = (ReferenceCounted ) t ;
422
450
if (refCounted .refCnt () > 0 ) {
423
- refCounted .release ();
451
+ try {
452
+ refCounted .release ();
453
+ } catch (Throwable ex ) {
454
+ // no ops
455
+ }
424
456
}
425
457
}
426
458
}
0 commit comments