Skip to content

Commit c705109

Browse files
committed
toMap - prevent multiple terminal events
1 parent 0577b4c commit c705109

File tree

3 files changed

+218
-75
lines changed

3 files changed

+218
-75
lines changed

src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11158,7 +11158,7 @@ public final Observable<List<T>> toList() {
1115811158
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
1115911159
*/
1116011160
public final <K> Observable<Map<K, T>> toMap(Func1<? super T, ? extends K> keySelector) {
11161-
return lift(new OperatorToMap<T, K, T>(keySelector, UtilityFunctions.<T>identity()));
11161+
return create(new OnSubscribeToMap<T, K, T>(this, keySelector, UtilityFunctions.<T>identity()));
1116211162
}
1116311163

1116411164
/**
@@ -11188,7 +11188,7 @@ public final <K> Observable<Map<K, T>> toMap(Func1<? super T, ? extends K> keySe
1118811188
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
1118911189
*/
1119011190
public final <K, V> Observable<Map<K, V>> toMap(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector) {
11191-
return lift(new OperatorToMap<T, K, V>(keySelector, valueSelector));
11191+
return create(new OnSubscribeToMap<T, K, V>(this, keySelector, valueSelector));
1119211192
}
1119311193

1119411194
/**
@@ -11217,7 +11217,7 @@ public final <K, V> Observable<Map<K, V>> toMap(Func1<? super T, ? extends K> ke
1121711217
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
1121811218
*/
1121911219
public final <K, V> Observable<Map<K, V>> toMap(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, Func0<? extends Map<K, V>> mapFactory) {
11220-
return lift(new OperatorToMap<T, K, V>(keySelector, valueSelector, mapFactory));
11220+
return create(new OnSubscribeToMap<T, K, V>(this, keySelector, valueSelector, mapFactory));
1122111221
}
1122211222

1122311223
/**

src/main/java/rx/internal/operators/OperatorToMap.java renamed to src/main/java/rx/internal/operators/OnSubscribeToMap.java

Lines changed: 72 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616

1717
package rx.internal.operators;
1818

19-
import java.util.*;
19+
import java.util.HashMap;
20+
import java.util.Map;
2021

21-
import rx.Observable.Operator;
22+
import rx.Observable;
23+
import rx.Observable.OnSubscribe;
2224
import rx.Subscriber;
2325
import rx.exceptions.Exceptions;
24-
import rx.functions.*;
25-
import rx.observers.Subscribers;
26+
import rx.functions.Func0;
27+
import rx.functions.Func1;
2628

2729
/**
2830
* Maps the elements of the source observable into a java.util.Map instance and
@@ -33,35 +35,26 @@
3335
* @param <K> the map-key type
3436
* @param <V> the map-value type
3537
*/
36-
public final class OperatorToMap<T, K, V> implements Operator<Map<K, V>, T> {
38+
public final class OnSubscribeToMap<T, K, V> implements OnSubscribe<Map<K, V>> {
39+
40+
final Observable<T> source;
3741

3842
final Func1<? super T, ? extends K> keySelector;
3943

4044
final Func1<? super T, ? extends V> valueSelector;
4145

42-
private final Func0<? extends Map<K, V>> mapFactory;
46+
final Func0<? extends Map<K, V>> mapFactory;
4347

44-
/**
45-
* The default map factory.
46-
* @param <K> the key type
47-
* @param <V> the value type
48-
*/
49-
public static final class DefaultToMapFactory<K, V> implements Func0<Map<K, V>> {
50-
@Override
51-
public Map<K, V> call() {
52-
return new HashMap<K, V>();
53-
}
54-
}
5548

5649
/**
5750
* ToMap with key selector, value selector and default HashMap factory.
5851
* @param keySelector the function extracting the map-key from the main value
5952
* @param valueSelector the function extracting the map-value from the main value
6053
*/
61-
public OperatorToMap(
54+
public OnSubscribeToMap(Observable<T> source,
6255
Func1<? super T, ? extends K> keySelector,
6356
Func1<? super T, ? extends V> valueSelector) {
64-
this(keySelector, valueSelector, new DefaultToMapFactory<K, V>());
57+
this(source, keySelector, valueSelector, DefaultMapFactory.<K, V>instance());
6558
}
6659

6760

@@ -71,70 +64,82 @@ public OperatorToMap(
7164
* @param valueSelector the function extracting the map-value from the main value
7265
* @param mapFactory function that returns a Map instance to store keys and values into
7366
*/
74-
public OperatorToMap(
67+
public OnSubscribeToMap(Observable<T> source,
7568
Func1<? super T, ? extends K> keySelector,
7669
Func1<? super T, ? extends V> valueSelector,
7770
Func0<? extends Map<K, V>> mapFactory) {
71+
this.source = source;
7872
this.keySelector = keySelector;
7973
this.valueSelector = valueSelector;
8074
this.mapFactory = mapFactory;
81-
8275
}
8376

8477
@Override
85-
public Subscriber<? super T> call(final Subscriber<? super Map<K, V>> subscriber) {
86-
87-
Map<K, V> localMap;
88-
78+
public void call(final Subscriber<? super Map<K, V>> subscriber) {
79+
Map<K, V> map;
8980
try {
90-
localMap = mapFactory.call();
81+
map = mapFactory.call();
9182
} catch (Throwable ex) {
9283
Exceptions.throwOrReport(ex, subscriber);
93-
Subscriber<? super T> parent = Subscribers.empty();
94-
parent.unsubscribe();
95-
return parent;
84+
return;
85+
}
86+
new ToMapSubscriber<T, K, V>(subscriber, map, keySelector, valueSelector)
87+
.subscribeTo(source);;
88+
}
89+
90+
static final class ToMapSubscriber<T, K, V> extends DeferredScalarSubscriberSafe<T, Map<K,V>> {
91+
92+
final Func1<? super T, ? extends K> keySelector;
93+
final Func1<? super T, ? extends V> valueSelector;
94+
95+
ToMapSubscriber(Subscriber<? super Map<K,V>> actual, Map<K,V> map, Func1<? super T, ? extends K> keySelector,
96+
Func1<? super T, ? extends V> valueSelector) {
97+
super(actual);
98+
this.value = map;
99+
this.hasValue = true;
100+
this.keySelector = keySelector;
101+
this.valueSelector = valueSelector;
96102
}
97-
98-
final Map<K, V> fLocalMap = localMap;
99-
100-
return new Subscriber<T>(subscriber) {
101-
102-
private Map<K, V> map = fLocalMap;
103103

104-
@Override
105-
public void onStart() {
106-
request(Long.MAX_VALUE);
107-
}
108-
109-
@Override
110-
public void onNext(T v) {
111-
K key;
112-
V value;
113-
114-
try {
115-
key = keySelector.call(v);
116-
value = valueSelector.call(v);
117-
} catch (Throwable ex) {
118-
Exceptions.throwOrReport(ex, subscriber);
119-
return;
120-
}
121-
122-
map.put(key, value);
104+
@Override
105+
public void onStart() {
106+
request(Long.MAX_VALUE);
107+
}
108+
109+
@Override
110+
public void onNext(T t) {
111+
if (done) {
112+
return;
123113
}
124-
125-
@Override
126-
public void onError(Throwable e) {
127-
map = null;
128-
subscriber.onError(e);
114+
try {
115+
K key = keySelector.call(t);
116+
V val = valueSelector.call(t);
117+
value.put(key, val);
118+
} catch (Throwable ex) {
119+
Exceptions.throwIfFatal(ex);
120+
unsubscribe();
121+
onError(ex);
129122
}
123+
}
124+
}
125+
126+
/**
127+
* The default map factory.
128+
* @param <K> the key type
129+
* @param <V> the value type
130+
*/
131+
static final class DefaultMapFactory<K, V> implements Func0<Map<K, V>> {
130132

131-
@Override
132-
public void onCompleted() {
133-
Map<K, V> map0 = map;
134-
map = null;
135-
subscriber.onNext(map0);
136-
subscriber.onCompleted();
137-
}
138-
};
133+
private static final DefaultMapFactory<Object,Object> instance = new DefaultMapFactory<Object,Object>();
134+
135+
@SuppressWarnings("unchecked")
136+
static <K, V> DefaultMapFactory<K,V> instance() {
137+
return (DefaultMapFactory<K, V>) instance;
138+
}
139+
140+
@Override
141+
public Map<K, V> call() {
142+
return new HashMap<K, V>();
143+
}
139144
}
140145
}

src/test/java/rx/internal/operators/OperatorToMapTest.java renamed to src/test/java/rx/internal/operators/OnSubscribeToMapTest.java

Lines changed: 143 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,37 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.mockito.Matchers.any;
1920
import static org.mockito.Mockito.*;
21+
import static org.mockito.Mockito.verify;
2022

21-
import java.util.*;
23+
import java.util.Arrays;
24+
import java.util.HashMap;
25+
import java.util.LinkedHashMap;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.concurrent.CopyOnWriteArrayList;
2229

23-
import org.junit.*;
24-
import org.mockito.*;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
import org.mockito.Mock;
33+
import org.mockito.MockitoAnnotations;
2534

2635
import rx.Observable;
36+
import rx.Observable.OnSubscribe;
2737
import rx.Observer;
38+
import rx.Producer;
39+
import rx.Subscriber;
2840
import rx.exceptions.TestException;
29-
import rx.functions.*;
41+
import rx.functions.Action1;
42+
import rx.functions.Func0;
43+
import rx.functions.Func1;
3044
import rx.internal.util.UtilityFunctions;
3145
import rx.observers.TestSubscriber;
46+
import rx.plugins.RxJavaHooks;
3247

33-
public class OperatorToMapTest {
48+
public class OnSubscribeToMapTest {
3449
@Mock
3550
Observer<Object> objectObserver;
3651

@@ -281,4 +296,127 @@ public Map<Integer, Integer> call() {
281296
ts.assertNoValues();
282297
ts.assertNotCompleted();
283298
}
299+
300+
@Test
301+
public void testFactoryFailureDoesNotAllowErrorAndCompletedEmissions() {
302+
TestSubscriber<Map<Integer, Integer>> ts = TestSubscriber.create(0);
303+
final RuntimeException e = new RuntimeException();
304+
Observable.create(new OnSubscribe<Integer>() {
305+
306+
@Override
307+
public void call(final Subscriber<? super Integer> sub) {
308+
sub.setProducer(new Producer() {
309+
310+
@Override
311+
public void request(long n) {
312+
if (n > 1) {
313+
sub.onNext(1);
314+
sub.onCompleted();
315+
}
316+
}
317+
});
318+
}
319+
}).toMap(new Func1<Integer,Integer>() {
320+
321+
@Override
322+
public Integer call(Integer t) {
323+
throw e;
324+
}
325+
}).unsafeSubscribe(ts);
326+
ts.assertNoValues();
327+
ts.assertError(e);
328+
ts.assertNotCompleted();
329+
}
330+
331+
@Test
332+
public void testFactoryFailureDoesNotAllowTwoErrorEmissions() {
333+
try {
334+
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
335+
RxJavaHooks.setOnError(new Action1<Throwable>() {
336+
337+
@Override
338+
public void call(Throwable t) {
339+
list.add(t);
340+
}
341+
});
342+
TestSubscriber<Map<Integer, Integer>> ts = TestSubscriber.create(0);
343+
final RuntimeException e1 = new RuntimeException();
344+
final RuntimeException e2 = new RuntimeException();
345+
Observable.create(new OnSubscribe<Integer>() {
346+
347+
@Override
348+
public void call(final Subscriber<? super Integer> sub) {
349+
sub.setProducer(new Producer() {
350+
351+
@Override
352+
public void request(long n) {
353+
if (n > 1) {
354+
sub.onNext(1);
355+
sub.onError(e2);
356+
}
357+
}
358+
});
359+
}
360+
}).toMap(new Func1<Integer, Integer>() {
361+
362+
@Override
363+
public Integer call(Integer t) {
364+
throw e1;
365+
}
366+
}).unsafeSubscribe(ts);
367+
ts.assertNoValues();
368+
assertEquals(Arrays.asList(e1), ts.getOnErrorEvents());
369+
assertEquals(Arrays.asList(e2), list);
370+
ts.assertNotCompleted();
371+
} finally {
372+
RxJavaHooks.reset();
373+
}
374+
}
375+
376+
@Test
377+
public void testFactoryFailureDoesNotAllowErrorThenOnNextEmissions() {
378+
TestSubscriber<Map<Integer, Integer>> ts = TestSubscriber.create(0);
379+
final RuntimeException e = new RuntimeException();
380+
Observable.create(new OnSubscribe<Integer>() {
381+
382+
@Override
383+
public void call(final Subscriber<? super Integer> sub) {
384+
sub.setProducer(new Producer() {
385+
386+
@Override
387+
public void request(long n) {
388+
if (n > 1) {
389+
sub.onNext(1);
390+
sub.onNext(2);
391+
}
392+
}
393+
});
394+
}
395+
}).toMap(new Func1<Integer,Integer>() {
396+
397+
@Override
398+
public Integer call(Integer t) {
399+
throw e;
400+
}
401+
}).unsafeSubscribe(ts);
402+
ts.assertNoValues();
403+
ts.assertError(e);
404+
ts.assertNotCompleted();
405+
}
406+
407+
@Test
408+
public void testBackpressure() {
409+
TestSubscriber<Object> ts = TestSubscriber.create(0);
410+
Observable
411+
.just("a", "bb", "ccc", "dddd")
412+
.toMap(lengthFunc)
413+
.subscribe(ts);
414+
ts.assertNoErrors();
415+
ts.assertNotCompleted();
416+
ts.assertNoValues();
417+
ts.requestMore(1);
418+
ts.assertValueCount(1);
419+
ts.assertNoErrors();
420+
ts.assertCompleted();
421+
}
284422
}

0 commit comments

Comments
 (0)