Skip to content

Commit cc97739

Browse files
committed
add backpressure to OperatorMaterialize
1 parent 1210182 commit cc97739

File tree

2 files changed

+227
-20
lines changed

2 files changed

+227
-20
lines changed

src/main/java/rx/internal/operators/OperatorMaterialize.java

Lines changed: 116 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
19+
1820
import rx.Notification;
1921
import rx.Observable.Operator;
22+
import rx.Producer;
2023
import rx.Subscriber;
2124
import rx.plugins.RxJavaPlugins;
2225

@@ -29,41 +32,137 @@
2932
* See <a href="http://msdn.microsoft.com/en-us/library/hh229453.aspx">here</a> for the Microsoft Rx equivalent.
3033
*/
3134
public final class OperatorMaterialize<T> implements Operator<Notification<T>, T> {
35+
3236
/** Lazy initialization via inner-class holder. */
3337
private static final class Holder {
3438
/** A singleton instance. */
3539
static final OperatorMaterialize<Object> INSTANCE = new OperatorMaterialize<Object>();
3640
}
41+
3742
/**
3843
* @return a singleton instance of this stateless operator.
3944
*/
4045
@SuppressWarnings("unchecked")
4146
public static <T> OperatorMaterialize<T> instance() {
42-
return (OperatorMaterialize<T>)Holder.INSTANCE;
47+
return (OperatorMaterialize<T>) Holder.INSTANCE;
4348
}
44-
private OperatorMaterialize() { }
49+
50+
private OperatorMaterialize() {
51+
}
52+
4553
@Override
4654
public Subscriber<? super T> call(final Subscriber<? super Notification<T>> child) {
47-
return new Subscriber<T>(child) {
48-
55+
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child);
56+
child.add(parent);
57+
child.setProducer(new Producer() {
4958
@Override
50-
public void onCompleted() {
51-
child.onNext(Notification.<T> createOnCompleted());
52-
child.onCompleted();
59+
public void request(long n) {
60+
if (n > 0) {
61+
parent.requestMore(n);
62+
}
5363
}
64+
});
65+
return parent;
66+
}
5467

55-
@Override
56-
public void onError(Throwable e) {
57-
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
58-
child.onNext(Notification.<T> createOnError(e));
59-
child.onCompleted();
60-
}
68+
private static class ParentSubscriber<T> extends Subscriber<T> {
6169

62-
@Override
63-
public void onNext(T t) {
64-
child.onNext(Notification.<T> createOnNext(t));
70+
private final Subscriber<? super Notification<T>> child;
71+
72+
private volatile Notification<T> terminalNotification;
73+
74+
// guarded by this
75+
private boolean busy = false;
76+
// guarded by this
77+
private boolean missed = false;
78+
79+
private volatile long requested;
80+
@SuppressWarnings("rawtypes")
81+
private static final AtomicLongFieldUpdater<ParentSubscriber> REQUESTED = AtomicLongFieldUpdater
82+
.newUpdater(ParentSubscriber.class, "requested");
83+
84+
ParentSubscriber(Subscriber<? super Notification<T>> child) {
85+
this.child = child;
86+
}
87+
88+
@Override
89+
public void onStart() {
90+
request(0);
91+
}
92+
93+
void requestMore(long n) {
94+
BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
95+
request(n);
96+
drain();
97+
}
98+
99+
@Override
100+
public void onCompleted() {
101+
terminalNotification = Notification.createOnCompleted();
102+
drain();
103+
}
104+
105+
@Override
106+
public void onError(Throwable e) {
107+
terminalNotification = Notification.createOnError(e);
108+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
109+
drain();
110+
}
111+
112+
@Override
113+
public void onNext(T t) {
114+
child.onNext(Notification.createOnNext(t));
115+
decrementRequested();
116+
}
117+
118+
private void decrementRequested() {
119+
// atomically decrement requested
120+
while (true) {
121+
long r = requested;
122+
if (r == Long.MAX_VALUE) {
123+
// don't decrement if unlimited requested
124+
return;
125+
} else if (REQUESTED.compareAndSet(this, r, r - 1)) {
126+
return;
127+
}
65128
}
129+
}
66130

67-
};
131+
private void drain() {
132+
synchronized (this) {
133+
if (busy) {
134+
// set flag to force extra loop if drain loop running
135+
missed = true;
136+
return;
137+
}
138+
}
139+
// drain loop
140+
while (!child.isUnsubscribed()) {
141+
Notification<T> tn;
142+
tn = terminalNotification;
143+
if (tn != null) {
144+
if (requested > 0) {
145+
// allow tn to be GC'd after the onNext call
146+
terminalNotification = null;
147+
// emit the terminal notification
148+
child.onNext(tn);
149+
if (!child.isUnsubscribed()) {
150+
child.onCompleted();
151+
}
152+
// note that we leave busy=true here
153+
// which will prevent further drains
154+
return;
155+
}
156+
}
157+
// continue looping if drain() was called while in
158+
// this loop
159+
synchronized (this) {
160+
if (!missed) {
161+
busy = false;
162+
return;
163+
}
164+
}
165+
}
166+
}
68167
}
69168
}

src/test/java/rx/internal/operators/OperatorMaterializeTest.java

Lines changed: 111 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.junit.Assert.assertFalse;
2020
import static org.junit.Assert.assertTrue;
2121

22+
import java.util.Arrays;
2223
import java.util.List;
2324
import java.util.Vector;
2425
import java.util.concurrent.ExecutionException;
@@ -28,13 +29,18 @@
2829
import rx.Notification;
2930
import rx.Observable;
3031
import rx.Subscriber;
32+
import rx.functions.Action1;
33+
import rx.observers.TestSubscriber;
34+
import rx.schedulers.Schedulers;
3135

3236
public class OperatorMaterializeTest {
3337

3438
@Test
3539
public void testMaterialize1() {
36-
// null will cause onError to be triggered before "three" can be returned
37-
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null, "three");
40+
// null will cause onError to be triggered before "three" can be
41+
// returned
42+
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null,
43+
"three");
3844

3945
TestObserver Observer = new TestObserver();
4046
Observable<Notification<String>> m = Observable.create(o1).materialize();
@@ -53,7 +59,8 @@ public void testMaterialize1() {
5359
assertTrue(Observer.notifications.get(0).isOnNext());
5460
assertEquals("two", Observer.notifications.get(1).getValue());
5561
assertTrue(Observer.notifications.get(1).isOnNext());
56-
assertEquals(NullPointerException.class, Observer.notifications.get(2).getThrowable().getClass());
62+
assertEquals(NullPointerException.class, Observer.notifications.get(2).getThrowable()
63+
.getClass());
5764
assertTrue(Observer.notifications.get(2).isOnError());
5865
}
5966

@@ -93,6 +100,107 @@ public void testMultipleSubscribes() throws InterruptedException, ExecutionExcep
93100
assertEquals(3, m.toList().toBlocking().toFuture().get().size());
94101
}
95102

