Skip to content

Commit b72beff

Browse files
authored
1.x: PublishSubject fail-fast when backpressured (#4225)
* 1.x: PublishSubject fail-fast when backpressured * Inline get(), use smaller error array at first.
1 parent d6f395e commit b72beff

File tree

5 files changed

+303
-62
lines changed

5 files changed

+303
-62
lines changed

src/main/java/rx/subjects/PublishSubject.java

Lines changed: 229 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616
package rx.subjects;
1717

1818
import java.util.*;
19+
import java.util.concurrent.atomic.*;
1920

21+
import rx.*;
2022
import rx.Observer;
2123
import rx.annotations.Beta;
22-
import rx.exceptions.Exceptions;
23-
import rx.functions.Action1;
24-
import rx.internal.operators.NotificationLite;
25-
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
24+
import rx.exceptions.*;
25+
import rx.internal.operators.BackpressureUtils;
2626

2727
/**
2828
* Subject that, once an {@link Observer} has subscribed, emits all subsequently observed items to the
@@ -50,8 +50,8 @@
5050
* the type of items observed and emitted by the Subject
5151
*/
5252
public final class PublishSubject<T> extends Subject<T, T> {
53-
final SubjectSubscriptionManager<T> state;
54-
private final NotificationLite<T> nl = NotificationLite.instance();
53+
54+
final PublishSubjectState<T> state;
5555

5656
/**
5757
* Creates and returns a new {@code PublishSubject}.
@@ -60,63 +60,33 @@ public final class PublishSubject<T> extends Subject<T, T> {
6060
* @return the new {@code PublishSubject}
6161
*/
6262
public static <T> PublishSubject<T> create() {
63-
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
64-
state.onTerminated = new Action1<SubjectObserver<T>>() {
65-
66-
@Override
67-
public void call(SubjectObserver<T> o) {
68-
o.emitFirst(state.getLatest(), state.nl);
69-
}
70-
71-
};
72-
return new PublishSubject<T>(state, state);
63+
return new PublishSubject<T>(new PublishSubjectState<T>());
7364
}
74-
75-
protected PublishSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
76-
super(onSubscribe);
65+
66+
protected PublishSubject(PublishSubjectState<T> state) {
67+
super(state);
7768
this.state = state;
7869
}
7970

8071
@Override
81-
public void onCompleted() {
82-
if (state.active) {
83-
Object n = nl.completed();
84-
for (SubjectObserver<T> bo : state.terminate(n)) {
85-
bo.emitNext(n, state.nl);
86-
}
87-
}
88-
72+
public void onNext(T v) {
73+
state.onNext(v);
8974
}
9075

9176
@Override
92-
public void onError(final Throwable e) {
93-
if (state.active) {
94-
Object n = nl.error(e);
95-
List<Throwable> errors = null;
96-
for (SubjectObserver<T> bo : state.terminate(n)) {
97-
try {
98-
bo.emitNext(n, state.nl);
99-
} catch (Throwable e2) {
100-
if (errors == null) {
101-
errors = new ArrayList<Throwable>();
102-
}
103-
errors.add(e2);
104-
}
105-
}
106-
Exceptions.throwIfAny(errors);
107-
}
77+
public void onError(Throwable e) {
78+
state.onError(e);
10879
}
10980

11081
@Override
111-
public void onNext(T v) {
112-
for (SubjectObserver<T> bo : state.observers()) {
113-
bo.onNext(v);
114-
}
82+
public void onCompleted() {
83+
state.onCompleted();
11584
}
11685

86+
11787
@Override
11888
public boolean hasObservers() {
119-
return state.observers().length > 0;
89+
return state.get().length != 0;
12090
}
12191

12292
/**
@@ -125,17 +95,15 @@ public boolean hasObservers() {
12595
*/
12696
@Beta
12797
public boolean hasThrowable() {
128-
Object o = state.getLatest();
129-
return nl.isError(o);
98+
return state.get() == PublishSubjectState.TERMINATED && state.error != null;
13099
}
131100
/**
132101
* Check if the Subject has terminated normally.
133102
* @return true if the subject completed normally via {@code onCompleted}
134103
*/
135104
@Beta
136105
public boolean hasCompleted() {
137-
Object o = state.getLatest();
138-
return o != null && !nl.isError(o);
106+
return state.get() == PublishSubjectState.TERMINATED && state.error == null;
139107
}
140108
/**
141109
* Returns the Throwable that terminated the Subject.
@@ -144,10 +112,216 @@ public boolean hasCompleted() {
144112
*/
145113
@Beta
146114
public Throwable getThrowable() {
147-
Object o = state.getLatest();
148-
if (nl.isError(o)) {
149-
return nl.getError(o);
115+
if (state.get() == PublishSubjectState.TERMINATED) {
116+
return state.error;
150117
}
151118
return null;
152119
}
120+
121+
static final class PublishSubjectState<T>
122+
extends AtomicReference<PublishSubjectProducer<T>[]>
123+
implements OnSubscribe<T>, Observer<T> {
124+
125+
/** */
126+
private static final long serialVersionUID = -7568940796666027140L;
127+
128+
@SuppressWarnings("rawtypes")
129+
static final PublishSubjectProducer[] EMPTY = new PublishSubjectProducer[0];
130+
@SuppressWarnings("rawtypes")
131+
static final PublishSubjectProducer[] TERMINATED = new PublishSubjectProducer[0];
132+
133+
Throwable error;
134+
135+
@SuppressWarnings("unchecked")
136+
public PublishSubjectState() {
137+
lazySet(EMPTY);
138+
}
139+
140+
@Override
141+
public void call(Subscriber<? super T> t) {
142+
PublishSubjectProducer<T> pp = new PublishSubjectProducer<T>(this, t);
143+
t.add(pp);
144+
t.setProducer(pp);
145+
146+
if (add(pp)) {
147+
if (pp.isUnsubscribed()) {
148+
remove(pp);
149+
}
150+
} else {
151+
Throwable ex = error;
152+
if (ex != null) {
153+
t.onError(ex);
154+
} else {
155+
t.onCompleted();
156+
}
157+
}
158+
}
159+
160+
161+
boolean add(PublishSubjectProducer<T> inner) {
162+
for (;;) {
163+
PublishSubjectProducer<T>[] curr = get();
164+
if (curr == TERMINATED) {
165+
return false;
166+
}
167+
168+
int n = curr.length;
169+
170+
@SuppressWarnings("unchecked")
171+
PublishSubjectProducer<T>[] next = new PublishSubjectProducer[n + 1];
172+
System.arraycopy(curr, 0, next, 0, n);
173+
174+
next[n] = inner;
175+
if (compareAndSet(curr, next)) {
176+
return true;
177+
}
178+
}
179+
}
180+
181+
@SuppressWarnings("unchecked")
182+
void remove(PublishSubjectProducer<T> inner) {
183+
for (;;) {
184+
PublishSubjectProducer<T>[] curr = get();
185+
if (curr == TERMINATED || curr == EMPTY) {
186+
return;
187+
}
188+
189+
int n = curr.length;
190+
int j = -1;
191+
for (int i = 0; i < n; i++) {
192+
if (curr[i] == inner) {
193+
j = i;
194+
break;
195+
}
196+
}
197+
198+
if (j < 0) {
199+
return;
200+
}
201+
202+
PublishSubjectProducer<T>[] next;
203+
if (n == 1) {
204+
next = EMPTY;
205+
} else {
206+
next = new PublishSubjectProducer[n - 1];
207+
System.arraycopy(curr, 0, next, 0, j);
208+
System.arraycopy(curr, j + 1, next, j, n - j - 1);
209+
}
210+
211+
if (compareAndSet(curr, next)) {
212+
return;
213+
}
214+
}
215+
}
216+
217+
@Override
218+
public void onNext(T t) {
219+
for (PublishSubjectProducer<T> pp : get()) {
220+
pp.onNext(t);
221+
}
222+
}
223+
224+
@SuppressWarnings("unchecked")
225+
@Override
226+
public void onError(Throwable e) {
227+
error = e;
228+
List<Throwable> errors = null;
229+
for (PublishSubjectProducer<T> pp : getAndSet(TERMINATED)) {
230+
try {
231+
pp.onError(e);
232+
} catch (Throwable ex) {
233+
if (errors == null) {
234+
errors = new ArrayList<Throwable>(1);
235+
}
236+
errors.add(ex);
237+
}
238+
}
239+
240+
Exceptions.throwIfAny(errors);
241+
}
242+
243+
@SuppressWarnings("unchecked")
244+
@Override
245+
public void onCompleted() {
246+
for (PublishSubjectProducer<T> pp : getAndSet(TERMINATED)) {
247+
pp.onCompleted();
248+
}
249+
}
250+
251+
}
252+
253+
static final class PublishSubjectProducer<T>
254+
extends AtomicLong
255+
implements Producer, Subscription, Observer<T> {
256+
/** */
257+
private static final long serialVersionUID = 6451806817170721536L;
258+
259+
final PublishSubjectState<T> parent;
260+
261+
final Subscriber<? super T> actual;
262+
263+
long produced;
264+
265+
public PublishSubjectProducer(PublishSubjectState<T> parent, Subscriber<? super T> actual) {
266+
this.parent = parent;
267+
this.actual = actual;
268+
}
269+
270+
@Override
271+
public void request(long n) {
272+
if (BackpressureUtils.validate(n)) {
273+
for (;;) {
274+
long r = get();
275+
if (r == Long.MIN_VALUE) {
276+
return;
277+
}
278+
long u = BackpressureUtils.addCap(r, n);
279+
if (compareAndSet(r, u)) {
280+
return;
281+
}
282+
}
283+
}
284+
}
285+
286+
@Override
287+
public boolean isUnsubscribed() {
288+
return get() == Long.MIN_VALUE;
289+
}
290+
291+
@Override
292+
public void unsubscribe() {
293+
if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
294+
parent.remove(this);
295+
}
296+
}
297+
298+
@Override
299+
public void onNext(T t) {
300+
long r = get();
301+
if (r != Long.MIN_VALUE) {
302+
long p = produced;
303+
if (r != p) {
304+
produced = p + 1;
305+
actual.onNext(t);
306+
} else {
307+
unsubscribe();
308+
actual.onError(new MissingBackpressureException("PublishSubject: could not emit value due to lack of requests"));
309+
}
310+
}
311+
}
312+
313+
@Override
314+
public void onError(Throwable e) {
315+
if (get() != Long.MIN_VALUE) {
316+
actual.onError(e);
317+
}
318+
}
319+
320+
@Override
321+
public void onCompleted() {
322+
if (get() != Long.MIN_VALUE) {
323+
actual.onCompleted();
324+
}
325+
}
326+
}
153327
}

src/test/java/rx/internal/operators/OperatorMapNotificationTest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,16 @@ public Integer call() {
132132
ps.onNext(3);
133133
ps.onCompleted();
134134

135-
ts.assertValues(2, 3, 4, 5);
135+
ts.assertNoValues();
136+
ts.assertNoErrors();
137+
ts.assertNotCompleted();
138+
139+
ts.requestMore(1);
140+
141+
ts.assertValue(0);
136142
ts.assertNoErrors();
137143
ts.assertCompleted();
144+
138145
}
139146

140147
}

src/test/java/rx/internal/operators/OperatorPublishFunctionTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ public Observable<Integer> call(Observable<Integer> o) {
222222
ts.assertError(MissingBackpressureException.class);
223223
ts.assertNotCompleted();
224224

225-
Assert.assertEquals("Queue full?!", ts.getOnErrorEvents().get(0).getMessage());
225+
Assert.assertEquals("PublishSubject: could not emit value due to lack of requests", ts.getOnErrorEvents().get(0).getMessage());
226226
Assert.assertFalse("Source has subscribers?", ps.hasObservers());
227227
}
228228

@@ -249,7 +249,7 @@ public Observable<Integer> call(Observable<Integer> o) {
249249
ts.assertError(MissingBackpressureException.class);
250250
ts.assertNotCompleted();
251251

252-
Assert.assertEquals("Queue full?!", ts.getOnErrorEvents().get(0).getMessage());
252+
Assert.assertEquals("PublishSubject: could not emit value due to lack of requests", ts.getOnErrorEvents().get(0).getMessage());
253253
Assert.assertFalse("Source has subscribers?", ps.hasObservers());
254254
}
255255
}

src/test/java/rx/internal/operators/OperatorTakeTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,9 @@ public void testReentrantTake() {
425425

426426
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
427427

428-
source.take(1).doOnNext(new Action1<Integer>() {
428+
source
429+
.rebatchRequests(2) // take(1) requests 1
430+
.take(1).doOnNext(new Action1<Integer>() {
429431
@Override
430432
public void call(Integer v) {
431433
source.onNext(2);

0 commit comments

Comments
 (0)