Skip to content

Commit 515d7ce

Browse files
davidmotenakarnokd
authored andcommitted
toMultimap - prevent post-terminal-emissions (#4270)
1 parent c95c650 commit 515d7ce

File tree

4 files changed

+362
-197
lines changed

4 files changed

+362
-197
lines changed

src/main/java/rx/Observable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11239,7 +11239,7 @@ public final <K, V> Observable<Map<K, V>> toMap(Func1<? super T, ? extends K> ke
1123911239
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
1124011240
*/
1124111241
public final <K> Observable<Map<K, Collection<T>>> toMultimap(Func1<? super T, ? extends K> keySelector) {
11242-
return lift(new OperatorToMultimap<T, K, T>(keySelector, UtilityFunctions.<T>identity()));
11242+
return create(new OnSubscribeToMultimap<T, K, T>(this, keySelector, UtilityFunctions.<T>identity()));
1124311243
}
1124411244

1124511245
/**
@@ -11267,7 +11267,7 @@ public final <K> Observable<Map<K, Collection<T>>> toMultimap(Func1<? super T, ?
1126711267
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
1126811268
*/
1126911269
public final <K, V> Observable<Map<K, Collection<V>>> toMultimap(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector) {
11270-
return lift(new OperatorToMultimap<T, K, V>(keySelector, valueSelector));
11270+
return create(new OnSubscribeToMultimap<T, K, V>(this, keySelector, valueSelector));
1127111271
}
1127211272

1127311273
/**
@@ -11297,7 +11297,7 @@ public final <K, V> Observable<Map<K, Collection<V>>> toMultimap(Func1<? super T
1129711297
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
1129811298
*/
1129911299
public final <K, V> Observable<Map<K, Collection<V>>> toMultimap(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, Func0<? extends Map<K, Collection<V>>> mapFactory) {
11300-
return lift(new OperatorToMultimap<T, K, V>(keySelector, valueSelector, mapFactory));
11300+
return create(new OnSubscribeToMultimap<T, K, V>(this, keySelector, valueSelector, mapFactory));
1130111301
}
1130211302

1130311303
/**
@@ -11329,7 +11329,7 @@ public final <K, V> Observable<Map<K, Collection<V>>> toMultimap(Func1<? super T
1132911329
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
1133011330
*/
1133111331
public final <K, V> Observable<Map<K, Collection<V>>> toMultimap(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, Func0<? extends Map<K, Collection<V>>> mapFactory, Func1<? super K, ? extends Collection<V>> collectionFactory) {
11332-
return lift(new OperatorToMultimap<T, K, V>(keySelector, valueSelector, mapFactory, collectionFactory));
11332+
return create(new OnSubscribeToMultimap<T, K, V>(this, keySelector, valueSelector, mapFactory, collectionFactory));
1133311333
}
1133411334

1133511335
/**
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/**
2+
* Copyright one 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import java.util.ArrayList;
20+
import java.util.Collection;
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
24+
import rx.Observable;
25+
import rx.Observable.OnSubscribe;
26+
import rx.Subscriber;
27+
import rx.exceptions.Exceptions;
28+
import rx.functions.Func0;
29+
import rx.functions.Func1;
30+
31+
/**
32+
* Maps the elements of the source observable into a multimap
33+
* (Map&lt;K, Collection&lt;V>>) where each
34+
* key entry has a collection of the source's values.
35+
*
36+
* @see <a href="https://github.com/ReactiveX/RxJava/issues/97">Issue #97</a>
37+
* @param <T> the value type of the input
38+
* @param <K> the multimap-key type
39+
* @param <V> the multimap-value type
40+
*/
41+
public final class OnSubscribeToMultimap<T, K, V> implements OnSubscribe<Map<K, Collection<V>>>, Func0<Map<K, Collection<V>>> {
42+
43+
private final Func1<? super T, ? extends K> keySelector;
44+
private final Func1<? super T, ? extends V> valueSelector;
45+
private final Func0<? extends Map<K, Collection<V>>> mapFactory;
46+
private final Func1<? super K, ? extends Collection<V>> collectionFactory;
47+
private final Observable<T> source;
48+
49+
/**
50+
* ToMultimap with key selector, custom value selector,
51+
* default HashMap factory and default ArrayList collection factory.
52+
* @param keySelector the function extracting the map-key from the main value
53+
* @param valueSelector the function extracting the map-value from the main value
54+
*/
55+
public OnSubscribeToMultimap(
56+
Observable<T> source,
57+
Func1<? super T, ? extends K> keySelector,
58+
Func1<? super T, ? extends V> valueSelector) {
59+
this(source, keySelector, valueSelector,
60+
null,
61+
DefaultMultimapCollectionFactory.<K, V>instance());
62+
}
63+
64+
/**
65+
* ToMultimap with key selector, custom value selector,
66+
* custom Map factory and default ArrayList collection factory.
67+
* @param the observable source
68+
* @param keySelector the function extracting the map-key from the main value
69+
* @param valueSelector the function extracting the map-value from the main value
70+
* @param mapFactory function that returns a Map instance to store keys and values into
71+
*/
72+
public OnSubscribeToMultimap(
73+
Observable<T> source,
74+
Func1<? super T, ? extends K> keySelector,
75+
Func1<? super T, ? extends V> valueSelector,
76+
Func0<? extends Map<K, Collection<V>>> mapFactory) {
77+
this(source, keySelector, valueSelector,
78+
mapFactory,
79+
DefaultMultimapCollectionFactory.<K, V>instance());
80+
}
81+
82+
/**
83+
* ToMultimap with key selector, custom value selector,
84+
* custom Map factory and custom collection factory.
85+
* @param source the observable source
86+
* @param keySelector the function extracting the map-key from the main value
87+
* @param valueSelector the function extracting the map-value from the main value
88+
* @param mapFactory function that returns a Map instance to store keys and values into
89+
* @param collectionFactory function that returns a Collection for a particular key to store values into
90+
*/
91+
public OnSubscribeToMultimap(
92+
Observable<T> source,
93+
Func1<? super T, ? extends K> keySelector,
94+
Func1<? super T, ? extends V> valueSelector,
95+
Func0<? extends Map<K, Collection<V>>> mapFactory,
96+
Func1<? super K, ? extends Collection<V>> collectionFactory) {
97+
this.source = source;
98+
this.keySelector = keySelector;
99+
this.valueSelector = valueSelector;
100+
if (mapFactory == null) {
101+
this.mapFactory = this;
102+
} else {
103+
this.mapFactory = mapFactory;
104+
}
105+
this.collectionFactory = collectionFactory;
106+
}
107+
108+
// default map factory
109+
@Override
110+
public Map<K, Collection<V>> call() {
111+
return new HashMap<K, Collection<V>>();
112+
}
113+
114+
@Override
115+
public void call(final Subscriber<? super Map<K, Collection<V>>> subscriber) {
116+
117+
Map<K, Collection<V>> map;
118+
try {
119+
map = mapFactory.call();
120+
} catch (Throwable ex) {
121+
Exceptions.throwIfFatal(ex);
122+
subscriber.onError(ex);
123+
return;
124+
}
125+
new ToMultimapSubscriber<T, K, V>(
126+
subscriber, map, keySelector, valueSelector, collectionFactory)
127+
.subscribeTo(source);
128+
}
129+
130+
private static final class ToMultimapSubscriber<T, K, V>
131+
extends DeferredScalarSubscriberSafe<T,Map<K, Collection<V>>> {
132+
133+
private final Func1<? super T, ? extends K> keySelector;
134+
private final Func1<? super T, ? extends V> valueSelector;
135+
private final Func1<? super K, ? extends Collection<V>> collectionFactory;
136+
137+
ToMultimapSubscriber(
138+
Subscriber<? super Map<K, Collection<V>>> subscriber,
139+
Map<K, Collection<V>> map,
140+
Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector,
141+
Func1<? super K, ? extends Collection<V>> collectionFactory) {
142+
super(subscriber);
143+
this.value = map;
144+
this.hasValue = true;
145+
this.keySelector = keySelector;
146+
this.valueSelector = valueSelector;
147+
this.collectionFactory = collectionFactory;
148+
}
149+
150+
@Override
151+
public void onStart() {
152+
request(Long.MAX_VALUE);
153+
}
154+
155+
@Override
156+
public void onNext(T t) {
157+
if (done){
158+
return;
159+
}
160+
try {
161+
// any interaction with keySelector, valueSelector, collectionFactory, collection or value
162+
// may fail unexpectedly because their behaviour is customisable by the user. For this
163+
// reason we wrap their calls in try-catch block.
164+
165+
K key = keySelector.call(t);
166+
V v = valueSelector.call(t);
167+
Collection<V> collection = value.get(key);
168+
if (collection == null) {
169+
collection = collectionFactory.call(key);
170+
value.put(key, collection);
171+
}
172+
collection.add(v);
173+
} catch (Throwable ex) {
174+
Exceptions.throwIfFatal(ex);
175+
unsubscribe();
176+
onError(ex);
177+
}
178+
179+
}
180+
}
181+
182+
/**
183+
* The default collection factory for a key in the multimap returning
184+
* an ArrayList independent of the key.
185+
* @param <K> the key type
186+
* @param <V> the value type
187+
*/
188+
private static final class DefaultMultimapCollectionFactory<K, V>
189+
implements Func1<K, Collection<V>> {
190+
191+
private static final DefaultMultimapCollectionFactory<Object,Object> INSTANCE = new DefaultMultimapCollectionFactory<Object, Object>();
192+
193+
@SuppressWarnings("unchecked")
194+
static <K, V> DefaultMultimapCollectionFactory<K,V> instance() {
195+
return (DefaultMultimapCollectionFactory<K, V>) INSTANCE;
196+
}
197+
198+
@Override
199+
public Collection<V> call(K t1) {
200+
return new ArrayList<V>();
201+
}
202+
}
203+
204+
}

0 commit comments

Comments
 (0)