Skip to content

toMap - prevent multiple terminal events, support backpressure #4251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -11158,7 +11158,7 @@ public final Observable<List<T>> toList() {
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
*/
public final <K> Observable<Map<K, T>> toMap(Func1<? super T, ? extends K> keySelector) {
return lift(new OperatorToMap<T, K, T>(keySelector, UtilityFunctions.<T>identity()));
return create(new OnSubscribeToMap<T, K, T>(this, keySelector, UtilityFunctions.<T>identity()));
}

/**
Expand Down Expand Up @@ -11188,7 +11188,7 @@ public final <K> Observable<Map<K, T>> toMap(Func1<? super T, ? extends K> keySe
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
*/
public final <K, V> Observable<Map<K, V>> toMap(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector) {
return lift(new OperatorToMap<T, K, V>(keySelector, valueSelector));
return create(new OnSubscribeToMap<T, K, V>(this, keySelector, valueSelector));
}

/**
Expand Down Expand Up @@ -11217,7 +11217,7 @@ public final <K, V> Observable<Map<K, V>> toMap(Func1<? super T, ? extends K> ke
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
*/
public final <K, V> Observable<Map<K, V>> toMap(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, Func0<? extends Map<K, V>> mapFactory) {
return lift(new OperatorToMap<T, K, V>(keySelector, valueSelector, mapFactory));
return create(new OnSubscribeToMap<T, K, V>(this, keySelector, valueSelector, mapFactory));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

package rx.internal.operators;

import java.util.*;
import java.util.HashMap;
import java.util.Map;

import rx.Observable.Operator;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.functions.*;
import rx.observers.Subscribers;
import rx.functions.Func0;
import rx.functions.Func1;

/**
* Maps the elements of the source observable into a java.util.Map instance and
Expand All @@ -33,35 +35,25 @@
* @param <K> the map-key type
* @param <V> the map-value type
*/
public final class OperatorToMap<T, K, V> implements Operator<Map<K, V>, T> {
public final class OnSubscribeToMap<T, K, V> implements OnSubscribe<Map<K, V>>, Func0<Map<K, V>> {

final Observable<T> source;

final Func1<? super T, ? extends K> keySelector;

final Func1<? super T, ? extends V> valueSelector;

private final Func0<? extends Map<K, V>> mapFactory;

/**
* The default map factory.
* @param <K> the key type
* @param <V> the value type
*/
public static final class DefaultToMapFactory<K, V> implements Func0<Map<K, V>> {
@Override
public Map<K, V> call() {
return new HashMap<K, V>();
}
}
final Func0<? extends Map<K, V>> mapFactory;

/**
* ToMap with key selector, value selector and default HashMap factory.
* @param keySelector the function extracting the map-key from the main value
* @param valueSelector the function extracting the map-value from the main value
*/
public OperatorToMap(
public OnSubscribeToMap(Observable<T> source,
Func1<? super T, ? extends K> keySelector,
Func1<? super T, ? extends V> valueSelector) {
this(keySelector, valueSelector, new DefaultToMapFactory<K, V>());
this(source, keySelector, valueSelector, null);
}


Expand All @@ -71,70 +63,72 @@ public OperatorToMap(
* @param valueSelector the function extracting the map-value from the main value
* @param mapFactory function that returns a Map instance to store keys and values into
*/
public OperatorToMap(
public OnSubscribeToMap(Observable<T> source,
Func1<? super T, ? extends K> keySelector,
Func1<? super T, ? extends V> valueSelector,
Func0<? extends Map<K, V>> mapFactory) {
this.source = source;
this.keySelector = keySelector;
this.valueSelector = valueSelector;
this.mapFactory = mapFactory;

if (mapFactory == null) {
this.mapFactory = this;
} else {
this.mapFactory = mapFactory;
}
}

@Override
public Subscriber<? super T> call(final Subscriber<? super Map<K, V>> subscriber) {

Map<K, V> localMap;

public Map<K, V> call() {
return new HashMap<K, V>();
}

@Override
public void call(final Subscriber<? super Map<K, V>> subscriber) {
Map<K, V> map;
try {
localMap = mapFactory.call();
map = mapFactory.call();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, subscriber);
Subscriber<? super T> parent = Subscribers.empty();
parent.unsubscribe();
return parent;
return;
}
new ToMapSubscriber<T, K, V>(subscriber, map, keySelector, valueSelector)
.subscribeTo(source);;
}

static final class ToMapSubscriber<T, K, V> extends DeferredScalarSubscriberSafe<T, Map<K,V>> {

final Func1<? super T, ? extends K> keySelector;
final Func1<? super T, ? extends V> valueSelector;

ToMapSubscriber(Subscriber<? super Map<K,V>> actual, Map<K,V> map, Func1<? super T, ? extends K> keySelector,
Func1<? super T, ? extends V> valueSelector) {
super(actual);
this.value = map;
this.hasValue = true;
this.keySelector = keySelector;
this.valueSelector = valueSelector;
}

final Map<K, V> fLocalMap = localMap;

return new Subscriber<T>(subscriber) {

private Map<K, V> map = fLocalMap;

@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onNext(T v) {
K key;
V value;

try {
key = keySelector.call(v);
value = valueSelector.call(v);
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, subscriber);
return;
}

map.put(key, value);
}

@Override
public void onError(Throwable e) {
map = null;
subscriber.onError(e);
@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onNext(T t) {
if (done) {
return;
}

@Override
public void onCompleted() {
Map<K, V> map0 = map;
map = null;
subscriber.onNext(map0);
subscriber.onCompleted();
try {
K key = keySelector.call(t);
V val = valueSelector.call(t);
value.put(key, val);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(ex);
}
};
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,37 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.verify;

import java.util.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import org.junit.*;
import org.mockito.*;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.TestException;
import rx.functions.*;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.internal.util.UtilityFunctions;
import rx.observers.TestSubscriber;
import rx.plugins.RxJavaHooks;

public class OperatorToMapTest {
public class OnSubscribeToMapTest {
@Mock
Observer<Object> objectObserver;

Expand Down Expand Up @@ -281,4 +296,127 @@ public Map<Integer, Integer> call() {
ts.assertNoValues();
ts.assertNotCompleted();
}

@Test
public void testFactoryFailureDoesNotAllowErrorAndCompletedEmissions() {
TestSubscriber<Map<Integer, Integer>> ts = TestSubscriber.create(0);
final RuntimeException e = new RuntimeException();
Observable.create(new OnSubscribe<Integer>() {

@Override
public void call(final Subscriber<? super Integer> sub) {
sub.setProducer(new Producer() {

@Override
public void request(long n) {
if (n > 1) {
sub.onNext(1);
sub.onCompleted();
}
}
});
}
}).toMap(new Func1<Integer,Integer>() {

@Override
public Integer call(Integer t) {
throw e;
}
}).unsafeSubscribe(ts);
ts.assertNoValues();
ts.assertError(e);
ts.assertNotCompleted();
}

@Test
public void testFactoryFailureDoesNotAllowTwoErrorEmissions() {
try {
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
RxJavaHooks.setOnError(new Action1<Throwable>() {

@Override
public void call(Throwable t) {
list.add(t);
}
});
TestSubscriber<Map<Integer, Integer>> ts = TestSubscriber.create(0);
final RuntimeException e1 = new RuntimeException();
final RuntimeException e2 = new RuntimeException();
Observable.create(new OnSubscribe<Integer>() {

@Override
public void call(final Subscriber<? super Integer> sub) {
sub.setProducer(new Producer() {

@Override
public void request(long n) {
if (n > 1) {
sub.onNext(1);
sub.onError(e2);
}
}
});
}
}).toMap(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer t) {
throw e1;
}
}).unsafeSubscribe(ts);
ts.assertNoValues();
assertEquals(Arrays.asList(e1), ts.getOnErrorEvents());
assertEquals(Arrays.asList(e2), list);
ts.assertNotCompleted();
} finally {
RxJavaHooks.reset();
}
}

@Test
public void testFactoryFailureDoesNotAllowErrorThenOnNextEmissions() {
TestSubscriber<Map<Integer, Integer>> ts = TestSubscriber.create(0);
final RuntimeException e = new RuntimeException();
Observable.create(new OnSubscribe<Integer>() {

@Override
public void call(final Subscriber<? super Integer> sub) {
sub.setProducer(new Producer() {

@Override
public void request(long n) {
if (n > 1) {
sub.onNext(1);
sub.onNext(2);
}
}
});
}
}).toMap(new Func1<Integer,Integer>() {

@Override
public Integer call(Integer t) {
throw e;
}
}).unsafeSubscribe(ts);
ts.assertNoValues();
ts.assertError(e);
ts.assertNotCompleted();
}

@Test
public void testBackpressure() {
TestSubscriber<Object> ts = TestSubscriber.create(0);
Observable
.just("a", "bb", "ccc", "dddd")
.toMap(lengthFunc)
.subscribe(ts);
ts.assertNoErrors();
ts.assertNotCompleted();
ts.assertNoValues();
ts.requestMore(1);
ts.assertValueCount(1);
ts.assertNoErrors();
ts.assertCompleted();
}
}