Skip to content

1.x Remove all instances of Atomic*FieldUpdater #3488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 11, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions src/main/java/rx/internal/operators/BlockingOperatorLatest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;

import rx.Notification;
import rx.Observable;
Expand Down Expand Up @@ -59,15 +59,11 @@ public Iterator<T> iterator() {
static final class LatestObserverIterator<T> extends Subscriber<Notification<? extends T>> implements Iterator<T> {
final Semaphore notify = new Semaphore(0);
// observer's notification
volatile Notification<? extends T> value;
/** Updater for the value field. */
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<LatestObserverIterator, Notification> REFERENCE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(LatestObserverIterator.class, Notification.class, "value");
final AtomicReference<Notification<? extends T>> value = new AtomicReference<Notification<? extends T>>();

@Override
public void onNext(Notification<? extends T> args) {
boolean wasntAvailable = REFERENCE_UPDATER.getAndSet(this, args) == null;
boolean wasntAvailable = value.getAndSet(args) == null;
if (wasntAvailable) {
notify.release();
}
Expand Down Expand Up @@ -103,7 +99,7 @@ public boolean hasNext() {
}

@SuppressWarnings("unchecked")
Notification<? extends T> n = REFERENCE_UPDATER.getAndSet(this, null);
Notification<? extends T> n = value.getAndSet(null);
iNotif = n;
if (iNotif.isOnError()) {
throw Exceptions.propagate(iNotif.getThrowable());
Expand Down
12 changes: 4 additions & 8 deletions src/main/java/rx/internal/operators/BlockingOperatorNext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Notification;
import rx.Observable;
Expand Down Expand Up @@ -147,11 +147,7 @@ public void remove() {

private static class NextObserver<T> extends Subscriber<Notification<? extends T>> {
private final BlockingQueue<Notification<? extends T>> buf = new ArrayBlockingQueue<Notification<? extends T>>(1);
@SuppressWarnings("unused")
volatile int waiting;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<NextObserver> WAITING_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(NextObserver.class, "waiting");
final AtomicInteger waiting = new AtomicInteger();

@Override
public void onCompleted() {
Expand All @@ -166,7 +162,7 @@ public void onError(Throwable e) {
@Override
public void onNext(Notification<? extends T> args) {

if (WAITING_UPDATER.getAndSet(this, 0) == 1 || !args.isOnNext()) {
if (waiting.getAndSet(0) == 1 || !args.isOnNext()) {
Notification<? extends T> toOffer = args;
while (!buf.offer(toOffer)) {
Notification<? extends T> concurrentItem = buf.poll();
Expand All @@ -185,7 +181,7 @@ public Notification<? extends T> takeNext() throws InterruptedException {
return buf.take();
}
void setWaiting(int value) {
waiting = value;
waiting.set(value);
}
}
}
28 changes: 11 additions & 17 deletions src/main/java/rx/internal/operators/BufferUntilSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package rx.internal.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observer;
import rx.Subscriber;
Expand Down Expand Up @@ -59,15 +59,9 @@ public static <T> BufferUntilSubscriber<T> create() {
}

/** The common state. */
static final class State<T> {
volatile Observer<? super T> observerRef = null;
/** Field updater for observerRef. */
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<State, Observer> OBSERVER_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(State.class, Observer.class, "observerRef");

static final class State<T> extends AtomicReference<Observer<? super T>> {
boolean casObserverRef(Observer<? super T> expected, Observer<? super T> next) {
return OBSERVER_UPDATER.compareAndSet(this, expected, next);
return compareAndSet(expected, next);
}

final Object guard = new Object();
Expand All @@ -92,7 +86,7 @@ public void call(final Subscriber<? super T> s) {
@SuppressWarnings("unchecked")
@Override
public void call() {
state.observerRef = EMPTY_OBSERVER;
state.set(EMPTY_OBSERVER);
}
}));
boolean win = false;
Expand All @@ -107,7 +101,7 @@ public void call() {
while(true) {
Object o;
while ((o = state.buffer.poll()) != null) {
nl.accept(state.observerRef, o);
nl.accept(state.get(), o);
}
synchronized (state.guard) {
if (state.buffer.isEmpty()) {
Expand Down Expand Up @@ -138,7 +132,7 @@ private BufferUntilSubscriber(State<T> state) {
private void emit(Object v) {
synchronized (state.guard) {
state.buffer.add(v);
if (state.observerRef != null && !state.emitting) {
if (state.get() != null && !state.emitting) {
// Have an observer and nobody is emitting,
// should drain the `buffer`
forward = true;
Expand All @@ -148,7 +142,7 @@ private void emit(Object v) {
if (forward) {
Object o;
while ((o = state.buffer.poll()) != null) {
state.nl.accept(state.observerRef, o);
state.nl.accept(state.get(), o);
}
// Because `emit(Object v)` will be called in sequence,
// no event will be put into `buffer` after we drain it.
Expand All @@ -158,7 +152,7 @@ private void emit(Object v) {
@Override
public void onCompleted() {
if (forward) {
state.observerRef.onCompleted();
state.get().onCompleted();
}
else {
emit(state.nl.completed());
Expand All @@ -168,7 +162,7 @@ public void onCompleted() {
@Override
public void onError(Throwable e) {
if (forward) {
state.observerRef.onError(e);
state.get().onError(e);
}
else {
emit(state.nl.error(e));
Expand All @@ -178,7 +172,7 @@ public void onError(Throwable e) {
@Override
public void onNext(T t) {
if (forward) {
state.observerRef.onNext(t);
state.get().onNext(t);
}
else {
emit(state.nl.next(t));
Expand All @@ -188,7 +182,7 @@ public void onNext(T t) {
@Override
public boolean hasObservers() {
synchronized (state.guard) {
return state.observerRef != null;
return state.get() != null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import rx.Observable;
import rx.Observable.OnSubscribe;
Expand Down Expand Up @@ -90,10 +89,7 @@ final static class MultiSourceProducer<T, R> implements Producer {
private final BitSet completion;
private volatile int completionCount; // does this need to be volatile or is WIP sufficient?

@SuppressWarnings("unused")
private volatile long counter;
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<MultiSourceProducer> WIP = AtomicLongFieldUpdater.newUpdater(MultiSourceProducer.class, "counter");
private final AtomicLong counter = new AtomicLong();

@SuppressWarnings("unchecked")
public MultiSourceProducer(final Subscriber<? super R> child, final List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) {
Expand Down Expand Up @@ -139,7 +135,8 @@ public void request(long n) {
* that there is always once who acts on each `tick`. Same concept as used in OperationObserveOn.
*/
void tick() {
if (WIP.getAndIncrement(this) == 0) {
AtomicLong localCounter = this.counter;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for having copy of final reference?

if (localCounter.getAndIncrement() == 0) {
int emitted = 0;
do {
// we only emit if requested > 0
Expand All @@ -155,7 +152,7 @@ void tick() {
}
}
}
} while (WIP.decrementAndGet(this) > 0);
} while (localCounter.decrementAndGet() > 0);
if (emitted > 0) {
for (MultiSourceRequestableSubscriber<T, R> s : subscribers) {
s.requestUpTo(emitted);
Expand Down
35 changes: 14 additions & 21 deletions src/main/java/rx/internal/operators/OperatorConcat.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package rx.internal.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable;
import rx.Observable.Operator;
Expand Down Expand Up @@ -84,14 +84,10 @@ static final class ConcatSubscriber<T> extends Subscriber<Observable<? extends T

volatile ConcatInnerSubscriber<T> currentSubscriber;

volatile int wip;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<ConcatSubscriber> WIP = AtomicIntegerFieldUpdater.newUpdater(ConcatSubscriber.class, "wip");
final AtomicInteger wip = new AtomicInteger();

// accessed by REQUESTED
private volatile long requested;
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<ConcatSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(ConcatSubscriber.class, "requested");
private final AtomicLong requested = new AtomicLong();
private final ProducerArbiter arbiter;

public ConcatSubscriber(Subscriber<T> s, SerialSubscription current) {
Expand All @@ -118,10 +114,10 @@ public void onStart() {
private void requestFromChild(long n) {
if (n <=0) return;
// we track 'requested' so we know whether we should subscribe the next or not
long previous = BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
long previous = BackpressureUtils.getAndAddRequest(requested, n);
arbiter.request(n);
if (previous == 0) {
if (currentSubscriber == null && wip > 0) {
if (currentSubscriber == null && wip.get() > 0) {
// this means we may be moving from one subscriber to another after having stopped processing
// so need to kick off the subscribe via this request notification
subscribeNext();
Expand All @@ -130,13 +126,13 @@ private void requestFromChild(long n) {
}

private void decrementRequested() {
REQUESTED.decrementAndGet(this);
requested.decrementAndGet();
}

@Override
public void onNext(Observable<? extends T> t) {
queue.add(nl.next(t));
if (WIP.getAndIncrement(this) == 0) {
if (wip.getAndIncrement() == 0) {
subscribeNext();
}
}
Expand All @@ -150,22 +146,22 @@ public void onError(Throwable e) {
@Override
public void onCompleted() {
queue.add(nl.completed());
if (WIP.getAndIncrement(this) == 0) {
if (wip.getAndIncrement() == 0) {
subscribeNext();
}
}


void completeInner() {
currentSubscriber = null;
if (WIP.decrementAndGet(this) > 0) {
if (wip.decrementAndGet() > 0) {
subscribeNext();
}
request(1);
}

void subscribeNext() {
if (requested > 0) {
if (requested.get() > 0) {
Object o = queue.poll();
if (nl.isCompleted(o)) {
child.onCompleted();
Expand All @@ -189,10 +185,7 @@ static class ConcatInnerSubscriber<T> extends Subscriber<T> {

private final Subscriber<T> child;
private final ConcatSubscriber<T> parent;
@SuppressWarnings("unused")
private volatile int once = 0;
@SuppressWarnings("rawtypes")
private final static AtomicIntegerFieldUpdater<ConcatInnerSubscriber> ONCE = AtomicIntegerFieldUpdater.newUpdater(ConcatInnerSubscriber.class, "once");
private final AtomicInteger once = new AtomicInteger();
private final ProducerArbiter arbiter;

public ConcatInnerSubscriber(ConcatSubscriber<T> parent, Subscriber<T> child, ProducerArbiter arbiter) {
Expand All @@ -210,15 +203,15 @@ public void onNext(T t) {

@Override
public void onError(Throwable e) {
if (ONCE.compareAndSet(this, 0, 1)) {
if (once.compareAndSet(0, 1)) {
// terminal error through parent so everything gets cleaned up, including this inner
parent.onError(e);
}
}

@Override
public void onCompleted() {
if (ONCE.compareAndSet(this, 0, 1)) {
if (once.compareAndSet(0, 1)) {
// terminal completion to parent so it continues to the next
parent.completeInner();
}
Expand Down
17 changes: 8 additions & 9 deletions src/main/java/rx/internal/operators/OperatorMaterialize.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;

import rx.Notification;
import rx.Observable.Operator;
Expand Down Expand Up @@ -76,10 +76,7 @@ private static class ParentSubscriber<T> extends Subscriber<T> {
// guarded by this
private boolean missed = false;

private volatile long requested;
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<ParentSubscriber> REQUESTED = AtomicLongFieldUpdater
.newUpdater(ParentSubscriber.class, "requested");
private final AtomicLong requested = new AtomicLong();

ParentSubscriber(Subscriber<? super Notification<T>> child) {
this.child = child;
Expand All @@ -91,7 +88,7 @@ public void onStart() {
}

void requestMore(long n) {
BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
BackpressureUtils.getAndAddRequest(requested, n);
request(n);
drain();
}
Expand All @@ -117,12 +114,13 @@ public void onNext(T t) {

private void decrementRequested() {
// atomically decrement requested
AtomicLong localRequested = this.requested;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.requested is final, no need to make local copy.

while (true) {
long r = requested;
long r = localRequested.get();
if (r == Long.MAX_VALUE) {
// don't decrement if unlimited requested
return;
} else if (REQUESTED.compareAndSet(this, r, r - 1)) {
} else if (localRequested.compareAndSet(r, r - 1)) {
return;
}
}
Expand All @@ -137,11 +135,12 @@ private void drain() {
}
}
// drain loop
final AtomicLong localRequested = this.requested;
while (!child.isUnsubscribed()) {
Notification<T> tn;
tn = terminalNotification;
if (tn != null) {
if (requested > 0) {
if (localRequested.get() > 0) {
// allow tn to be GC'd after the onNext call
terminalNotification = null;
// emit the terminal notification
Expand Down
Loading