Skip to content

2.x collect - handle post terminal events #4364

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiConsumer;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.plugins.RxJavaPlugins;

public final class FlowableCollect<T, U> extends AbstractFlowableWithUpstream<T, U> {

Expand Down Expand Up @@ -58,6 +59,8 @@ static final class CollectSubscriber<T, U> extends DeferredScalarSubscription<U>

Subscription s;

boolean done;

public CollectSubscriber(Subscriber<? super U> actual, U u, BiConsumer<? super U, ? super T> collector) {
super(actual);
this.collector = collector;
Expand All @@ -75,22 +78,34 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
if (done) {
return;
}
try {
collector.accept(u, t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.cancel();
actual.onError(e);
onError(e);
}
}

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
done = true;
actual.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
complete(u);
}

Expand Down
124 changes: 124 additions & 0 deletions src/test/java/io/reactivex/flowable/Burst.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.flowable;

import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.Flowable;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;

/**
* Creates {@link Flowable} of a number of items followed by either an error or
* completion. Cancellation has no effect on preventing emissions until the
* currently outstanding requests have been met.
*/
public final class Burst<T> extends Flowable<T> {

private final List<T> items;
private final Throwable error;

private Burst(Throwable error, List<T> items) {
if (items.isEmpty()) {
throw new IllegalArgumentException("items cannot be empty");
}
for (T item : items) {
if (item == null) {
throw new IllegalArgumentException("items cannot include null");
}
}
this.error = error;
this.items = items;
}

@Override
protected void subscribeActual(final Subscriber<? super T> subscriber) {
subscriber.onSubscribe(new Subscription() {

final Queue<T> q = new ConcurrentLinkedQueue<T>(items);
final AtomicLong requested = new AtomicLong();
volatile boolean cancelled = false;

@Override
public void request(long n) {
if (cancelled) {
// required by reactive-streams-jvm 3.6
return;
}
if (SubscriptionHelper.validate(n)) {
// just for testing, don't care about perf
// so no attempt made to reduce volatile reads
if (BackpressureHelper.add(requested, n) == 0) {
if (q.isEmpty())
return;
while (!q.isEmpty() && requested.get() > 0) {
T item = q.poll();
requested.decrementAndGet();
subscriber.onNext(item);
}
if (q.isEmpty()) {
if (error != null) {
subscriber.onError(error);
} else {
subscriber.onComplete();
}
}
}
}
}

@Override
public void cancel() {
cancelled = true;
}
});

}

@SuppressWarnings("unchecked")
public static <T> Builder<T> item(T item) {
return items(item);
}

public static <T> Builder<T> items(T... items) {
return new Builder<T>(Arrays.asList(items));
}

public static final class Builder<T> {

private final List<T> items;
private Throwable error = null;

private Builder(List<T> items) {
this.items = items;
}

public Flowable<T> error(Throwable e) {
this.error = e;
return create();
}

public Flowable<T> create() {
return new Burst<T>(error, items);
}

}

}
185 changes: 185 additions & 0 deletions src/test/java/io/reactivex/flowable/FlowableCollectTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package io.reactivex.flowable;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;

import io.reactivex.Flowable;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import io.reactivex.plugins.RxJavaPlugins;

public class FlowableCollectTest {

@Test
public void testCollectToList() {
Flowable<List<Integer>> o = Flowable.just(1, 2, 3)
.collect(new Callable<List<Integer>>() {
@Override
public List<Integer> call() {
return new ArrayList<Integer>();
}
}, new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> list, Integer v) {
list.add(v);
}
});

List<Integer> list = o.blockingLast();

assertEquals(3, list.size());
assertEquals(1, list.get(0).intValue());
assertEquals(2, list.get(1).intValue());
assertEquals(3, list.get(2).intValue());

// test multiple subscribe
List<Integer> list2 = o.blockingLast();

assertEquals(3, list2.size());
assertEquals(1, list2.get(0).intValue());
assertEquals(2, list2.get(1).intValue());
assertEquals(3, list2.get(2).intValue());
}

@Test
public void testCollectToString() {
String value = Flowable.just(1, 2, 3)
.collect(
new Callable<StringBuilder>() {
@Override
public StringBuilder call() {
return new StringBuilder();
}
},
new BiConsumer<StringBuilder, Integer>() {
@Override
public void accept(StringBuilder sb, Integer v) {
if (sb.length() > 0) {
sb.append("-");
}
sb.append(v);
}
}).blockingLast().toString();

assertEquals("1-2-3", value);
}


@Test
public void testFactoryFailureResultsInErrorEmission() {
final RuntimeException e = new RuntimeException();
Flowable.just(1).collect(new Callable<List<Integer>>() {

@Override
public List<Integer> call() throws Exception {
throw e;
}
}, new BiConsumer<List<Integer>, Integer>() {

@Override
public void accept(List<Integer> list, Integer t) {
list.add(t);
}
})
.test()
.assertNoValues()
.assertError(e)
.assertNotComplete();
}

@Test
public void testCollectorFailureDoesNotResultInTwoErrorEmissions() {
try {
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
RxJavaPlugins.setErrorHandler(addToList(list));
final RuntimeException e1 = new RuntimeException();
final RuntimeException e2 = new RuntimeException();

Burst.items(1).error(e2) //
.collect(callableListCreator(), biConsumerThrows(e1)) //
.test() //
.assertError(e1) //
.assertNotComplete();
assertEquals(Arrays.asList(e2), list);
} finally {
RxJavaPlugins.reset();
}
}

@Test
public void testCollectorFailureDoesNotResultInErrorAndCompletedEmissions() {
final RuntimeException e = new RuntimeException();
Burst.item(1).create() //
.collect(callableListCreator(), biConsumerThrows(e)) //
.test() //
.assertError(e) //
.assertNotComplete();
}

@Test
public void testCollectorFailureDoesNotResultInErrorAndOnNextEmissions() {
final RuntimeException e = new RuntimeException();
final AtomicBoolean added = new AtomicBoolean();
BiConsumer<Object, Integer> throwOnFirstOnly = new BiConsumer<Object, Integer>() {

boolean once = true;

@Override
public void accept(Object o, Integer t) {
if (once) {
once = false;
throw e;
} else {
added.set(true);
}
}
};
Burst.items(1, 2).create() //
.collect(callableListCreator(), throwOnFirstOnly)//
.test() //
.assertError(e) //
.assertNoValues() //
.assertNotComplete();
assertFalse(added.get());
}

private static Consumer<Throwable> addToList(final List<Throwable> list) {
return new Consumer<Throwable>() {

@Override
public void accept(Throwable t) {
list.add(t);
}
};
}

private static <T> Callable<List<T>> callableListCreator() {
return new Callable<List<T>>() {

@Override
public List<T> call() {
return new ArrayList<T>();
}
};
}

private static <T> BiConsumer<Object, T> biConsumerThrows(final RuntimeException e) {
return new BiConsumer<Object, T>() {

@Override
public void accept(Object t1, T t2) {
throw e;
}
};
}

}
Loading