@@ -293,7 +293,9 @@ protected SpmcArrayQueue<Object> createObject() {
293
293
}
294
294
295
295
};
296
-
296
+
297
+ private volatile boolean released = false ;
298
+
297
299
private RxRingBuffer (Queue <Object > queue , int size ) {
298
300
this .queue = queue ;
299
301
this .pool = null ;
@@ -307,7 +309,19 @@ private RxRingBuffer(ObjectPool<Queue<Object>> pool, int size) {
307
309
}
308
310
309
311
public void release () {
310
- if (pool != null ) {
312
+ released = true ;
313
+ // if the queue was not used it will not be null and will not have already returned
314
+ // itself to the pool so we want to reclaim immediately
315
+ if (queue != null && queue .isEmpty ()) {
316
+ returnQueueToPool ();
317
+ }
318
+ }
319
+
320
+ /**
321
+ * We only release once a terminal event is emitted.
322
+ */
323
+ private void returnQueueToPool () {
324
+ if (pool != null && queue != null ) {
311
325
Queue <Object > q = queue ;
312
326
q .clear ();
313
327
queue = null ;
@@ -331,7 +345,8 @@ public void unsubscribe() {
331
345
* if more onNext are sent than have been requested
332
346
*/
333
347
public void onNext (Object o ) throws MissingBackpressureException {
334
- if (queue == null ) {
348
+ if (released ) {
349
+ // System.out.println("onNext: " + o);
335
350
throw new IllegalStateException ("This instance has been unsubscribed and the queue is no longer usable." );
336
351
}
337
352
if (!queue .offer (on .next (o ))) {
@@ -362,21 +377,33 @@ public int capacity() {
362
377
}
363
378
364
379
public int count () {
365
- if (queue == null ) {
380
+ if (released ) {
381
+ return 0 ;
382
+ }
383
+ try {
384
+ return queue .size ();
385
+ } catch (NullPointerException npe ) {
386
+ // if a race occurs between the if check and it actually being released we'll just default to 0
366
387
return 0 ;
367
388
}
368
- return queue .size ();
369
389
}
370
390
371
391
public boolean isEmpty () {
372
- if (queue == null ) {
392
+ if (released ) {
393
+ return true ;
394
+ }
395
+ try {
396
+ return queue .isEmpty ();
397
+ } catch (NullPointerException npe ) {
398
+ // if a race occurs between the if check and it actually being released we'll just default to 0
373
399
return true ;
374
400
}
375
- return queue .isEmpty ();
376
401
}
377
402
378
403
public Object poll () {
379
- if (queue == null ) {
404
+ if (released ) {
405
+ // return to pool if not already done
406
+ returnQueueToPool ();
380
407
// we are unsubscribed and have released the undelrying queue
381
408
return null ;
382
409
}
@@ -394,16 +421,25 @@ public Object poll() {
394
421
* a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it
395
422
* is currently the way it is.
396
423
*/
424
+ boolean shouldRelease = false ;
397
425
if (o == null && terminalState != null && queue .isEmpty ()) {
398
426
o = terminalState ;
399
427
// once emitted we clear so a poll loop will finish
400
428
terminalState = null ;
429
+ shouldRelease = true ;
430
+ }
431
+ if (shouldRelease ) {
432
+ // we emitted a terminal event so release resources
433
+ release ();
434
+ returnQueueToPool ();
401
435
}
402
436
return o ;
403
437
}
404
438
405
439
public Object peek () {
406
- if (queue == null ) {
440
+ if (released ) {
441
+ // return to pool if not already done
442
+ returnQueueToPool ();
407
443
// we are unsubscribed and have released the undelrying queue
408
444
return null ;
409
445
}
@@ -438,7 +474,23 @@ public Throwable asError(Object o) {
438
474
439
475
@ Override
440
476
public boolean isUnsubscribed () {
441
- return queue == null ;
477
+ return released || queue == null ;
442
478
}
479
+
480
+
481
+ /**
482
+ * Experimenting with finalize. This is an optional safety net, but it comes at a high cost.
483
+ * In a 1 minute test with Flight Recorder it increases GC time from 4ms to 28ms.
484
+ */
485
+ // @Override
486
+ // protected void finalize() throws Throwable {
487
+ // try {
488
+ // if(queue != null) {
489
+ // returnQueueToPool();
490
+ // }
491
+ // } finally {
492
+ // super.finalize();
493
+ // }
494
+ // }
443
495
444
496
}
0 commit comments