Skip to content

Commit d173b6d

Browse files
authored
2.x: add subjects for Single, Maybe and Completable (#4967)
* 2.x: add subjects for Single, Maybe and Completable * Add CheckReturnValue, rename local test vars
1 parent f38eb99 commit d173b6d

File tree

6 files changed

+1526
-0
lines changed

6 files changed

+1526
-0
lines changed
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.subjects;
15+
16+
import java.util.concurrent.atomic.*;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.annotations.*;
20+
import io.reactivex.disposables.Disposable;
21+
import io.reactivex.plugins.RxJavaPlugins;
22+
23+
/**
24+
* Represents a hot Completable-like source and consumer of events similar to Subjects.
25+
* <p>
26+
* All methods are thread safe. Calling onComplete multiple
27+
* times has no effect. Calling onError multiple times relays the Throwable to
28+
* the RxJavaPlugins' error handler.
29+
* <p>
30+
* The CompletableSubject doesn't store the Disposables coming through onSubscribe but
31+
* disposes them once the other onXXX methods were called (terminal state reached).
32+
* @since 2.0.5 - experimental
33+
*/
34+
@Experimental
35+
public final class CompletableSubject extends Completable implements CompletableObserver {
36+
37+
final AtomicReference<CompletableDisposable[]> observers;
38+
39+
static final CompletableDisposable[] EMPTY = new CompletableDisposable[0];
40+
41+
static final CompletableDisposable[] TERMINATED = new CompletableDisposable[0];
42+
43+
final AtomicBoolean once;
44+
Throwable error;
45+
46+
/**
47+
* Creates a fresh CompletableSubject.
48+
* @return the new CompletableSubject instance
49+
*/
50+
@CheckReturnValue
51+
public static CompletableSubject create() {
52+
return new CompletableSubject();
53+
}
54+
55+
CompletableSubject() {
56+
once = new AtomicBoolean();
57+
observers = new AtomicReference<CompletableDisposable[]>(EMPTY);
58+
}
59+
60+
@Override
61+
public void onSubscribe(Disposable d) {
62+
if (observers.get() == TERMINATED) {
63+
d.dispose();
64+
}
65+
}
66+
67+
@Override
68+
public void onError(Throwable e) {
69+
if (e == null) {
70+
e = new NullPointerException("Null errors are not allowed in 2.x");
71+
}
72+
if (once.compareAndSet(false, true)) {
73+
this.error = e;
74+
for (CompletableDisposable md : observers.getAndSet(TERMINATED)) {
75+
md.actual.onError(e);
76+
}
77+
} else {
78+
RxJavaPlugins.onError(e);
79+
}
80+
}
81+
82+
@Override
83+
public void onComplete() {
84+
if (once.compareAndSet(false, true)) {
85+
for (CompletableDisposable md : observers.getAndSet(TERMINATED)) {
86+
md.actual.onComplete();
87+
}
88+
}
89+
}
90+
91+
@Override
92+
protected void subscribeActual(CompletableObserver observer) {
93+
CompletableDisposable md = new CompletableDisposable(observer, this);
94+
observer.onSubscribe(md);
95+
if (add(md)) {
96+
if (md.isDisposed()) {
97+
remove(md);
98+
}
99+
} else {
100+
Throwable ex = error;
101+
if (ex != null) {
102+
observer.onError(ex);
103+
} else {
104+
observer.onComplete();
105+
}
106+
}
107+
}
108+
109+
boolean add(CompletableDisposable inner) {
110+
for (;;) {
111+
CompletableDisposable[] a = observers.get();
112+
if (a == TERMINATED) {
113+
return false;
114+
}
115+
116+
int n = a.length;
117+
118+
CompletableDisposable[] b = new CompletableDisposable[n + 1];
119+
System.arraycopy(a, 0, b, 0, n);
120+
b[n] = inner;
121+
if (observers.compareAndSet(a, b)) {
122+
return true;
123+
}
124+
}
125+
}
126+
127+
void remove(CompletableDisposable inner) {
128+
for (;;) {
129+
CompletableDisposable[] a = observers.get();
130+
int n = a.length;
131+
if (n == 0) {
132+
return;
133+
}
134+
135+
int j = -1;
136+
137+
for (int i = 0; i < n; i++) {
138+
if (a[i] == inner) {
139+
j = i;
140+
break;
141+
}
142+
}
143+
144+
if (j < 0) {
145+
return;
146+
}
147+
CompletableDisposable[] b;
148+
if (n == 1) {
149+
b = EMPTY;
150+
} else {
151+
b = new CompletableDisposable[n - 1];
152+
System.arraycopy(a, 0, b, 0, j);
153+
System.arraycopy(a, j + 1, b, j, n - j - 1);
154+
}
155+
156+
if (observers.compareAndSet(a, b)) {
157+
return;
158+
}
159+
}
160+
}
161+
162+
/**
163+
* Returns the terminal error if this CompletableSubject has been terminated with an error, null otherwise.
164+
* @return the terminal error or null if not terminated or not with an error
165+
*/
166+
public Throwable getThrowable() {
167+
if (observers.get() == TERMINATED) {
168+
return error;
169+
}
170+
return null;
171+
}
172+
173+
/**
174+
* Returns true if this CompletableSubject has been terminated with an error.
175+
* @return true if this CompletableSubject has been terminated with an error
176+
*/
177+
public boolean hasThrowable() {
178+
return observers.get() == TERMINATED && error != null;
179+
}
180+
181+
/**
182+
* Returns true if this CompletableSubject has been completed.
183+
* @return true if this CompletableSubject has been completed
184+
*/
185+
public boolean hasComplete() {
186+
return observers.get() == TERMINATED && error == null;
187+
}
188+
189+
/**
190+
* Returns true if this CompletableSubject has observers.
191+
* @return true if this CompletableSubject has observers
192+
*/
193+
public boolean hasObservers() {
194+
return observers.get().length != 0;
195+
}
196+
197+
/**
198+
* Returns the number of current observers.
199+
* @return the number of current observers
200+
*/
201+
/* test */ int observerCount() {
202+
return observers.get().length;
203+
}
204+
205+
static final class CompletableDisposable
206+
extends AtomicReference<CompletableSubject> implements Disposable {
207+
private static final long serialVersionUID = -7650903191002190468L;
208+
209+
final CompletableObserver actual;
210+
211+
CompletableDisposable(CompletableObserver actual, CompletableSubject parent) {
212+
this.actual = actual;
213+
lazySet(parent);
214+
}
215+
216+
@Override
217+
public void dispose() {
218+
CompletableSubject parent = getAndSet(null);
219+
if (parent != null) {
220+
parent.remove(this);
221+
}
222+
}
223+
224+
@Override
225+
public boolean isDisposed() {
226+
return get() == null;
227+
}
228+
}
229+
}

0 commit comments

Comments
 (0)