Skip to content

Commit cb1712d

Browse files
committed
Merge pull request #3112 from akarnokd/TestCoverageObservers
Observers package test coverage and fixes.
2 parents 98530ed + 2423a17 commit cb1712d

File tree

10 files changed

+1475
-169
lines changed

10 files changed

+1475
-169
lines changed

src/main/java/rx/observers/SerializedObserver.java

Lines changed: 80 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
package rx.observers;
1717

1818
import rx.Observer;
19-
import rx.exceptions.Exceptions;
19+
import rx.exceptions.*;
20+
import rx.internal.operators.NotificationLite;
2021

2122
/**
2223
* Enforces single-threaded, serialized, ordered execution of {@link #onNext}, {@link #onCompleted}, and
@@ -35,13 +36,15 @@
3536
public class SerializedObserver<T> implements Observer<T> {
3637
private final Observer<? super T> actual;
3738

38-
private boolean emitting = false;
39-
private boolean terminated = false;
39+
private boolean emitting;
40+
/** Set to true if a terminal event was received. */
41+
private volatile boolean terminated;
42+
/** If not null, it indicates more work. */
4043
private FastList queue;
44+
private final NotificationLite<T> nl = NotificationLite.instance();
4145

42-
private static final int MAX_DRAIN_ITERATION = Integer.MAX_VALUE;
43-
private static final Object NULL_SENTINEL = new Object();
44-
private static final Object COMPLETE_SENTINEL = new Object();
46+
/** Number of iterations without additional safepoint poll in the drain loop. */
47+
private static final int MAX_DRAIN_ITERATION = 1024;
4548

4649
static final class FastList {
4750
Object[] array;
@@ -64,150 +67,119 @@ public void add(Object o) {
6467
}
6568
}
6669

67-
private static final class ErrorSentinel {
68-
final Throwable e;
69-
70-
ErrorSentinel(Throwable e) {
71-
this.e = e;
72-
}
73-
}
74-
7570
public SerializedObserver(Observer<? super T> s) {
7671
this.actual = s;
7772
}
7873

