Skip to content

materialize() - add backpressure support #3103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 116 additions & 17 deletions src/main/java/rx/internal/operators/OperatorMaterialize.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
*/
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import rx.Notification;
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.plugins.RxJavaPlugins;

Expand All @@ -29,41 +32,137 @@
* See <a href="http://msdn.microsoft.com/en-us/library/hh229453.aspx">here</a> for the Microsoft Rx equivalent.
*/
public final class OperatorMaterialize<T> implements Operator<Notification<T>, T> {

/** Lazy initialization via inner-class holder. */
private static final class Holder {
/** A singleton instance. */
static final OperatorMaterialize<Object> INSTANCE = new OperatorMaterialize<Object>();
}

/**
* @return a singleton instance of this stateless operator.
*/
@SuppressWarnings("unchecked")
public static <T> OperatorMaterialize<T> instance() {
return (OperatorMaterialize<T>)Holder.INSTANCE;
return (OperatorMaterialize<T>) Holder.INSTANCE;
}
private OperatorMaterialize() { }

private OperatorMaterialize() {
}

@Override
public Subscriber<? super T> call(final Subscriber<? super Notification<T>> child) {
return new Subscriber<T>(child) {

final ParentSubscriber<T> parent = new ParentSubscriber<T>(child);
child.add(parent);
child.setProducer(new Producer() {
@Override
public void onCompleted() {
child.onNext(Notification.<T> createOnCompleted());
child.onCompleted();
public void request(long n) {
if (n > 0) {
parent.requestMore(n);
}
}
});
return parent;
}

@Override
public void onError(Throwable e) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
child.onNext(Notification.<T> createOnError(e));
child.onCompleted();
}
private static class ParentSubscriber<T> extends Subscriber<T> {

@Override
public void onNext(T t) {
child.onNext(Notification.<T> createOnNext(t));
private final Subscriber<? super Notification<T>> child;

private volatile Notification<T> terminalNotification;

// guarded by this
private boolean busy = false;
// guarded by this
private boolean missed = false;

private volatile long requested;
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<ParentSubscriber> REQUESTED = AtomicLongFieldUpdater
.newUpdater(ParentSubscriber.class, "requested");

ParentSubscriber(Subscriber<? super Notification<T>> child) {
this.child = child;
}

@Override
public void onStart() {
request(0);
}

void requestMore(long n) {
BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
request(n);
drain();
}

@Override
public void onCompleted() {
terminalNotification = Notification.createOnCompleted();
drain();
}

@Override
public void onError(Throwable e) {
terminalNotification = Notification.createOnError(e);
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
drain();
}

@Override
public void onNext(T t) {
child.onNext(Notification.createOnNext(t));
decrementRequested();
}

private void decrementRequested() {
// atomically decrement requested
while (true) {
long r = requested;
if (r == Long.MAX_VALUE) {
// don't decrement if unlimited requested
return;
} else if (REQUESTED.compareAndSet(this, r, r - 1)) {
return;
}
}
}

};
private void drain() {
synchronized (this) {
if (busy) {
// set flag to force extra loop if drain loop running
missed = true;
return;
}
}
// drain loop
while (!child.isUnsubscribed()) {
Notification<T> tn;
tn = terminalNotification;
if (tn != null) {
if (requested > 0) {
// allow tn to be GC'd after the onNext call
terminalNotification = null;
// emit the terminal notification
child.onNext(tn);
if (!child.isUnsubscribed()) {
child.onCompleted();
}
// note that we leave busy=true here
// which will prevent further drains
return;
}
}
// continue looping if drain() was called while in
// this loop
synchronized (this) {
if (!missed) {
busy = false;
return;
}
}
}
}
}
}
114 changes: 111 additions & 3 deletions src/test/java/rx/internal/operators/OperatorMaterializeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutionException;
Expand All @@ -28,13 +29,18 @@
import rx.Notification;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class OperatorMaterializeTest {

@Test
public void testMaterialize1() {
// null will cause onError to be triggered before "three" can be returned
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null, "three");
// null will cause onError to be triggered before "three" can be
// returned
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null,
"three");

TestObserver Observer = new TestObserver();
Observable<Notification<String>> m = Observable.create(o1).materialize();
Expand All @@ -53,7 +59,8 @@ public void testMaterialize1() {
assertTrue(Observer.notifications.get(0).isOnNext());
assertEquals("two", Observer.notifications.get(1).getValue());
assertTrue(Observer.notifications.get(1).isOnNext());
assertEquals(NullPointerException.class, Observer.notifications.get(2).getThrowable().getClass());
assertEquals(NullPointerException.class, Observer.notifications.get(2).getThrowable()
.getClass());
assertTrue(Observer.notifications.get(2).isOnError());
}

Expand Down Expand Up @@ -93,6 +100,107 @@ public void testMultipleSubscribes() throws InterruptedException, ExecutionExcep
assertEquals(3, m.toList().toBlocking().toFuture().get().size());
}

