Skip to content

Commit f44311f

Browse files
RxRingBuffer with Inline Release
Can release when ... - terminal event emitted - unsubscribe/release occurs and queue is empty - unsubscribe/release has occurred and poll/peek occurs Can result in a queue not being put back in the pool if an early unsubscribe occurs from a different thread and causes the poll/peek to not trigger the release. In this case GC will reclaim the pool so it still functions, it just misses the pooling optimization. There is a commented out test of using finalize(). It works as a safety net for the edge case, but at the cost of increased GC time.
1 parent 4a08644 commit f44311f

File tree

1 file changed

+62
-10
lines changed

1 file changed

+62
-10
lines changed

src/main/java/rx/internal/util/RxRingBuffer.java

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,9 @@ protected SpmcArrayQueue<Object> createObject() {
293293
}
294294

295295
};
296-
296+
297+
private volatile boolean released = false;
298+
297299
private RxRingBuffer(Queue<Object> queue, int size) {
298300
this.queue = queue;
299301
this.pool = null;
@@ -307,7 +309,19 @@ private RxRingBuffer(ObjectPool<Queue<Object>> pool, int size) {
307309
}
308310

309311
public void release() {
310-
if (pool != null) {
312+
released = true;
313+
// if the queue was not used it will not be null and will not have already returned
314+
// itself to the pool so we want to reclaim immediately
315+
if(queue != null && queue.isEmpty()) {
316+
returnQueueToPool();
317+
}
318+
}
319+
320+
/**
321+
* We only release once a terminal event is emitted.
322+
*/
323+
private void returnQueueToPool() {
324+
if (pool != null && queue != null) {
311325
Queue<Object> q = queue;
312326
q.clear();
313327
queue = null;
@@ -331,7 +345,8 @@ public void unsubscribe() {
331345
* if more onNext are sent than have been requested
332346
*/
333347
public void onNext(Object o) throws MissingBackpressureException {
334-
if (queue == null) {
348+
if (released) {
349+
// System.out.println("onNext: " + o);
335350
throw new IllegalStateException("This instance has been unsubscribed and the queue is no longer usable.");
336351
}
337352
if (!queue.offer(on.next(o))) {
@@ -362,21 +377,33 @@ public int capacity() {
362377
}
363378

364379
public int count() {
365-
if (queue == null) {
380+
if (released) {
381+
return 0;
382+
}
383+
try {
384+
return queue.size();
385+
} catch (NullPointerException npe) {
386+
// if a race occurs between the if check and it actually being released we'll just default to 0
366387
return 0;
367388
}
368-
return queue.size();
369389
}
370390

371391
public boolean isEmpty() {
372-
if (queue == null) {
392+
if (released) {
393+
return true;
394+
}
395+
try {
396+
return queue.isEmpty();
397+
} catch (NullPointerException npe) {
398+
// if a race occurs between the if check and it actually being released we'll just default to 0
373399
return true;
374400
}
375-
return queue.isEmpty();
376401
}
377402

378403
public Object poll() {
379-
if (queue == null) {
404+
if (released) {
405+
// return to pool if not already done
406+
returnQueueToPool();
380407
// we are unsubscribed and have released the undelrying queue
381408
return null;
382409
}
@@ -394,16 +421,25 @@ public Object poll() {
394421
* a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it
395422
* is currently the way it is.
396423
*/
424+
boolean shouldRelease = false;
397425
if (o == null && terminalState != null && queue.isEmpty()) {
398426
o = terminalState;
399427
// once emitted we clear so a poll loop will finish
400428
terminalState = null;
429+
shouldRelease = true;
430+
}
431+
if (shouldRelease) {
432+
// we emitted a terminal event so release resources
433+
release();
434+
returnQueueToPool();
401435
}
402436
return o;
403437
}
404438

405439
public Object peek() {
406-
if (queue == null) {
440+
if (released) {
441+
// return to pool if not already done
442+
returnQueueToPool();
407443
// we are unsubscribed and have released the undelrying queue
408444
return null;
409445
}
@@ -438,7 +474,23 @@ public Throwable asError(Object o) {
438474

439475
@Override
440476
public boolean isUnsubscribed() {
441-
return queue == null;
477+
return released || queue == null;
442478
}
479+
480+
481+
/**
482+
* Experimenting with finalize. This is an optional safety net, but it comes at a high cost.
483+
* In a 1 minute test with Flight Recorder it increases GC time from 4ms to 28ms.
484+
*/
485+
// @Override
486+
// protected void finalize() throws Throwable {
487+
// try {
488+
// if(queue != null) {
489+
// returnQueueToPool();
490+
// }
491+
// } finally {
492+
// super.finalize();
493+
// }
494+
// }
443495

444496
}

0 commit comments

Comments
 (0)