7974
@Override
80-
public void onCompleted() {
81-
FastList list;
75+
public void onNext(T t) {
76+
if (terminated) {
77+
return;
78+
}
8279
synchronized (this) {
8380
if (terminated) {
8481
return;
8582
}
86-
terminated = true;
8783
if (emitting) {
88-
if (queue == null) {
89-
queue = new FastList();
84+
FastList list = queue;
85+
if (list == null) {
86+
list = new FastList();
87+
queue = list;
9088
}
91-
queue.add(COMPLETE_SENTINEL);
89+
list.add(nl.next(t));
9290
return;
9391
}
9492
emitting = true;
95-
list = queue;
96-
queue = null;
9793
}
98-
drainQueue(list);
99-
actual.onCompleted();
94+
try {
95+
actual.onNext(t);
96+
} catch (Throwable e) {
97+
terminated = true;
98+
Exceptions.throwIfFatal(e);
99+
actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
100+
return;
101+
}
102+
for (;;) {
103+
for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
104+
FastList list;
105+
synchronized (this) {
106+
list = queue;
107+
if (list == null) {
108+
emitting = false;
109+
return;
110+
}
111+
queue = null;
112+
}
113+
for (Object o : list.array) {
114+
if (o == null) {
115+
break;
116+
}
117+
try {
118+
if (nl.accept(actual, o)) {
119+
terminated = true;
120+
return;
121+
}
122+
} catch (Throwable e) {
123+
terminated = true;
124+
Exceptions.throwIfFatal(e);
125+
actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
126+
return;
127+
}
128+
}
129+
}
130+
}
100131
}
101-
132+
102133
@Override
103134
public void onError(final Throwable e) {
104135
Exceptions.throwIfFatal(e);
105-
FastList list;
136+
if (terminated) {
137+
return;
138+
}
106139
synchronized (this) {
107140
if (terminated) {
108141
return;
109142
}
143+
terminated = true;
110144
if (emitting) {
111-
if (queue == null) {
112-
queue = new FastList();
145+
/*
146+
* FIXME: generally, errors jump the queue but this wasn't true
147+
* for SerializedObserver and may break existing expectations.
148+
*/
149+
FastList list = queue;
150+
if (list == null) {
151+
list = new FastList();
152+
queue = list;
113153
}
114-
queue.add(new ErrorSentinel(e));
154+
list.add(nl.error(e));
115155
return;
116156
}
117157
emitting = true;
118-
list = queue;
119-
queue = null;
120158
}
121-
drainQueue(list);
122159
actual.onError(e);
123-
synchronized(this) {
124-
emitting = false;
125-
}
126160
}
127161

128162
@Override
129-
public void onNext(T t) {
130-
FastList list;
131-
163+
public void onCompleted() {
164+
if (terminated) {
165+
return;
166+
}
132167
synchronized (this) {
133168
if (terminated) {
134169
return;
135170
}
171+
terminated = true;
136172
if (emitting) {
137-
if (queue == null) {
138-
queue = new FastList();
173+
FastList list = queue;
174+
if (list == null) {
175+
list = new FastList();
176+
queue = list;
139177
}
140-
queue.add(t != null ? t : NULL_SENTINEL);
141-
// another thread is emitting so we add to the queue and return
178+
list.add(nl.completed());
142179
return;
143180
}
144-
// we can emit
145181
emitting = true;
146-
// reference to the list to drain before emitting our value
147-
list = queue;
148-
queue = null;
149-
}
150-
151-
// we only get here if we won the right to emit, otherwise we returned in the if(emitting) block above
152-
boolean skipFinal = false;
153-
try {
154-
int iter = MAX_DRAIN_ITERATION;
155-
do {
156-
drainQueue(list);
157-
if (iter == MAX_DRAIN_ITERATION) {
158-
// after the first draining we emit our own value
159-
actual.onNext(t);
160-
}
161-
--iter;
162-
if (iter > 0) {
163-
synchronized (this) {
164-
list = queue;
165-
queue = null;
166-
if (list == null) {
167-
emitting = false;
168-
skipFinal = true;
169-
return;
170-
}
171-
}
172-
}
173-
} while (iter > 0);
174-
} finally {
175-
if (!skipFinal) {
176-
synchronized (this) {
177-
if (terminated) {
178-
list = queue;
179-
queue = null;
180-
} else {
181-
emitting = false;
182-
list = null;
183-
}
184-
}
185-
}
186-
}
187-
188-
// this will only drain if terminated (done here outside of synchronized block)
189-
drainQueue(list);
190-
}
191-
192-
void drainQueue(FastList list) {
193-
if (list == null || list.size == 0) {
194-
return;
195-
}
196-
for (Object v : list.array) {
197-
if (v == null) {
198-
break;
199-
}
200-
if (v == NULL_SENTINEL) {
201-
actual.onNext(null);
202-
} else if (v == COMPLETE_SENTINEL) {
203-
actual.onCompleted();
204-
} else if (v.getClass() == ErrorSentinel.class) {
205-
actual.onError(((ErrorSentinel) v).e);
206-
} else {
207-
@SuppressWarnings("unchecked")
208-
T t = (T)v;
209-
actual.onNext(t);
210-
}
211182
}
183+
actual.onCompleted();
212184
}
213185
}

src/main/java/rx/observers/TestObserver.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,17 @@ public void assertReceivedOnNext(List<T> items) {
117117
}
118118

119119
for (int i = 0; i < items.size(); i++) {
120-
if (items.get(i) == null) {
120+
T expected = items.get(i);
121+
T actual = onNextEvents.get(i);
122+
if (expected == null) {
121123
// check for null equality
122-
if (onNextEvents.get(i) != null) {
123-
throw new AssertionError("Value at index: " + i + " expected to be [null] but was: [" + onNextEvents.get(i) + "]");
124+
if (actual != null) {
125+
throw new AssertionError("Value at index: " + i + " expected to be [null] but was: [" + actual + "]");
124126
}
125-
} else if (!items.get(i).equals(onNextEvents.get(i))) {
126-
throw new AssertionError("Value at index: " + i + " expected to be [" + items.get(i) + "] (" + items.get(i).getClass().getSimpleName() + ") but was: [" + onNextEvents.get(i) + "] (" + onNextEvents.get(i).getClass().getSimpleName() + ")");
127+
} else if (!expected.equals(actual)) {
128+
throw new AssertionError("Value at index: " + i
129+
+ " expected to be [" + expected + "] (" + expected.getClass().getSimpleName()
130+
+ ") but was: [" + actual + "] (" + (actual != null ? actual.getClass().getSimpleName() : "null") + ")");
127131

128132
}
129133
}

src/main/java/rx/observers/TestSubscriber.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -258,10 +258,15 @@ public void assertUnsubscribed() {
258258
* if this {@code Subscriber} has received one or more {@code onError} notifications
259259
*/
260260
public void assertNoErrors() {
261-
if (getOnErrorEvents().size() > 0) {
262-
// can't use AssertionError because (message, cause) doesn't exist until Java 7
263-
throw new RuntimeException("Unexpected onError events: " + getOnErrorEvents().size(), getOnErrorEvents().get(0));
264-
// TODO possibly check for Java7+ and then use AssertionError at runtime (since we always compile with 7)
261+
List<Throwable> onErrorEvents = getOnErrorEvents();
262+
if (onErrorEvents.size() > 0) {
263+
AssertionError ae = new AssertionError("Unexpected onError events: " + getOnErrorEvents().size());
264+
if (onErrorEvents.size() == 1) {
265+
ae.initCause(getOnErrorEvents().get(0));
266+
} else {
267+
ae.initCause(new CompositeException(onErrorEvents));
268+
}
269+
throw ae;
265270
}
266271
}
267272

0 commit comments

Comments
 (0)