Skip to content

2.x: collect - handle post terminal events - Observable #4428

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiConsumer;
import io.reactivex.internal.disposables.*;
import io.reactivex.plugins.RxJavaPlugins;

public final class ObservableCollect<T, U> extends AbstractObservableWithUpstream<T, U> {
final Callable<? extends U> initialSupplier;
Expand Down Expand Up @@ -56,6 +57,8 @@ static final class CollectSubscriber<T, U> implements Observer<T>, Disposable {

Disposable s;

boolean done;

public CollectSubscriber(Observer<? super U> actual, U u, BiConsumer<? super U, ? super T> collector) {
this.actual = actual;
this.collector = collector;
Expand Down Expand Up @@ -84,21 +87,33 @@ public boolean isDisposed() {

@Override
public void onNext(T t) {
if (done) {
return;
}
try {
collector.accept(u, t);
} catch (Throwable e) {
s.dispose();
actual.onError(e);
onError(e);
}
}

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
done = true;
actual.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
actual.onNext(u);
actual.onComplete();
}
Expand Down
36 changes: 4 additions & 32 deletions src/test/java/io/reactivex/flowable/FlowableCollectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

package io.reactivex.flowable;

import static io.reactivex.internal.util.TestingHelper.addToList;
import static io.reactivex.internal.util.TestingHelper.biConsumerThrows;
import static io.reactivex.internal.util.TestingHelper.callableListCreator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

Expand All @@ -27,10 +30,9 @@

import io.reactivex.Flowable;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import io.reactivex.plugins.RxJavaPlugins;

public class FlowableCollectTest {
public final class FlowableCollectTest {

@Test
public void testCollectToList() {
Expand Down Expand Up @@ -165,34 +167,4 @@ public void accept(Object o, Integer t) {
assertFalse(added.get());
}

private static Consumer<Throwable> addToList(final List<Throwable> list) {
return new Consumer<Throwable>() {

@Override
public void accept(Throwable t) {
list.add(t);
}
};
}

private static <T> Callable<List<T>> callableListCreator() {
return new Callable<List<T>>() {

@Override
public List<T> call() {
return new ArrayList<T>();
}
};
}

private static <T> BiConsumer<Object, T> biConsumerThrows(final RuntimeException e) {
return new BiConsumer<Object, T>() {

@Override
public void accept(Object t1, T t2) {
throw e;
}
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.internal.operators.observable;

import java.util.Arrays;
import java.util.List;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposables;

/**
* Creates {@link Observable} of a number of items followed by either an error or
* completion. Subscription status is not checked before emitting an event.
*
* @param <T> the value type
*/
public final class Burst<T> extends Observable<T> {

private final List<T> items;
private final Throwable error;

private Burst(Throwable error, List<T> items) {
this.error = error;
this.items = items;
}

@Override
protected void subscribeActual(final Observer<? super T> observer) {
observer.onSubscribe(Disposables.empty());
for (T item: items) {
observer.onNext(item);
}
if (error != null) {
observer.onError(error);
} else {
observer.onComplete();
}
}

@SuppressWarnings("unchecked")
public static <T> Builder<T> item(T item) {
return items(item);
}

public static <T> Builder<T> items(T... items) {
return new Builder<T>(Arrays.asList(items));
}

public static final class Builder<T> {

private final List<T> items;
private Throwable error = null;

private Builder(List<T> items) {
this.items = items;
}

public Observable<T> error(Throwable e) {
this.error = e;
return create();
}

public Observable<T> create() {
return new Burst<T>(error, items);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package io.reactivex.internal.operators.observable;

import static io.reactivex.internal.util.TestingHelper.addToList;
import static io.reactivex.internal.util.TestingHelper.biConsumerThrows;
import static io.reactivex.internal.util.TestingHelper.callableListCreator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;

import io.reactivex.Observable;
import io.reactivex.functions.BiConsumer;
import io.reactivex.plugins.RxJavaPlugins;

public final class ObservableCollectTest {

@Test
public void testCollectToList() {
Observable<List<Integer>> o = Observable.just(1, 2, 3)
.collect(new Callable<List<Integer>>() {
@Override
public List<Integer> call() {
return new ArrayList<Integer>();
}
}, new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> list, Integer v) {
list.add(v);
}
});

List<Integer> list = o.blockingLast();

assertEquals(3, list.size());
assertEquals(1, list.get(0).intValue());
assertEquals(2, list.get(1).intValue());
assertEquals(3, list.get(2).intValue());

// test multiple subscribe
List<Integer> list2 = o.blockingLast();

assertEquals(3, list2.size());
assertEquals(1, list2.get(0).intValue());
assertEquals(2, list2.get(1).intValue());
assertEquals(3, list2.get(2).intValue());
}

@Test
public void testCollectToString() {
String value = Observable.just(1, 2, 3).collect(new Callable<StringBuilder>() {
@Override
public StringBuilder call() {
return new StringBuilder();
}
},
new BiConsumer<StringBuilder, Integer>() {
@Override
public void accept(StringBuilder sb, Integer v) {
if (sb.length() > 0) {
sb.append("-");
}
sb.append(v);
}
}).blockingLast().toString();

assertEquals("1-2-3", value);
}

@Test
public void testCollectorFailureDoesNotResultInTwoErrorEmissions() {
try {
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
RxJavaPlugins.setErrorHandler(addToList(list));
final RuntimeException e1 = new RuntimeException();
final RuntimeException e2 = new RuntimeException();

Burst.items(1).error(e2) //
.collect(callableListCreator(), biConsumerThrows(e1)) //
.test() //
.assertError(e1) //
.assertNotComplete();
assertEquals(Arrays.asList(e2), list);
} finally {
RxJavaPlugins.reset();
}
}

@Test
public void testCollectorFailureDoesNotResultInErrorAndCompletedEmissions() {
final RuntimeException e = new RuntimeException();
Burst.item(1).create() //
.collect(callableListCreator(), biConsumerThrows(e)) //
.test() //
.assertError(e) //
.assertNotComplete();
}

@Test
public void testCollectorFailureDoesNotResultInErrorAndOnNextEmissions() {
final RuntimeException e = new RuntimeException();
final AtomicBoolean added = new AtomicBoolean();
BiConsumer<Object, Integer> throwOnFirstOnly = new BiConsumer<Object, Integer>() {

boolean once = true;

@Override
public void accept(Object o, Integer t) {
if (once) {
once = false;
throw e;
} else {
added.set(true);
}
}
};
Burst.items(1, 2).create() //
.collect(callableListCreator(), throwOnFirstOnly)//
.test() //
.assertError(e) //
.assertNoValues() //
.assertNotComplete();
assertFalse(added.get());
}

}
60 changes: 60 additions & 0 deletions src/test/java/io/reactivex/internal/util/TestingHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.reactivex.internal.util;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;

public final class TestingHelper {

private TestingHelper() {
// prevent instantiation
}

public static <T> Consumer<T> addToList(final List<T> list) {
return new Consumer<T>() {

@Override
public void accept(T t) {
list.add(t);
}
};
}

public static <T> Callable<List<T>> callableListCreator() {
return new Callable<List<T>>() {

@Override
public List<T> call() {
return new ArrayList<T>();
}
};
}

public static BiConsumer<Object, Object> biConsumerThrows(final RuntimeException e) {
return new BiConsumer<Object, Object>() {

@Override
public void accept(Object t1, Object t2) {
throw e;
}
};
}
}
Loading