Skip to content

Commit 4c0f93b

Browse files
authored
2.x: coverage, fixes, cleanup 8/27-1 (#4431)
1 parent 0094304 commit 4c0f93b

File tree

11 files changed

+959
-229
lines changed

11 files changed

+959
-229
lines changed

src/main/java/io/reactivex/internal/operators/single/SingleCache.java

Lines changed: 128 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -13,97 +13,166 @@
1313

1414
package io.reactivex.internal.operators.single;
1515

16-
import java.util.*;
1716
import java.util.concurrent.atomic.*;
1817

1918
import io.reactivex.*;
2019
import io.reactivex.disposables.Disposable;
21-
import io.reactivex.internal.disposables.EmptyDisposable;
22-
import io.reactivex.internal.util.NotificationLite;
2320

24-
public final class SingleCache<T> extends Single<T> {
21+
public final class SingleCache<T> extends Single<T> implements SingleObserver<T> {
2522

23+
@SuppressWarnings("rawtypes")
24+
static final CacheDisposable[] EMPTY = new CacheDisposable[0];
25+
@SuppressWarnings("rawtypes")
26+
static final CacheDisposable[] TERMINATED = new CacheDisposable[0];
27+
2628
final SingleSource<? extends T> source;
2729

2830
final AtomicInteger wip;
29-
final AtomicReference<Object> notification;
30-
final List<SingleObserver<? super T>> subscribers;
3131

32+
final AtomicReference<CacheDisposable<T>[]> observers;
33+
34+
T value;
35+
36+
Throwable error;
37+
38+
@SuppressWarnings("unchecked")
3239
public SingleCache(SingleSource<? extends T> source) {
3340
this.source = source;
3441
this.wip = new AtomicInteger();
35-
this.notification = new AtomicReference<Object>();
36-
this.subscribers = new ArrayList<SingleObserver<? super T>>();
42+
this.observers = new AtomicReference<CacheDisposable<T>[]>(EMPTY);
3743
}
3844

3945
@Override
40-
protected void subscribeActual(SingleObserver<? super T> s) {
41-
42-
Object o = notification.get();
43-
if (o != null) {
44-
s.onSubscribe(EmptyDisposable.INSTANCE);
45-
if (NotificationLite.isError(o)) {
46-
s.onError(NotificationLite.getError(o));
46+
protected void subscribeActual(final SingleObserver<? super T> s) {
47+
CacheDisposable<T> d = new CacheDisposable<T>(s, this);
48+
s.onSubscribe(d);
49+
50+
if (add(d)) {
51+
if (d.isDisposed()) {
52+
remove(d);
53+
}
54+
} else {
55+
Throwable ex = error;
56+
if (ex != null) {
57+
s.onError(ex);
4758
} else {
48-
s.onSuccess(NotificationLite.<T>getValue(o));
59+
s.onSuccess(value);
4960
}
5061
return;
5162
}
5263

53-
synchronized (subscribers) {
54-
o = notification.get();
55-
if (o == null) {
56-
subscribers.add(s);
64+
if (wip.getAndIncrement() == 0) {
65+
source.subscribe(this);
66+
}
67+
}
68+
69+
boolean add(CacheDisposable<T> observer) {
70+
for (;;) {
71+
CacheDisposable<T>[] a = observers.get();
72+
if (a == TERMINATED) {
73+
return false;
74+
}
75+
int n = a.length;
76+
@SuppressWarnings("unchecked")
77+
CacheDisposable<T>[] b = new CacheDisposable[n + 1];
78+
System.arraycopy(a, 0, b, 0, n);
79+
b[n] = observer;
80+
if (observers.compareAndSet(a, b)) {
81+
return true;
5782
}
5883
}
59-
if (o != null) {
60-
s.onSubscribe(EmptyDisposable.INSTANCE);
61-
if (NotificationLite.isError(o)) {
62-
s.onError(NotificationLite.getError(o));
84+
}
85+
86+
@SuppressWarnings("unchecked")
87+
void remove(CacheDisposable<T> observer) {
88+
for (;;) {
89+
CacheDisposable<T>[] a = observers.get();
90+
int n = a.length;
91+
if (n == 0) {
92+
return;
93+
}
94+
95+
int j = -1;
96+
for (int i = 0; i < n; i++) {
97+
if (a[i] == observer) {
98+
j = i;
99+
break;
100+
}
101+
}
102+
103+
if (j < 0) {
104+
return;
105+
}
106+
107+
CacheDisposable<T>[] b;
108+
109+
if (n == 1) {
110+
b = EMPTY;
63111
} else {
64-
s.onSuccess(NotificationLite.<T>getValue(o));
112+
b = new CacheDisposable[n - 1];
113+
System.arraycopy(a, 0, b, 0, j);
114+
System.arraycopy(a, j + 1, b, j, n - j - 1);
115+
}
116+
if (observers.compareAndSet(a, b)) {
117+
return;
65118
}
66-
return;
67119
}
120+
}
121+
122+
@Override
123+
public void onSubscribe(Disposable d) {
124+
// not supported by this operator
125+
}
126+
127+
@SuppressWarnings("unchecked")
128+
@Override
129+
public void onSuccess(T value) {
130+
this.value = value;
68131

69-
if (wip.getAndIncrement() != 0) {
70-
return;
132+
for (CacheDisposable<T> d : observers.getAndSet(TERMINATED)) {
133+
if (!d.isDisposed()) {
134+
d.actual.onSuccess(value);
135+
}
71136
}
137+
}
138+
139+
@SuppressWarnings("unchecked")
140+
@Override
141+
public void onError(Throwable e) {
142+
this.error = e;
72143

73-
source.subscribe(new SingleObserver<T>() {
74-
75-
@Override
76-
public void onSubscribe(Disposable d) {
77-
144+
for (CacheDisposable<T> d : observers.getAndSet(TERMINATED)) {
145+
if (!d.isDisposed()) {
146+
d.actual.onError(e);
78147
}
148+
}
149+
}
150+
151+
static final class CacheDisposable<T>
152+
extends AtomicBoolean
153+
implements Disposable {
154+
/** */
155+
private static final long serialVersionUID = 7514387411091976596L;
79156

80-
@Override
81-
public void onSuccess(T value) {
82-
notification.set(NotificationLite.next(value));
83-
List<SingleObserver<? super T>> list;
84-
synchronized (subscribers) {
85-
list = new ArrayList<SingleObserver<? super T>>(subscribers);
86-
subscribers.clear();
87-
}
88-
for (SingleObserver<? super T> s1 : list) {
89-
s1.onSuccess(value);
90-
}
91-
}
157+
final SingleObserver<? super T> actual;
158+
159+
final SingleCache<T> parent;
92160

93-
@Override
94-
public void onError(Throwable e) {
95-
notification.set(NotificationLite.error(e));
96-
List<SingleObserver<? super T>> list;
97-
synchronized (subscribers) {
98-
list = new ArrayList<SingleObserver<? super T>>(subscribers);
99-
subscribers.clear();
100-
}
101-
for (SingleObserver<? super T> s1 : list) {
102-
s1.onError(e);
103-
}
161+
public CacheDisposable(SingleObserver<? super T> actual, SingleCache<T> parent) {
162+
this.actual = actual;
163+
this.parent = parent;
164+
}
165+
166+
@Override
167+
public boolean isDisposed() {
168+
return get();
169+
}
170+
171+
@Override
172+
public void dispose() {
173+
if (compareAndSet(false, true)) {
174+
parent.remove(this);
104175
}
105-
106-
});
176+
}
107177
}
108-
109178
}

src/main/java/io/reactivex/internal/subscribers/observable/EmptyObserver.java

Lines changed: 0 additions & 56 deletions
This file was deleted.

0 commit comments

Comments
 (0)