Skip to content

Commit 7718dc8

Browse files
committed
add backpressure to OperatorMaterialize
1 parent 1210182 commit 7718dc8

File tree

2 files changed

+236
-19
lines changed

2 files changed

+236
-19
lines changed

src/main/java/rx/internal/operators/OperatorMaterialize.java

Lines changed: 125 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
19+
1820
import rx.Notification;
1921
import rx.Observable.Operator;
22+
import rx.Producer;
2023
import rx.Subscriber;
2124
import rx.plugins.RxJavaPlugins;
2225

@@ -29,41 +32,147 @@
2932
* See <a href="http://msdn.microsoft.com/en-us/library/hh229453.aspx">here</a> for the Microsoft Rx equivalent.
3033
*/
3134
public final class OperatorMaterialize<T> implements Operator<Notification<T>, T> {
35+
3236
/** Lazy initialization via inner-class holder. */
3337
private static final class Holder {
3438
/** A singleton instance. */
3539
static final OperatorMaterialize<Object> INSTANCE = new OperatorMaterialize<Object>();
3640
}
41+
3742
/**
3843
* @return a singleton instance of this stateless operator.
3944
*/
4045
@SuppressWarnings("unchecked")
4146
public static <T> OperatorMaterialize<T> instance() {
42-
return (OperatorMaterialize<T>)Holder.INSTANCE;
47+
return (OperatorMaterialize<T>) Holder.INSTANCE;
4348
}
44-
private OperatorMaterialize() { }
49+
50+
private OperatorMaterialize() {
51+
}
52+
4553
@Override
4654
public Subscriber<? super T> call(final Subscriber<? super Notification<T>> child) {
47-
return new Subscriber<T>(child) {
48-
55+
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child);
56+
child.add(parent);
57+
child.setProducer(new Producer() {
4958
@Override
50-
public void onCompleted() {
51-
child.onNext(Notification.<T> createOnCompleted());
52-
child.onCompleted();
59+
public void request(long n) {
60+
if (n > 0) {
61+
parent.requestMore(n);
62+
}
5363
}
64+
});
65+
return parent;
66+
}
5467

55-
@Override
56-
public void onError(Throwable e) {
57-
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
58-
child.onNext(Notification.<T> createOnError(e));
59-
child.onCompleted();
68+
private static class ParentSubscriber<T> extends Subscriber<T> {
69+
70+
private final Subscriber<? super Notification<T>> child;
71+
72+
// guarded by this
73+
private Notification<T> terminalNotification;
74+
// guarded by this
75+
private boolean busy = false;
76+
// guarded by this
77+
private boolean missed = false;
78+
79+
private volatile long requested;
80+
@SuppressWarnings("rawtypes")
81+
private static final AtomicLongFieldUpdater<ParentSubscriber> REQUESTED = AtomicLongFieldUpdater
82+
.newUpdater(ParentSubscriber.class, "requested");
83+
84+
ParentSubscriber(Subscriber<? super Notification<T>> child) {
85+
this.child = child;
86+
}
87+
88+
@Override
89+
public void onStart() {
90+
request(0);
91+
}
92+
93+
void requestMore(long n) {
94+
BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
95+
request(n);
96+
drain();
97+
}
98+
99+
@Override
100+
public void onCompleted() {
101+
synchronized (this) {
102+
terminalNotification = Notification.createOnCompleted();
60103
}
104+
drain();
105+
}
61106

62-
@Override
63-
public void onNext(T t) {
64-
child.onNext(Notification.<T> createOnNext(t));
107+
@Override
108+
public void onError(Throwable e) {
109+
synchronized (this) {
110+
terminalNotification = Notification.createOnError(e);
65111
}
112+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
113+
drain();
114+
}
66115

67-
};
116+
@Override
117+
public void onNext(T t) {
118+
child.onNext(Notification.createOnNext(t));
119+
decrementRequested();
120+
}
121+
122+
private void decrementRequested() {
123+
// atomically decrement requested
124+
while (true) {
125+
long r = requested;
126+
if (r == Long.MAX_VALUE) {
127+
// don't decrement if unlimited requested
128+
return;
129+
} else if (REQUESTED.compareAndSet(this, r, r - 1)) {
130+
return;
131+
}
132+
}
133+
}
134+
135+
private void drain() {
136+
synchronized (this) {
137+
if (busy) {
138+
// set flag to force extra loop if drain loop running
139+
missed = true;
140+
return;
141+
} else {
142+
// is our turn to run drain loop so set missed to false
143+
// so we can detect missed drain calls
144+
missed = false;
145+
}
146+
}
147+
// drain loop
148+
while (!child.isUnsubscribed()) {
149+
Notification<T> tn;
150+
synchronized (this) {
151+
tn = terminalNotification;
152+
}
153+
if (tn != null) {
154+
if (requested > 0) {
155+
// allow tn to be GC'd after the onNext call
156+
terminalNotification = null;
157+
// emit the terminal notification
158+
child.onNext(tn);
159+
if (!child.isUnsubscribed()) {
160+
child.onCompleted();
161+
}
162+
// note that we leave busy=true here
163+
// which will prevent further drains
164+
return;
165+
}
166+
}
167+
// continue looping if drain() was called while in
168+
// this loop
169+
synchronized (this) {
170+
if (!missed) {
171+
busy = false;
172+
return;
173+
}
174+
}
175+
}
176+
}
68177
}
69178
}

src/test/java/rx/internal/operators/OperatorMaterializeTest.java

Lines changed: 111 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.junit.Assert.assertFalse;
2020
import static org.junit.Assert.assertTrue;
2121

22+
import java.util.Arrays;
2223
import java.util.List;
2324
import java.util.Vector;
2425
import java.util.concurrent.ExecutionException;
@@ -28,13 +29,18 @@
2829
import rx.Notification;
2930
import rx.Observable;
3031
import rx.Subscriber;
32+
import rx.functions.Action1;
33+
import rx.observers.TestSubscriber;
34+
import rx.schedulers.Schedulers;
3135

3236
public class OperatorMaterializeTest {
3337

3438
@Test
3539
public void testMaterialize1() {
36-
// null will cause onError to be triggered before "three" can be returned
37-
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null, "three");
40+
// null will cause onError to be triggered before "three" can be
41+
// returned
42+
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null,
43+
"three");
3844

3945
TestObserver Observer = new TestObserver();
4046
Observable<Notification<String>> m = Observable.create(o1).materialize();
@@ -53,7 +59,8 @@ public void testMaterialize1() {
5359
assertTrue(Observer.notifications.get(0).isOnNext());
5460
assertEquals("two", Observer.notifications.get(1).getValue());
5561
assertTrue(Observer.notifications.get(1).isOnNext());
56-
assertEquals(NullPointerException.class, Observer.notifications.get(2).getThrowable().getClass());
62+
assertEquals(NullPointerException.class, Observer.notifications.get(2).getThrowable()
63+
.getClass());
5764
assertTrue(Observer.notifications.get(2).isOnError());
5865
}
5966

@@ -93,6 +100,107 @@ public void testMultipleSubscribes() throws InterruptedException, ExecutionExcep
93100
assertEquals(3, m.toList().toBlocking().toFuture().get().size());
94101
}
95102

103+
@Test
104+
public void testBackpressureOnEmptyStream() {
105+
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
106+
Observable.<Integer> empty().materialize().subscribe(ts);
107+
ts.assertNoValues();
108+
ts.requestMore(1);
109+
ts.assertValueCount(1);
110+
assertTrue(ts.getOnNextEvents().get(0).isOnCompleted());
111+
ts.assertCompleted();
112+
}
113+
114+
@Test
115+
public void testBackpressureNoError() {
116+
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
117+
Observable.just(1, 2, 3).materialize().subscribe(ts);
118+
ts.assertNoValues();
119+
ts.requestMore(1);
120+
ts.assertValueCount(1);
121+
ts.requestMore(2);
122+
ts.assertValueCount(3);
123+
ts.requestMore(1);
124+
ts.assertValueCount(4);
125+
ts.assertCompleted();
126+
}
127+
128+
@Test
129+
public void testBackpressureNoErrorAsync() throws InterruptedException {
130+
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
131+
Observable.just(1, 2, 3)
132+
.materialize()
133+
.subscribeOn(Schedulers.computation())
134+
.subscribe(ts);
135+
Thread.sleep(100);
136+
ts.assertNoValues();
137+
ts.requestMore(1);
138+
Thread.sleep(100);
139+
ts.assertValueCount(1);
140+
ts.requestMore(2);
141+
Thread.sleep(100);
142+
ts.assertValueCount(3);
143+
ts.requestMore(1);
144+
Thread.sleep(100);
145+
ts.assertValueCount(4);
146+
ts.assertCompleted();
147+
}
148+
149+
@Test
150+
public void testBackpressureWithError() {
151+
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
152+
Observable.<Integer> error(new IllegalArgumentException()).materialize().subscribe(ts);
153+
ts.assertNoValues();
154+
ts.requestMore(1);
155+
ts.assertValueCount(1);
156+
ts.assertCompleted();
157+
}
158+
159+
@Test
160+
public void testBackpressureWithEmissionThenError() {
161+
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
162+
IllegalArgumentException ex = new IllegalArgumentException();
163+
Observable.from(Arrays.asList(1)).concatWith(Observable.<Integer> error(ex)).materialize()
164+
.subscribe(ts);
165+
ts.assertNoValues();
166+
ts.requestMore(1);
167+
ts.assertValueCount(1);
168+
assertTrue(ts.getOnNextEvents().get(0).hasValue());
169+
ts.requestMore(1);
170+
ts.assertValueCount(2);
171+
assertTrue(ts.getOnNextEvents().get(1).isOnError());
172+
assertTrue(ex == ts.getOnNextEvents().get(1).getThrowable());
173+
ts.assertCompleted();
174+
}
175+
176+
@Test
177+
public void testWithCompletionCausingError() {
178+
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create();
179+
final RuntimeException ex = new RuntimeException("boo");
180+
Observable.<Integer>empty().materialize().doOnNext(new Action1<Object>() {
181+
@Override
182+
public void call(Object t) {
183+
throw ex;
184+
}
185+
}).subscribe(ts);
186+
ts.assertError(ex);
187+
ts.assertNoValues();
188+
ts.assertTerminalEvent();
189+
}
190+
191+
@Test
192+
public void testUnsubscribeJustBeforeCompletionNotificationShouldPreventThatNotificationArriving() {
193+
TestSubscriber<Notification<Integer>> ts = TestSubscriber.create(0);
194+
IllegalArgumentException ex = new IllegalArgumentException();
195+
Observable.<Integer>empty().materialize()
196+
.subscribe(ts);
197+
ts.assertNoValues();
198+
ts.unsubscribe();
199+
ts.requestMore(1);
200+
ts.assertNoValues();
201+
ts.assertUnsubscribed();
202+
}
203+
96204
private static class TestObserver extends Subscriber<Notification<String>> {
97205

98206
boolean onCompleted = false;

0 commit comments

Comments
 (0)