Skip to content

Commit c31dd7f

Browse files
authored
1.x: coverage improvements, small adjustments (#4060)
1 parent 161dbef commit c31dd7f

20 files changed

+648
-136
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ apply plugin: 'jacoco'
1010
apply plugin: 'nebula.rxjava-project'
1111

1212
dependencies {
13-
testCompile 'junit:junit-dep:4.10'
14-
testCompile 'org.mockito:mockito-core:1.8.5'
13+
testCompile 'junit:junit:4.12'
14+
testCompile 'org.mockito:mockito-core:1.10.19'
1515

1616
perfCompile 'org.openjdk.jmh:jmh-core:1.11.3'
1717
perfCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.11.3'

src/main/java/rx/Notification.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -158,15 +158,24 @@ public boolean isOnNext() {
158158
* @param observer the target observer to call onXXX methods on based on the kind of this Notification instance
159159
*/
160160
public void accept(Observer<? super T> observer) {
161-
if (isOnNext()) {
161+
switch (kind) {
162+
case OnNext:
162163
observer.onNext(getValue());
163-
} else if (isOnCompleted()) {
164-
observer.onCompleted();
165-
} else if (isOnError()) {
164+
break;
165+
case OnError:
166166
observer.onError(getThrowable());
167+
break;
168+
case OnCompleted:
169+
observer.onCompleted();
170+
break;
171+
default:
172+
throw new AssertionError("Uncovered case: " + kind);
167173
}
168174
}
169175

176+
/**
177+
* Specifies the kind of the notification: an element, an error or a completion notification.
178+
*/
170179
public enum Kind {
171180
OnNext, OnError, OnCompleted
172181
}
@@ -211,19 +220,11 @@ public boolean equals(Object obj) {
211220
return false;
212221
}
213222

214-
if (hasValue() && !getValue().equals(notification.getValue())) {
215-
return false;
216-
}
217-
218-
if (hasThrowable() && !getThrowable().equals(notification.getThrowable())) {
219-
return false;
220-
}
221-
222-
if (!hasValue() && !hasThrowable() && notification.hasValue()) {
223+
if (!(value == notification.value || (value != null && value.equals(notification.value)))) {
223224
return false;
224225
}
225226

226-
if (!hasValue() && !hasThrowable() && notification.hasThrowable()) {
227+
if (!(throwable == notification.throwable || (throwable != null && throwable.equals(notification.throwable)))) {
227228
return false;
228229
}
229230

src/main/java/rx/Observable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3280,7 +3280,8 @@ public static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<
32803280
* <dt><b>Scheduler:</b></dt>
32813281
* <dd>{@code zip} does not operate by default on a particular {@link Scheduler}.</dd>
32823282
* </dl>
3283-
*
3283+
*
3284+
* @param <R> the result type
32843285
* @param ws
32853286
* an array of source Observables
32863287
* @param zipFunction

src/main/java/rx/Scheduler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,9 @@ public Subscription schedulePeriodically(final Action0 action, long initialDelay
130130
long startInNanos = firstStartInNanos;
131131
@Override
132132
public void call() {
133+
action.call();
134+
133135
if (!mas.isUnsubscribed()) {
134-
action.call();
135136

136137
long nextTick;
137138

src/main/java/rx/observers/SerializedObserver.java

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,6 @@ public class SerializedObserver<T> implements Observer<T> {
4343
private FastList queue;
4444
private final NotificationLite<T> nl = NotificationLite.instance();
4545

46-
/** Number of iterations without additional safepoint poll in the drain loop. */
47-
private static final int MAX_DRAIN_ITERATION = 1024;
48-
4946
static final class FastList {
5047
Object[] array;
5148
int size;
@@ -99,31 +96,29 @@ public void onNext(T t) {
9996
return;
10097
}
10198
for (;;) {
102-
for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
103-
FastList list;
104-
synchronized (this) {
105-
list = queue;
106-
if (list == null) {
107-
emitting = false;
108-
return;
109-
}
110-
queue = null;
99+
FastList list;
100+
synchronized (this) {
101+
list = queue;
102+
if (list == null) {
103+
emitting = false;
104+
return;
111105
}
112-
for (Object o : list.array) {
113-
if (o == null) {
114-
break;
115-
}
116-
try {
117-
if (nl.accept(actual, o)) {
118-
terminated = true;
119-
return;
120-
}
121-
} catch (Throwable e) {
106+
queue = null;
107+
}
108+
for (Object o : list.array) {
109+
if (o == null) {
110+
break;
111+
}
112+
try {
113+
if (nl.accept(actual, o)) {
122114
terminated = true;
123-
Exceptions.throwIfFatal(e);
124-
actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
125115
return;
126116
}
117+
} catch (Throwable e) {
118+
terminated = true;
119+
Exceptions.throwIfFatal(e);
120+
actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
121+
return;
127122
}
128123
}
129124
}

0 commit comments

Comments
 (0)