Skip to content

Commit c70aa21

Browse files
akarnokdakarnokd
authored andcommitted
MapNotification producer NPE fix
1 parent 189928c commit c70aa21

File tree

2 files changed

+109
-42
lines changed

2 files changed

+109
-42
lines changed

src/main/java/rx/internal/operators/OperatorMapNotification.java

Lines changed: 54 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,12 @@
1919
import java.util.concurrent.ConcurrentLinkedQueue;
2020
import java.util.concurrent.atomic.AtomicLong;
2121

22+
import rx.*;
2223
import rx.Observable.Operator;
23-
import rx.Producer;
24-
import rx.Subscriber;
25-
import rx.Subscription;
26-
import rx.exceptions.MissingBackpressureException;
27-
import rx.exceptions.OnErrorThrowable;
28-
import rx.functions.Func0;
29-
import rx.functions.Func1;
30-
import rx.internal.util.unsafe.SpscArrayQueue;
31-
import rx.internal.util.unsafe.UnsafeAccess;
24+
import rx.exceptions.*;
25+
import rx.functions.*;
26+
import rx.internal.producers.ProducerArbiter;
27+
import rx.internal.util.unsafe.*;
3228

3329
/**
3430
* Applies a function of your choosing to every item emitted by an {@code Observable}, and emits the results of
@@ -50,44 +46,60 @@ public OperatorMapNotification(Func1<? super T, ? extends R> onNext, Func1<? sup
5046

5147
@Override
5248
public Subscriber<? super T> call(final Subscriber<? super R> o) {
53-
Subscriber<T> subscriber = new Subscriber<T>() {
54-
SingleEmitter<R> emitter;
55-
@Override
56-
public void setProducer(Producer producer) {
57-
emitter = new SingleEmitter<R>(o, producer, this);
58-
o.setProducer(emitter);
59-
}
60-
61-
@Override
62-
public void onCompleted() {
63-
try {
64-
emitter.offerAndComplete(onCompleted.call());
65-
} catch (Throwable e) {
66-
o.onError(e);
67-
}
68-
}
49+
final ProducerArbiter pa = new ProducerArbiter();
50+
51+
MapNotificationSubscriber subscriber = new MapNotificationSubscriber(pa, o);
52+
o.add(subscriber);
53+
subscriber.init();
54+
return subscriber;
55+
}
56+
57+
final class MapNotificationSubscriber extends Subscriber<T> {
58+
private final Subscriber<? super R> o;
59+
private final ProducerArbiter pa;
60+
final SingleEmitter<R> emitter;
61+
62+
private MapNotificationSubscriber(ProducerArbiter pa, Subscriber<? super R> o) {
63+
this.pa = pa;
64+
this.o = o;
65+
this.emitter = new SingleEmitter<R>(o, pa, this);
66+
}
67+
68+
void init() {
69+
o.setProducer(emitter);
70+
}
6971

70-
@Override
71-
public void onError(Throwable e) {
72-
try {
73-
emitter.offerAndComplete(onError.call(e));
74-
} catch (Throwable e2) {
75-
o.onError(e);
76-
}
72+
@Override
73+
public void setProducer(Producer producer) {
74+
pa.setProducer(producer);
75+
}
76+
77+
@Override
78+
public void onCompleted() {
79+
try {
80+
emitter.offerAndComplete(onCompleted.call());
81+
} catch (Throwable e) {
82+
o.onError(e);
7783
}
84+
}
7885

79-
@Override
80-
public void onNext(T t) {
81-
try {
82-
emitter.offer(onNext.call(t));
83-
} catch (Throwable e) {
84-
o.onError(OnErrorThrowable.addValueAsLastCause(e, t));
85-
}
86+
@Override
87+
public void onError(Throwable e) {
88+
try {
89+
emitter.offerAndComplete(onError.call(e));
90+
} catch (Throwable e2) {
91+
o.onError(e);
8692
}
93+
}
8794

88-
};
89-
o.add(subscriber);
90-
return subscriber;
95+
@Override
96+
public void onNext(T t) {
97+
try {
98+
emitter.offer(onNext.call(t));
99+
} catch (Throwable e) {
100+
o.onError(OnErrorThrowable.addValueAsLastCause(e, t));
101+
}
102+
}
91103
}
92104
static final class SingleEmitter<T> extends AtomicLong implements Producer, Subscription {
93105
/** */
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import org.junit.Test;
20+
21+
import rx.Observable;
22+
import rx.functions.*;
23+
import rx.observers.TestSubscriber;
24+
25+
public class OperatorMapNotificationTest {
26+
@Test
27+
public void testJust() {
28+
TestSubscriber<Object> ts = TestSubscriber.create();
29+
Observable.just(1)
30+
.flatMap(
31+
new Func1<Integer, Observable<Object>>() {
32+
@Override
33+
public Observable<Object> call(Integer item) {
34+
return Observable.just((Object)(item + 1));
35+
}
36+
},
37+
new Func1<Throwable, Observable<Object>>() {
38+
@Override
39+
public Observable<Object> call(Throwable e) {
40+
return Observable.error(e);
41+
}
42+
},
43+
new Func0<Observable<Object>>() {
44+
@Override
45+
public Observable<Object> call() {
46+
return Observable.never();
47+
}
48+
}
49+
).subscribe(ts);
50+
51+
ts.assertNoErrors();
52+
ts.assertNotCompleted();
53+
ts.assertValue(2);
54+
}
55+
}

0 commit comments

Comments
 (0)