@Test
public void testBackpressureOnEmptyStream() {
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
Observable.<Integer> empty().materialize().subscribe(ts);
ts.assertNoValues();
ts.requestMore(1);
ts.assertValueCount(1);
assertTrue(ts.getOnNextEvents().get(0).isOnCompleted());
ts.assertCompleted();
}

@Test
public void testBackpressureNoError() {
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
Observable.just(1, 2, 3).materialize().subscribe(ts);
ts.assertNoValues();
ts.requestMore(1);
ts.assertValueCount(1);
ts.requestMore(2);
ts.assertValueCount(3);
ts.requestMore(1);
ts.assertValueCount(4);
ts.assertCompleted();
}

@Test
public void testBackpressureNoErrorAsync() throws InterruptedException {
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
Observable.just(1, 2, 3)
.materialize()
.subscribeOn(Schedulers.computation())
.subscribe(ts);
Thread.sleep(100);
ts.assertNoValues();
ts.requestMore(1);
Thread.sleep(100);
ts.assertValueCount(1);
ts.requestMore(2);
Thread.sleep(100);
ts.assertValueCount(3);
ts.requestMore(1);
Thread.sleep(100);
ts.assertValueCount(4);
ts.assertCompleted();
}

@Test
public void testBackpressureWithError() {
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
Observable.<Integer> error(new IllegalArgumentException()).materialize().subscribe(ts);
ts.assertNoValues();
ts.requestMore(1);
ts.assertValueCount(1);
ts.assertCompleted();
}

@Test
public void testBackpressureWithEmissionThenError() {
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
IllegalArgumentException ex = new IllegalArgumentException();
Observable.from(Arrays.asList(1)).concatWith(Observable.<Integer> error(ex)).materialize()
.subscribe(ts);
ts.assertNoValues();
ts.requestMore(1);
ts.assertValueCount(1);
assertTrue(ts.getOnNextEvents().get(0).hasValue());
ts.requestMore(1);
ts.assertValueCount(2);
assertTrue(ts.getOnNextEvents().get(1).isOnError());
assertTrue(ex == ts.getOnNextEvents().get(1).getThrowable());
ts.assertCompleted();
}

@Test
public void testWithCompletionCausingError() {
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create();
final RuntimeException ex = new RuntimeException("boo");
Observable.<Integer>empty().materialize().doOnNext(new Action1<Object>() {
@Override
public void call(Object t) {
throw ex;
}
}).subscribe(ts);
ts.assertError(ex);
ts.assertNoValues();
ts.assertTerminalEvent();
}

@Test
public void testUnsubscribeJustBeforeCompletionNotificationShouldPreventThatNotificationArriving() {
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
IllegalArgumentException ex = new IllegalArgumentException();
Observable.<Integer>empty().materialize()
.subscribe(ts);
ts.assertNoValues();
ts.unsubscribe();
ts.requestMore(1);
ts.assertNoValues();
ts.assertUnsubscribed();
}

private static class TestObserver extends Subscriber<Notification<String>> {

boolean onCompleted = false;
Expand Down