Skip to content

Commit 7a2b6ee

Browse files
committed
toMap - prevent multiple terminal events
1 parent 0577b4c commit 7a2b6ee

File tree

2 files changed

+144
-7
lines changed

2 files changed

+144
-7
lines changed

src/main/java/rx/internal/operators/OperatorToMap.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import rx.exceptions.Exceptions;
2424
import rx.functions.*;
2525
import rx.observers.Subscribers;
26+
import rx.plugins.RxJavaHooks;
2627

2728
/**
2829
* Maps the elements of the source observable into a java.util.Map instance and
@@ -100,6 +101,8 @@ public Subscriber<? super T> call(final Subscriber<? super Map<K, V>> subscriber
100101
return new Subscriber<T>(subscriber) {
101102

102103
private Map<K, V> map = fLocalMap;
104+
105+
boolean done;
103106

104107
@Override
105108
public void onStart() {
@@ -108,14 +111,17 @@ public void onStart() {
108111

109112
@Override
110113
public void onNext(T v) {
114+
if (done) {
115+
return;
116+
}
111117
K key;
112118
V value;
113119

114120
try {
115121
key = keySelector.call(v);
116122
value = valueSelector.call(v);
117123
} catch (Throwable ex) {
118-
Exceptions.throwOrReport(ex, subscriber);
124+
Exceptions.throwOrReport(ex, this);
119125
return;
120126
}
121127

@@ -124,12 +130,21 @@ public void onNext(T v) {
124130

125131
@Override
126132
public void onError(Throwable e) {
127-
map = null;
128-
subscriber.onError(e);
133+
if (!done) {
134+
done = true;
135+
map = null;
136+
subscriber.onError(e);
137+
} else {
138+
RxJavaHooks.onError(e);
139+
}
129140
}
130141

131142
@Override
132143
public void onCompleted() {
144+
if (done) {
145+
return;
146+
}
147+
done = true;
133148
Map<K, V> map0 = map;
134149
map = null;
135150
subscriber.onNext(map0);

src/test/java/rx/internal/operators/OperatorToMapTest.java

Lines changed: 126 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,35 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.mockito.Matchers.any;
1920
import static org.mockito.Mockito.*;
21+
import static org.mockito.Mockito.verify;
2022

21-
import java.util.*;
23+
import java.util.Arrays;
24+
import java.util.HashMap;
25+
import java.util.LinkedHashMap;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.concurrent.CopyOnWriteArrayList;
2229

23-
import org.junit.*;
24-
import org.mockito.*;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
import org.mockito.Mock;
33+
import org.mockito.MockitoAnnotations;
2534

2635
import rx.Observable;
36+
import rx.Observable.OnSubscribe;
2737
import rx.Observer;
38+
import rx.Producer;
39+
import rx.Subscriber;
2840
import rx.exceptions.TestException;
29-
import rx.functions.*;
41+
import rx.functions.Action1;
42+
import rx.functions.Func0;
43+
import rx.functions.Func1;
3044
import rx.internal.util.UtilityFunctions;
3145
import rx.observers.TestSubscriber;
46+
import rx.plugins.RxJavaHooks;
3247

3348
public class OperatorToMapTest {
3449
@Mock
@@ -281,4 +296,111 @@ public Map<Integer, Integer> call() {
281296
ts.assertNoValues();
282297
ts.assertNotCompleted();
283298
}
299+
300+
@Test
301+
public void testFactoryFailureDoesNotAllowErrorAndCompletedEmissions() {
302+
TestSubscriber<Map<Integer, Integer>> ts = TestSubscriber.create(0);
303+
final RuntimeException e = new RuntimeException();
304+
Observable.create(new OnSubscribe<Integer>() {
305+
306+
@Override
307+
public void call(final Subscriber<? super Integer> sub) {
308+
sub.setProducer(new Producer() {
309+
310+
@Override
311+
public void request(long n) {
312+
if (n > 1) {
313+
sub.onNext(1);
314+
sub.onCompleted();
315+
}
316+
}
317+
});
318+
}
319+
}).toMap(new Func1<Integer,Integer>() {
320+
321+
@Override
322+
public Integer call(Integer t) {
323+
throw e;
324+
}
325+
}).unsafeSubscribe(ts);
326+
ts.assertNoValues();
327+
ts.assertError(e);
328+
ts.assertNotCompleted();
329+
}
330+
331+
@Test
332+
public void testFactoryFailureDoesNotAllowTwoErrorEmissions() {
333+
try {
334+
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
335+
RxJavaHooks.setOnError(new Action1<Throwable>() {
336+
337+
@Override
338+
public void call(Throwable t) {
339+
list.add(t);
340+
}
341+
});
342+
TestSubscriber<Map<Integer, Integer>> ts = TestSubscriber.create(0);
343+
final RuntimeException e1 = new RuntimeException();
344+
final RuntimeException e2 = new RuntimeException();
345+
Observable.create(new OnSubscribe<Integer>() {
346+
347+
@Override
348+
public void call(final Subscriber<? super Integer> sub) {
349+
sub.setProducer(new Producer() {
350+
351+
@Override
352+
public void request(long n) {
353+
if (n > 1) {
354+
sub.onNext(1);
355+
sub.onError(e2);
356+
}
357+
}
358+
});
359+
}
360+
}).toMap(new Func1<Integer, Integer>() {
361+
362+
@Override
363+
public Integer call(Integer t) {
364+
throw e1;
365+
}
366+
}).unsafeSubscribe(ts);
367+
ts.assertNoValues();
368+
assertEquals(Arrays.asList(e1), ts.getOnErrorEvents());
369+
assertEquals(Arrays.asList(e2), list);
370+
ts.assertNotCompleted();
371+
} finally {
372+
RxJavaHooks.setOnError(null);
373+
}
374+
}
375+
376+
@Test
377+
public void testFactoryFailureDoesNotAllowErrorThenOnNextEmissions() {
378+
TestSubscriber<Map<Integer, Integer>> ts = TestSubscriber.create(0);
379+
final RuntimeException e = new RuntimeException();
380+
Observable.create(new OnSubscribe<Integer>() {
381+
382+
@Override
383+
public void call(final Subscriber<? super Integer> sub) {
384+
sub.setProducer(new Producer() {
385+
386+
@Override
387+
public void request(long n) {
388+
if (n > 1) {
389+
sub.onNext(1);
390+
sub.onNext(2);
391+
}
392+
}
393+
});
394+
}
395+
}).toMap(new Func1<Integer,Integer>() {
396+
397+
@Override
398+
public Integer call(Integer t) {
399+
throw e;
400+
}
401+
}).unsafeSubscribe(ts);
402+
ts.assertNoValues();
403+
ts.assertError(e);
404+
ts.assertNotCompleted();
405+
}
284406
}

0 commit comments

Comments
 (0)