Skip to content

Commit 2e32a33

Browse files
committed
Merge pull request #3333 from akarnokd/SubscribersTests2x
2.x: subscribers/observers tests
2 parents 8b7a8a6 + 6cb1ab6 commit 2e32a33

19 files changed

+3706
-1
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.exceptions;
15+
16+
import java.util.*;
17+
18+
public final class CompositeException extends RuntimeException {
19+
/** */
20+
private static final long serialVersionUID = 2004635183691362481L;
21+
22+
public CompositeException() {
23+
super();
24+
}
25+
26+
public CompositeException(String message) {
27+
super(message);
28+
}
29+
30+
public List<Throwable> getExceptions() {
31+
Throwable cause = getCause();
32+
Throwable[] suppressed = getSuppressed();
33+
List<Throwable> list = new ArrayList<>(cause != null
34+
? 1 + suppressed.length : suppressed.length);
35+
if (cause != null) {
36+
list.add(cause);
37+
}
38+
for (Throwable t : suppressed) {
39+
list.add(t);
40+
}
41+
42+
return list;
43+
}
44+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.exceptions;
15+
16+
public final class OnCompleteFailedException extends RuntimeException {
17+
/** */
18+
private static final long serialVersionUID = -6179993283427447098L;
19+
20+
public OnCompleteFailedException(Throwable cause) {
21+
super(cause);
22+
}
23+
24+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.exceptions;
15+
16+
public final class OnErrorFailedException extends RuntimeException {
17+
/** */
18+
private static final long serialVersionUID = 2656125445290831911L;
19+
20+
public OnErrorFailedException(Throwable cause) {
21+
super(cause);
22+
}
23+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.exceptions;
15+
16+
public final class OnErrorNotImplementedException extends RuntimeException {
17+
/** */
18+
private static final long serialVersionUID = -3698670655303683299L;
19+
20+
public OnErrorNotImplementedException() {
21+
super();
22+
}
23+
24+
public OnErrorNotImplementedException(String message, Throwable cause) {
25+
super(message, cause);
26+
}
27+
28+
public OnErrorNotImplementedException(String message) {
29+
super(message);
30+
}
31+
32+
public OnErrorNotImplementedException(Throwable cause) {
33+
super(cause);
34+
}
35+
36+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.exceptions;
15+
16+
public final class UnsubscribeFailedException extends RuntimeException {
17+
/** */
18+
private static final long serialVersionUID = 8947024194181365640L;
19+
20+
public UnsubscribeFailedException(Throwable cause) {
21+
super(cause);
22+
}
23+
24+
}

src/main/java/io/reactivex/internal/operators/OperatorDoOnEach.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ static final class DoOnEachSubscriber<T> implements Subscriber<T> {
5151

5252
Subscription s;
5353

54+
boolean done;
55+
5456
public DoOnEachSubscriber(
5557
Subscriber<? super T> actual,
5658
Consumer<? super T> onNext,
@@ -75,6 +77,9 @@ public void onSubscribe(Subscription s) {
7577

7678
@Override
7779
public void onNext(T t) {
80+
if (done) {
81+
return;
82+
}
7883
try {
7984
onNext.accept(t);
8085
} catch (Throwable e) {
@@ -88,6 +93,11 @@ public void onNext(T t) {
8893

8994
@Override
9095
public void onError(Throwable t) {
96+
if (done) {
97+
RxJavaPlugins.onError(t);
98+
return;
99+
}
100+
done = true;
91101
try {
92102
onError.accept(t);
93103
} catch (Throwable e) {
@@ -104,6 +114,10 @@ public void onError(Throwable t) {
104114

105115
@Override
106116
public void onComplete() {
117+
if (done) {
118+
return;
119+
}
120+
done = true;
107121
try {
108122
onComplete.run();
109123
} catch (Throwable e) {

src/main/java/io/reactivex/internal/subscribers/CancelledSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import io.reactivex.plugins.RxJavaPlugins;
1919

2020
/**
21-
* A subscriber cancels the subscription sent to it
21+
* A subscriber that cancels the subscription sent to it
2222
* and ignores all events (onError is forwarded to RxJavaPlugins though).
2323
*/
2424
public enum CancelledSubscriber implements Subscriber<Object> {

0 commit comments

Comments
 (0)