Skip to content

Commit f3495ac

Browse files
committed
toMap - prevent multiple terminal events and support backpressures (ReactiveX#4251)
1 parent e0b3662 commit f3495ac

File tree

3 files changed

+210
-78
lines changed

3 files changed

+210
-78
lines changed

src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11158,7 +11158,7 @@ public final Observable<List<T>> toList() {
1115811158
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
1115911159
*/
1116011160
public final <K> Observable<Map<K, T>> toMap(Func1<? super T, ? extends K> keySelector) {
11161-
return lift(new OperatorToMap<T, K, T>(keySelector, UtilityFunctions.<T>identity()));
11161+
return create(new OnSubscribeToMap<T, K, T>(this, keySelector, UtilityFunctions.<T>identity()));
1116211162
}
1116311163

1116411164
/**
@@ -11188,7 +11188,7 @@ public final <K> Observable<Map<K, T>> toMap(Func1<? super T, ? extends K> keySe
1118811188
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
1118911189
*/
1119011190
public final <K, V> Observable<Map<K, V>> toMap(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector) {
11191-
return lift(new OperatorToMap<T, K, V>(keySelector, valueSelector));
11191+
return create(new OnSubscribeToMap<T, K, V>(this, keySelector, valueSelector));
1119211192
}
1119311193

1119411194
/**
@@ -11217,7 +11217,7 @@ public final <K, V> Observable<Map<K, V>> toMap(Func1<? super T, ? extends K> ke
1121711217
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
1121811218
*/
1121911219
public final <K, V> Observable<Map<K, V>> toMap(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, Func0<? extends Map<K, V>> mapFactory) {
11220-
return lift(new OperatorToMap<T, K, V>(keySelector, valueSelector, mapFactory));
11220+
return create(new OnSubscribeToMap<T, K, V>(this, keySelector, valueSelector, mapFactory));
1122111221
}
1122211222

1122311223
/**

src/main/java/rx/internal/operators/OperatorToMap.java renamed to src/main/java/rx/internal/operators/OnSubscribeToMap.java

Lines changed: 64 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616

1717
package rx.internal.operators;
1818

19-
import java.util.*;
19+
import java.util.HashMap;
20+
import java.util.Map;
2021

21-
import rx.Observable.Operator;
22+
import rx.Observable;
23+
import rx.Observable.OnSubscribe;
2224
import rx.Subscriber;
2325
import rx.exceptions.Exceptions;
24-
import rx.functions.*;
25-
import rx.observers.Subscribers;
26+
import rx.functions.Func0;
27+
import rx.functions.Func1;
2628

2729
/**
2830
* Maps the elements of the source observable into a java.util.Map instance and
@@ -33,35 +35,25 @@
3335
* @param <K> the map-key type
3436
* @param <V> the map-value type
3537
*/
36-
public final class OperatorToMap<T, K, V> implements Operator<Map<K, V>, T> {
38+
public final class OnSubscribeToMap<T, K, V> implements OnSubscribe<Map<K, V>>, Func0<Map<K, V>> {
39+
40+
final Observable<T> source;
3741

3842
final Func1<? super T, ? extends K> keySelector;
3943

4044
final Func1<? super T, ? extends V> valueSelector;
4145

42-
private final Func0<? extends Map<K, V>> mapFactory;
43-
44-
/**
45-
* The default map factory.
46-
* @param <K> the key type
47-
* @param <V> the value type
48-
*/
49-
public static final class DefaultToMapFactory<K, V> implements Func0<Map<K, V>> {
50-
@Override
51-
public Map<K, V> call() {
52-
return new HashMap<K, V>();
53-
}
54-
}
46+
final Func0<? extends Map<K, V>> mapFactory;
5547

5648
/**
5749
* ToMap with key selector, value selector and default HashMap factory.
5850
* @param keySelector the function extracting the map-key from the main value
5951
* @param valueSelector the function extracting the map-value from the main value
6052
*/
61-
public OperatorToMap(
53+
public OnSubscribeToMap(Observable<T> source,
6254
Func1<? super T, ? extends K> keySelector,
6355
Func1<? super T, ? extends V> valueSelector) {
64-
this(keySelector, valueSelector, new DefaultToMapFactory<K, V>());
56+
this(source, keySelector, valueSelector, null);
6557
}
6658

6759

@@ -71,70 +63,72 @@ public OperatorToMap(
7163
* @param valueSelector the function extracting the map-value from the main value
7264
* @param mapFactory function that returns a Map instance to store keys and values into
7365
*/
74-
public OperatorToMap(
66+
public OnSubscribeToMap(Observable<T> source,
7567
Func1<? super T, ? extends K> keySelector,
7668
Func1<? super T, ? extends V> valueSelector,
7769
Func0<? extends Map<K, V>> mapFactory) {
70+
this.source = source;
7871
this.keySelector = keySelector;
7972
this.valueSelector = valueSelector;
80-
this.mapFactory = mapFactory;
81-
73+
if (mapFactory == null) {
74+
this.mapFactory = this;
75+
} else {
76+
this.mapFactory = mapFactory;
77+
}
8278
}
8379

8480
@Override
85-
public Subscriber<? super T> call(final Subscriber<? super Map<K, V>> subscriber) {
86-
87-
Map<K, V> localMap;
88-
81+
public Map<K, V> call() {
82+
return new HashMap<K, V>();
83+
}
84+
85+
@Override
86+
public void call(final Subscriber<? super Map<K, V>> subscriber) {
87+
Map<K, V> map;
8988
try {
90-
localMap = mapFactory.call();
89+
map = mapFactory.call();
9190
} catch (Throwable ex) {
9291
Exceptions.throwOrReport(ex, subscriber);
93-
Subscriber<? super T> parent = Subscribers.empty();
94-
parent.unsubscribe();
95-
return parent;
92+
return;
93+
}
94+
new ToMapSubscriber<T, K, V>(subscriber, map, keySelector, valueSelector)
95+
.subscribeTo(source);;
96+
}
97+
98+
static final class ToMapSubscriber<T, K, V> extends DeferredScalarSubscriberSafe<T, Map<K,V>> {
99+
100+
final Func1<? super T, ? extends K> keySelector;
101+
final Func1<? super T, ? extends V> valueSelector;
102+
103+
ToMapSubscriber(Subscriber<? super Map<K,V>> actual, Map<K,V> map, Func1<? super T, ? extends K> keySelector,
104+
Func1<? super T, ? extends V> valueSelector) {
105+
super(actual);
106+
this.value = map;
107+
this.hasValue = true;
108+
this.keySelector = keySelector;
109+
this.valueSelector = valueSelector;
96110
}
97-
98-
final Map<K, V> fLocalMap = localMap;
99-
100-
return new Subscriber<T>(subscriber) {
101-
102-
private Map<K, V> map = fLocalMap;
103-
104-
@Override
105-
public void onStart() {
106-
request(Long.MAX_VALUE);
107-
}
108-
109-
@Override
110-
public void onNext(T v) {
111-
K key;
112-
V value;
113-
114-
try {
115-
key = keySelector.call(v);
116-
value = valueSelector.call(v);
117-
} catch (Throwable ex) {
118-
Exceptions.throwOrReport(ex, subscriber);
119-
return;
120-
}
121-
122-
map.put(key, value);
123-
}
124111

125-
@Override
126-
public void onError(Throwable e) {
127-
map = null;
128-
subscriber.onError(e);
112+
@Override
113+
public void onStart() {
114+
request(Long.MAX_VALUE);
115+
}
116+
117+
@Override
118+
public void onNext(T t) {
119+
if (done) {
120+
return;
129121
}
130-
131-
@Override
132-
public void onCompleted() {
133-
Map<K, V> map0 = map;
134-
map = null;
135-
subscriber.onNext(map0);
136-
subscriber.onCompleted();
122+
try {
123+
K key = keySelector.call(t);
124+
V val = valueSelector.call(t);
125+
value.put(key, val);
126+
} catch (Throwable ex) {
127+
Exceptions.throwIfFatal(ex);
128+
unsubscribe();
129+
onError(ex);
137130
}
138-
};
131+
}
139132
}
133+
140134
}

src/test/java/rx/internal/operators/OperatorToMapTest.java renamed to src/test/java/rx/internal/operators/OnSubscribeToMapTest.java

Lines changed: 143 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,37 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.mockito.Matchers.any;
1920
import static org.mockito.Mockito.*;
21+
import static org.mockito.Mockito.verify;
2022

21-
import java.util.*;
23+
import java.util.Arrays;
24+
import java.util.HashMap;
25+
import java.util.LinkedHashMap;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.concurrent.CopyOnWriteArrayList;
2229

23-
import org.junit.*;
24-
import org.mockito.*;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
import org.mockito.Mock;
33+
import org.mockito.MockitoAnnotations;
2534

2635
import rx.Observable;
36+
import rx.Observable.OnSubscribe;
2737
import rx.Observer;
38+
import rx.Producer;
39+
import rx.Subscriber;
2840
import rx.exceptions.TestException;
29-
import rx.functions.*;
41+
import rx.functions.Action1;
42+
import rx.functions.Func0;
43+
import rx.functions.Func1;
3044
import rx.internal.util.UtilityFunctions;
3145
import rx.observers.TestSubscriber;
46+
import rx.plugins.RxJavaHooks;
3247

33-
public class OperatorToMapTest {
48+
public class OnSubscribeToMapTest {
3449
@Mock
3550
Observer<Object> objectObserver;
3651

@@ -281,4 +296,127 @@ public Map<Integer, Integer> call() {
281296
ts.assertNoValues();
282297
ts.assertNotCompleted();
283298
}
299+
300+
@Test
301+
public void testFactoryFailureDoesNotAllowErrorAndCompletedEmissions() {
302+
TestSubscriber<Map<Integer, Integer>> ts = TestSubscriber.create(0);
303+
final RuntimeException e = new RuntimeException();
304+
Observable.create(new OnSubscribe<Integer>() {
305+
306+
@Override
307+
public void call(final Subscriber<? super Integer> sub) {
308+
sub.setProducer(new Producer() {
309+
310+
@Override
311+
public void request(long n) {
312+
if (n > 1) {
313+
sub.onNext(1);
314+
sub.onCompleted();
315+
}
316+
}
317+
});
318+
}
319+
}).toMap(new Func1<Integer,Integer>() {
320+
321+
@Override
322+
public Integer call(Integer t) {
323+
throw e;
324+
}
325+
}).unsafeSubscribe(ts);
326+
ts.assertNoValues();
327+
ts.assertError(e);
328+
ts.assertNotCompleted();
329+
}
330+
331+
@Test
332+
public void testFactoryFailureDoesNotAllowTwoErrorEmissions() {
333+
try {
334+
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
335+
RxJavaHooks.setOnError(new Action1<Throwable>() {
336+
337+
@Override
338+
public void call(Throwable t) {
339+
list.add(t);
340+
}
341+
});
342+
TestSubscriber<Map<Integer, Integer>> ts = TestSubscriber.create(0);
343+
final RuntimeException e1 = new RuntimeException();
344+
final RuntimeException e2 = new RuntimeException();
345+
Observable.create(new OnSubscribe<Integer>() {
346+
347+
@Override
348+
public void call(final Subscriber<? super Integer> sub) {
349+
sub.setProducer(new Producer() {
350+
351+
@Override
352+
public void request(long n) {
353+
if (n > 1) {
354+
sub.onNext(1);
355+
sub.onError(e2);
356+
}
357+
}
358+
});
359+
}
360+
}).toMap(new Func1<Integer, Integer>() {
361+
362+
@Override
363+
public Integer call(Integer t) {
364+
throw e1;
365+
}
366+
}).unsafeSubscribe(ts);
367+
ts.assertNoValues();
368+
assertEquals(Arrays.asList(e1), ts.getOnErrorEvents());
369+
assertEquals(Arrays.asList(e2), list);
370+
ts.assertNotCompleted();
371+
} finally {
372+
RxJavaHooks.reset();
373+
}
374+
}
375+
376+
@Test
377+
public void testFactoryFailureDoesNotAllowErrorThenOnNextEmissions() {
378+
TestSubscriber<Map<Integer, Integer>> ts = TestSubscriber.create(0);
379+
final RuntimeException e = new RuntimeException();
380+
Observable.create(new OnSubscribe<Integer>() {
381+
382+
@Override
383+
public void call(final Subscriber<? super Integer> sub) {
384+
sub.setProducer(new Producer() {
385+
386+
@Override
387+
public void request(long n) {
388+
if (n > 1) {
389+
sub.onNext(1);
390+
sub.onNext(2);
391+
}
392+
}
393+
});
394+
}
395+
}).toMap(new Func1<Integer,Integer>() {
396+
397+
@Override
398+
public Integer call(Integer t) {
399+
throw e;
400+
}
401+
}).unsafeSubscribe(ts);
402+
ts.assertNoValues();
403+
ts.assertError(e);
404+
ts.assertNotCompleted();
405+
}
406+
407+
@Test
408+
public void testBackpressure() {
409+
TestSubscriber<Object> ts = TestSubscriber.create(0);
410+
Observable
411+
.just("a", "bb", "ccc", "dddd")
412+
.toMap(lengthFunc)
413+
.subscribe(ts);
414+
ts.assertNoErrors();
415+
ts.assertNotCompleted();
416+
ts.assertNoValues();
417+
ts.requestMore(1);
418+
ts.assertValueCount(1);
419+
ts.assertNoErrors();
420+
ts.assertCompleted();
421+
}
284422
}

0 commit comments

Comments
 (0)