Skip to content

Commit dda25e6

Browse files
Merge pull request #2553 from akarnokd/RxRingBufferSynchronized
RxRingBuffer with synchronization
2 parents f76861b + 84d38bf commit dda25e6

File tree

3 files changed

+54
-52
lines changed

3 files changed

+54
-52
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronou
308308
private RxRingBuffer getOrCreateScalarValueQueue() {
309309
RxRingBuffer svq = scalarValueQueue;
310310
if (svq == null) {
311-
svq = RxRingBuffer.getSpmcInstance();
311+
svq = RxRingBuffer.getSpscInstance();
312312
scalarValueQueue = svq;
313313
}
314314
return svq;
@@ -581,7 +581,7 @@ private static final class InnerSubscriber<T> extends Subscriber<T> {
581581
@SuppressWarnings("rawtypes")
582582
static final AtomicIntegerFieldUpdater<InnerSubscriber> ONCE_TERMINATED = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "terminated");
583583

584-
private final RxRingBuffer q = RxRingBuffer.getSpmcInstance();
584+
private final RxRingBuffer q = RxRingBuffer.getSpscInstance();
585585

586586
public InnerSubscriber(MergeSubscriber<T> parent, MergeProducer<T> producer) {
587587
this.parentSubscriber = parent;

src/main/java/rx/internal/util/RxRingBuffer.java

Lines changed: 50 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ public class RxRingBuffer implements Subscription {
3333

3434
public static RxRingBuffer getSpscInstance() {
3535
if (UnsafeAccess.isUnsafeAvailable()) {
36-
// TODO the SpscArrayQueue isn't ready yet so using SpmcArrayQueue for now
37-
return new RxRingBuffer(SPMC_POOL, SIZE);
36+
return new RxRingBuffer(SPSC_POOL, SIZE);
3837
} else {
3938
return new RxRingBuffer();
4039
}
@@ -306,12 +305,13 @@ private RxRingBuffer(ObjectPool<Queue<Object>> pool, int size) {
306305
this.size = size;
307306
}
308307

309-
public void release() {
310-
if (pool != null) {
311-
Queue<Object> q = queue;
308+
public synchronized void release() {
309+
Queue<Object> q = queue;
310+
ObjectPool<Queue<Object>> p = pool;
311+
if (p != null && q != null) {
312312
q.clear();
313313
queue = null;
314-
pool.returnObject(q);
314+
p.returnObject(q);
315315
}
316316
}
317317

@@ -331,10 +331,21 @@ public void unsubscribe() {
331331
* if more onNext are sent than have been requested
332332
*/
333333
public void onNext(Object o) throws MissingBackpressureException {
334-
if (queue == null) {
334+
boolean iae = false;
335+
boolean mbe = false;
336+
synchronized (this) {
337+
Queue<Object> q = queue;
338+
if (q != null) {
339+
mbe = !q.offer(on.next(o));
340+
} else {
341+
iae = true;
342+
}
343+
}
344+
345+
if (iae) {
335346
throw new IllegalStateException("This instance has been unsubscribed and the queue is no longer usable.");
336347
}
337-
if (!queue.offer(on.next(o))) {
348+
if (mbe) {
338349
throw new MissingBackpressureException();
339350
}
340351
}
@@ -362,55 +373,54 @@ public int capacity() {
362373
}
363374

364375
public int count() {
365-
if (queue == null) {
376+
Queue<Object> q = queue;
377+
if (q == null) {
366378
return 0;
367379
}
368-
return queue.size();
380+
return q.size();
369381
}
370382

371383
public boolean isEmpty() {
372-
if (queue == null) {
384+
Queue<Object> q = queue;
385+
if (q == null) {
373386
return true;
374387
}
375-
return queue.isEmpty();
388+
return q.isEmpty();
376389
}
377390

378391
public Object poll() {
379-
if (queue == null) {
380-
// we are unsubscribed and have released the undelrying queue
381-
return null;
382-
}
383392
Object o;
384-
o = queue.poll();
385-
/*
386-
* benjchristensen July 10 2014 => The check for 'queue.isEmpty()' came from a very rare concurrency bug where poll()
387-
* is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case,
388-
* "o == null" and there is a terminal state, but now "queue.isEmpty()" and we should NOT return the terminalState.
389-
*
390-
* The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on*
391-
* or needing to enqueue terminalState.
392-
*
393-
* This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires
394-
* a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it
395-
* is currently the way it is.
396-
*/
397-
if (o == null && terminalState != null && queue.isEmpty()) {
398-
o = terminalState;
399-
// once emitted we clear so a poll loop will finish
400-
terminalState = null;
393+
synchronized (this) {
394+
Queue<Object> q = queue;
395+
if (q == null) {
396+
// we are unsubscribed and have released the underlying queue
397+
return null;
398+
}
399+
o = q.poll();
400+
401+
Object ts = terminalState;
402+
if (o == null && ts != null && q.peek() == null) {
403+
o = ts;
404+
// once emitted we clear so a poll loop will finish
405+
terminalState = null;
406+
}
401407
}
402408
return o;
403409
}
404410

405411
public Object peek() {
406-
if (queue == null) {
407-
// we are unsubscribed and have released the undelrying queue
408-
return null;
409-
}
410412
Object o;
411-
o = queue.peek();
412-
if (o == null && terminalState != null && queue.isEmpty()) {
413-
o = terminalState;
413+
synchronized (this) {
414+
Queue<Object> q = queue;
415+
if (q == null) {
416+
// we are unsubscribed and have released the underlying queue
417+
return null;
418+
}
419+
o = q.peek();
420+
Object ts = terminalState;
421+
if (o == null && ts != null && q.peek() == null) {
422+
o = ts;
423+
}
414424
}
415425
return o;
416426
}

src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -118,19 +118,11 @@ public SpscArrayQueue(final int capacity) {
118118
*/
119119
@Override
120120
public boolean offer(final E e) {
121-
if (null == e) {
122-
throw new NullPointerException("Null is not a valid element");
123-
}
124121
// local load of field to avoid repeated loads after volatile reads
125122
final E[] lElementBuffer = buffer;
126123
final long offset = calcElementOffset(producerIndex);
127-
if (producerIndex >= producerLookAhead) {
128-
if (null == lvElement(lElementBuffer, calcElementOffset(producerIndex + lookAheadStep))) {// LoadLoad
129-
producerLookAhead = producerIndex + lookAheadStep;
130-
}
131-
else if (null != lvElement(lElementBuffer, offset)){
132-
return false;
133-
}
124+
if (null != lvElement(lElementBuffer, offset)){
125+
return false;
134126
}
135127
producerIndex++; // do increment here so the ordered store give both a barrier
136128
soElement(lElementBuffer, offset, e);// StoreStore

0 commit comments

Comments
 (0)