103+
@Test
104+
public void testBackpressureOnEmptyStream() {
105+
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
106+
Observable.<Integer> empty().materialize().subscribe(ts);
107+
ts.assertNoValues();
108+
ts.requestMore(1);
109+
ts.assertValueCount(1);
110+
assertTrue(ts.getOnNextEvents().get(0).isOnCompleted());
111+
ts.assertCompleted();
112+
}
113+
114+
@Test
115+
public void testBackpressureNoError() {
116+
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
117+
Observable.just(1, 2, 3).materialize().subscribe(ts);
118+
ts.assertNoValues();
119+
ts.requestMore(1);
120+
ts.assertValueCount(1);
121+
ts.requestMore(2);
122+
ts.assertValueCount(3);
123+
ts.requestMore(1);
124+
ts.assertValueCount(4);
125+
ts.assertCompleted();
126+
}
127+
128+
@Test
129+
public void testBackpressureNoErrorAsync() throws InterruptedException {
130+
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
131+
Observable.just(1, 2, 3)
132+
.materialize()
133+
.subscribeOn(Schedulers.computation())
134+
.subscribe(ts);
135+
Thread.sleep(100);
136+
ts.assertNoValues();
137+
ts.requestMore(1);
138+
Thread.sleep(100);
139+
ts.assertValueCount(1);
140+
ts.requestMore(2);
141+
Thread.sleep(100);
142+
ts.assertValueCount(3);
143+
ts.requestMore(1);
144+
Thread.sleep(100);
145+
ts.assertValueCount(4);
146+
ts.assertCompleted();
147+
}
148+
149+
@Test
150+
public void testBackpressureWithError() {
151+
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
152+
Observable.<Integer> error(new IllegalArgumentException()).materialize().subscribe(ts);
153+
ts.assertNoValues();
154+
ts.requestMore(1);
155+
ts.assertValueCount(1);
156+
ts.assertCompleted();
157+
}
158+
159+
@Test
160+
public void testBackpressureWithEmissionThenError() {
161+
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
162+
IllegalArgumentException ex = new IllegalArgumentException();
163+
Observable.from(Arrays.asList(1)).concatWith(Observable.<Integer> error(ex)).materialize()
164+
.subscribe(ts);
165+
ts.assertNoValues();
166+
ts.requestMore(1);
167+
ts.assertValueCount(1);
168+
assertTrue(ts.getOnNextEvents().get(0).hasValue());
169+
ts.requestMore(1);
170+
ts.assertValueCount(2);
171+
assertTrue(ts.getOnNextEvents().get(1).isOnError());
172+
assertTrue(ex == ts.getOnNextEvents().get(1).getThrowable());
173+
ts.assertCompleted();
174+
}
175+
176+
@Test
177+
public void testWithCompletionCausingError() {
178+
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create();
179+
final RuntimeException ex = new RuntimeException("boo");
180+
Observable.<Integer>empty().materialize().doOnNext(new Action1<Object>() {
181+
@Override
182+
public void call(Object t) {
183+
throw ex;
184+
}
185+
}).subscribe(ts);
186+
ts.assertError(ex);
187+
ts.assertNoValues();
188+
ts.assertTerminalEvent();
189+
}
190+
191+
@Test
192+
public void testUnsubscribeJustBeforeCompletionNotificationShouldPreventThatNotificationArriving() {
193+
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
194+
IllegalArgumentException ex = new IllegalArgumentException();
195+
Observable.<Integer>empty().materialize()
196+
.subscribe(ts);
197+
ts.assertNoValues();
198+
ts.unsubscribe();
199+
ts.requestMore(1);
200+
ts.assertNoValues();
201+
ts.assertUnsubscribed();
202+
}
203+
96204
private static class TestObserver extends Subscriber<Notification<String>> {
97205

98206
boolean onCompleted = false;

0 commit comments

Comments
 (0)