Skip to content

Commit 2c2051f

Browse files
akarnokdakarnokd
akarnokd
authored and
akarnokd
committed
Restore merge changes: not related to the bug
1 parent c1d4ca7 commit 2c2051f

File tree

1 file changed

+16
-23
lines changed

1 file changed

+16
-23
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ private void handleNewSource(Observable<? extends T> t) {
194194
}
195195
MergeProducer<T> producerIfNeeded = null;
196196
// if we have received a request then we need to respect it, otherwise we fast-path
197-
if (mergeProducer.requested() != Long.MAX_VALUE) {
197+
if (mergeProducer.requested != Long.MAX_VALUE) {
198198
/**
199199
* <pre> {@code
200200
* With this optimization:
@@ -237,7 +237,7 @@ private void handleScalarSynchronousObservable(ScalarSynchronousObservable<? ext
237237
* } </pre>
238238
*
239239
*/
240-
if (mergeProducer.requested() == Long.MAX_VALUE) {
240+
if (mergeProducer.requested == Long.MAX_VALUE) {
241241
handleScalarSynchronousObservableWithoutRequestLimits(t);
242242
} else {
243243
handleScalarSynchronousObservableWithRequestLimits(t);
@@ -274,11 +274,11 @@ private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronou
274274
boolean moreToDrain;
275275
boolean isReturn = false;
276276
try {
277-
long r = mergeProducer.requested();
277+
long r = mergeProducer.requested;
278278
if (r > 0) {
279279
emitted = true;
280280
actual.onNext(t.get());
281-
mergeProducer.getAndAdd(-1);
281+
MergeProducer.REQUESTED.decrementAndGet(mergeProducer);
282282
// we handle this Observable without ever incrementing the wip or touching other machinery so just return here
283283
isReturn = true;
284284
}
@@ -376,7 +376,7 @@ private void drainChildrenQueues() {
376376
private int drainScalarValueQueue() {
377377
RxRingBuffer svq = scalarValueQueue;
378378
if (svq != null) {
379-
long r = mergeProducer.requested();
379+
long r = mergeProducer.requested;
380380
int emittedWhileDraining = 0;
381381
if (r < 0) {
382382
// drain it all
@@ -398,7 +398,7 @@ private int drainScalarValueQueue() {
398398
}
399399
}
400400
// decrement the number we emitted from outstanding requests
401-
mergeProducer.getAndAdd(-emittedWhileDraining);
401+
MergeProducer.REQUESTED.getAndAdd(mergeProducer, -emittedWhileDraining);
402402
}
403403
return emittedWhileDraining;
404404
}
@@ -410,7 +410,7 @@ private int drainScalarValueQueue() {
410410
@Override
411411
public Boolean call(InnerSubscriber<T> s) {
412412
if (s.q != null) {
413-
long r = mergeProducer.requested();
413+
long r = mergeProducer.requested;
414414
int emitted = s.drainQueue();
415415
if (emitted > 0) {
416416
s.requestMore(emitted);
@@ -533,26 +533,19 @@ public MergeProducer(MergeSubscriber<T> ms) {
533533
this.ms = ms;
534534
}
535535

536-
private volatile long rq = 0;
536+
private volatile long requested = 0;
537537
@SuppressWarnings("rawtypes")
538-
static final AtomicLongFieldUpdater<MergeProducer> RQ = AtomicLongFieldUpdater.newUpdater(MergeProducer.class, "rq");
538+
static final AtomicLongFieldUpdater<MergeProducer> REQUESTED = AtomicLongFieldUpdater.newUpdater(MergeProducer.class, "requested");
539539

540-
public long requested() {
541-
return rq;
542-
}
543-
public long getAndAdd(long n) {
544-
return RQ.getAndAdd(this, n);
545-
}
546-
547540
@Override
548541
public void request(long n) {
549-
if (rq == Long.MAX_VALUE) {
542+
if (requested == Long.MAX_VALUE) {
550543
return;
551544
}
552545
if (n == Long.MAX_VALUE) {
553-
rq = Long.MAX_VALUE;
546+
requested = Long.MAX_VALUE;
554547
} else {
555-
BackpressureUtils.getAndAddRequest(RQ, this, n);
548+
BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
556549
if (ms.drainQueuesIfNeeded()) {
557550
boolean sendComplete = false;
558551
synchronized (ms) {
@@ -675,7 +668,7 @@ private void emit(T t, boolean complete) {
675668
} else {
676669
// this needs to check q.count() as draining above may not have drained the full queue
677670
// perf tests show this to be okay, though different queue implementations could perform poorly with this
678-
if (producer.requested() > 0 && q.count() == 0) {
671+
if (producer.requested > 0 && q.count() == 0) {
679672
if (complete) {
680673
parentSubscriber.completeInner(this);
681674
} else {
@@ -686,7 +679,7 @@ private void emit(T t, boolean complete) {
686679
onError(OnErrorThrowable.addValueAsLastCause(e, t));
687680
}
688681
emitted++;
689-
producer.getAndAdd(-1);
682+
MergeProducer.REQUESTED.decrementAndGet(producer);
690683
}
691684
} else {
692685
// no requests available, so enqueue it
@@ -735,7 +728,7 @@ private void enqueue(T t, boolean complete) {
735728
private int drainRequested() {
736729
int emitted = 0;
737730
// drain what was requested
738-
long toEmit = producer.requested();
731+
long toEmit = producer.requested;
739732
Object o;
740733
for (int i = 0; i < toEmit; i++) {
741734
o = q.poll();
@@ -757,7 +750,7 @@ private int drainRequested() {
757750
}
758751

759752
// decrement the number we emitted from outstanding requests
760-
producer.getAndAdd(-emitted);
753+
MergeProducer.REQUESTED.getAndAdd(producer, -emitted);
761754
return emitted;
762755
}
763756

0 commit comments

Comments
 (0)