Skip to content

Commit d165a21

Browse files
committed
change Atomic*FieldUpdater to Atomic* because of ReactiveX/RxJava#3488
1 parent d93c290 commit d165a21

File tree

2 files changed

+12
-18
lines changed

2 files changed

+12
-18
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
2121
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
2222
<maven.compiler.target>1.6</maven.compiler.target>
23-
<rxjava.version>1.0.15</rxjava.version>
23+
<rxjava.version>1.0.14</rxjava.version>
2424
<jmh.version>1.11.1</jmh.version>
2525
<exec.version>1.4.0</exec.version>
2626
<slf4j.version>1.7.12</slf4j.version>

src/main/java/com/github/davidmoten/rx/subjects/PublishSubjectSingleSubscriber.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.github.davidmoten.rx.subjects;
22

3-
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
3+
import java.util.concurrent.atomic.AtomicReference;
44

55
import rx.Subscriber;
66
import rx.subjects.Subject;
@@ -42,44 +42,38 @@ public static <T> PublishSubjectSingleSubscriber<T> create() {
4242

4343
@Override
4444
public void onCompleted() {
45-
if (subscriberHolder.subscriber != null) {
46-
subscriberHolder.subscriber.onCompleted();
45+
if (subscriberHolder.subscriber.get() != null) {
46+
subscriberHolder.subscriber.get().onCompleted();
4747
}
4848
}
4949

5050
@Override
5151
public void onError(Throwable e) {
52-
if (subscriberHolder.subscriber != null)
53-
subscriberHolder.subscriber.onError(e);
52+
if (subscriberHolder.subscriber.get() != null)
53+
subscriberHolder.subscriber.get().onError(e);
5454
}
5555

5656
@Override
5757
public void onNext(T t) {
58-
if (subscriberHolder.subscriber != null)
59-
subscriberHolder.subscriber.onNext(t);
58+
if (subscriberHolder.subscriber.get() != null)
59+
subscriberHolder.subscriber.get().onNext(t);
6060
}
6161

6262
private static class SingleSubscribeOnSubscribe<T> implements OnSubscribe<T> {
6363

64-
volatile Subscriber<? super T> subscriber;
65-
66-
@SuppressWarnings("rawtypes")
67-
private final AtomicReferenceFieldUpdater<SingleSubscribeOnSubscribe, Subscriber> SUBSCRIBER = AtomicReferenceFieldUpdater
68-
.newUpdater(SingleSubscribeOnSubscribe.class, Subscriber.class, "subscriber");
64+
final AtomicReference<Subscriber<? super T>> subscriber = new AtomicReference<Subscriber<? super T>>();
6965

7066
@Override
71-
public void call(Subscriber<? super T> subscriber) {
72-
if (SUBSCRIBER.compareAndSet(this, null, subscriber))
73-
this.subscriber = subscriber;
74-
else
67+
public void call(Subscriber<? super T> sub) {
68+
if (!subscriber.compareAndSet(null, sub))
7569
throw new RuntimeException(ONLY_ONE_SUBSCRIPTION_IS_ALLOWED);
7670
}
7771

7872
}
7973

8074
@Override
8175
public boolean hasObservers() {
82-
return subscriberHolder.subscriber != null;
76+
return subscriberHolder.subscriber.get() != null;
8377
}
8478

8579
}

0 commit comments

Comments
 (0)