Skip to content

Commit f38eb99

Browse files
authored
2.x: add strict() operator for strong RS conformance (#4966)
* 2.x: add strict() operator for strong RS conformance * Fix javadoc and annotation
1 parent b917754 commit f38eb99

File tree

7 files changed

+394
-218
lines changed

7 files changed

+394
-218
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12509,6 +12509,35 @@ public final Flowable<T> startWithArray(T... items) {
1250912509
return concatArray(fromArray, this);
1251012510
}
1251112511

12512+
/**
12513+
* Ensures that the event flow between the upstream and downstream follow
12514+
* the Reactive-Streams 1.0 specification by honoring the 3 additional rules
12515+
* (which are omitted in standard operators due to performance reasons).
12516+
* <ul>
12517+
* <li>§1.3: onNext should not be called concurrently until onSubscribe returns</li>
12518+
* <li>§2.3: onError or onComplete must not call cancel</li>
12519+
* <li>§3.9: negative requests should emit an onError(IllegalArgumentException)</li>
12520+
* </ul>
12521+
* In addition, if rule §2.12 (onSubscribe must be called at most once) is violated,
12522+
* the sequence is cancelled an onError(IllegalStateException) is emitted.
12523+
* <dl>
12524+
* <dt><b>Backpressure:</b></dt>
12525+
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
12526+
* behavior.</dd>
12527+
* <dt><b>Scheduler:</b></dt>
12528+
* <dd>{@code strict} does not operate by default on a particular {@link Scheduler}.</dd>
12529+
* </dl>
12530+
* @return the new Flowable instance
12531+
* @since 2.0.5 - experimental
12532+
*/
12533+
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
12534+
@SchedulerSupport(SchedulerSupport.NONE)
12535+
@Experimental
12536+
@CheckReturnValue
12537+
public final Flowable<T> strict() {
12538+
return RxJavaPlugins.onAssembly(new FlowableStrict<T>(this));
12539+
}
12540+
1251212541
/**
1251312542
* Subscribes to a Publisher and ignores {@code onNext} and {@code onComplete} emissions.
1251412543
* <p>
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import java.util.concurrent.atomic.*;
17+
18+
import org.reactivestreams.*;
19+
20+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
21+
import io.reactivex.internal.util.*;
22+
23+
/**
24+
* Ensures that the event flow between the upstream and downstream follow
25+
* the Reactive-Streams 1.0 specification by honoring the 3 additional rules
26+
* (which are omitted in standard operators due to performance reasons).
27+
* <ul>
28+
* <li>§1.3: onNext should not be called concurrently until onSubscribe returns</li>
29+
* <li>§2.3: onError or onComplete must not call cancel</li>
30+
* <li>§3.9: negative requests should emit an onError(IllegalArgumentException)</li>
31+
* </ul>
32+
* In addition, if rule §2.12 (onSubscribe must be called at most once) is violated,
33+
* the sequence is cancelled an onError(IllegalStateException) is emitted.
34+
* @param <T> the value type
35+
*/
36+
public final class FlowableStrict<T> extends AbstractFlowableWithUpstream<T, T> {
37+
38+
public FlowableStrict(Publisher<T> source) {
39+
super(source);
40+
}
41+
42+
@Override
43+
protected void subscribeActual(Subscriber<? super T> s) {
44+
source.subscribe(new StrictSubscriber<T>(s));
45+
}
46+
47+
static final class StrictSubscriber<T>
48+
extends AtomicInteger
49+
implements Subscriber<T>, Subscription {
50+
51+
private static final long serialVersionUID = -4945028590049415624L;
52+
53+
final Subscriber<? super T> actual;
54+
55+
final AtomicThrowable error;
56+
57+
final AtomicLong requested;
58+
59+
final AtomicReference<Subscription> s;
60+
61+
final AtomicBoolean once;
62+
63+
volatile boolean done;
64+
65+
StrictSubscriber(Subscriber<? super T> actual) {
66+
this.actual = actual;
67+
this.error = new AtomicThrowable();
68+
this.requested = new AtomicLong();
69+
this.s = new AtomicReference<Subscription>();
70+
this.once = new AtomicBoolean();
71+
}
72+
73+
@Override
74+
public void request(long n) {
75+
if (n <= 0) {
76+
cancel();
77+
onError(new IllegalArgumentException("§3.9 violated: positive request amount required but it was " + n));
78+
} else {
79+
SubscriptionHelper.deferredRequest(s, requested, n);
80+
}
81+
}
82+
83+
@Override
84+
public void cancel() {
85+
if (!done) {
86+
SubscriptionHelper.cancel(s);
87+
}
88+
}
89+
90+
@Override
91+
public void onSubscribe(Subscription s) {
92+
if (once.compareAndSet(false, true)) {
93+
94+
actual.onSubscribe(this);
95+
96+
SubscriptionHelper.deferredSetOnce(this.s, requested, s);
97+
} else {
98+
s.cancel();
99+
cancel();
100+
onError(new IllegalStateException("§2.12 violated: onSubscribe must be called at most once"));
101+
}
102+
}
103+
104+
@Override
105+
public void onNext(T t) {
106+
HalfSerializer.onNext(actual, t, this, error);
107+
}
108+
109+
@Override
110+
public void onError(Throwable t) {
111+
done = true;
112+
HalfSerializer.onError(actual, t, this, error);
113+
}
114+
115+
@Override
116+
public void onComplete() {
117+
done = true;
118+
HalfSerializer.onComplete(actual, this, error);
119+
}
120+
}
121+
122+
}
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import static org.junit.Assert.*;
17+
18+
import java.util.*;
19+
import java.util.concurrent.TimeUnit;
20+
21+
import org.junit.Test;
22+
import org.reactivestreams.*;
23+
24+
import io.reactivex.Flowable;
25+
import io.reactivex.exceptions.TestException;
26+
import io.reactivex.internal.subscriptions.BooleanSubscription;
27+
import io.reactivex.schedulers.Schedulers;
28+
import io.reactivex.subscribers.TestSubscriber;
29+
30+
public class FlowableStrictTest {
31+
32+
@Test
33+
public void empty() {
34+
Flowable.empty()
35+
.strict()
36+
.test()
37+
.assertResult();
38+
}
39+
40+
@Test
41+
public void just() {
42+
Flowable.just(1)
43+
.strict()
44+
.test()
45+
.assertResult(1);
46+
}
47+
48+
@Test
49+
public void range() {
50+
Flowable.range(1, 5)
51+
.strict()
52+
.test()
53+
.assertResult(1, 2, 3, 4, 5);
54+
}
55+
56+
@Test
57+
public void take() {
58+
Flowable.range(1, 5)
59+
.take(2)
60+
.strict()
61+
.test()
62+
.assertResult(1, 2);
63+
}
64+
65+
@Test
66+
public void backpressure() {
67+
Flowable.range(1, 5)
68+
.strict()
69+
.test(0)
70+
.assertEmpty()
71+
.requestMore(1)
72+
.assertValue(1)
73+
.requestMore(2)
74+
.assertValues(1, 2, 3)
75+
.requestMore(2)
76+
.assertResult(1, 2, 3, 4, 5);
77+
}
78+
79+
@Test
80+
public void error() {
81+
Flowable.error(new TestException())
82+
.strict()
83+
.test()
84+
.assertFailure(TestException.class);
85+
}
86+
87+
@Test
88+
public void observeOn() {
89+
Flowable.range(1, 5)
90+
.hide()
91+
.observeOn(Schedulers.single())
92+
.strict()
93+
.test()
94+
.awaitDone(5, TimeUnit.SECONDS)
95+
.assertResult(1, 2, 3, 4, 5);
96+
}
97+
98+
@Test
99+
public void invalidRequest() {
100+
for (int i = 0; i > -100; i--) {
101+
final int j = i;
102+
final List<Object> items = new ArrayList<Object>();
103+
104+
Flowable.range(1, 2)
105+
.strict()
106+
.subscribe(new Subscriber<Integer>() {
107+
@Override
108+
public void onSubscribe(Subscription s) {
109+
s.request(j);
110+
}
111+
112+
@Override
113+
public void onNext(Integer t) {
114+
items.add(t);
115+
}
116+
117+
@Override
118+
public void onError(Throwable t) {
119+
items.add(t);
120+
}
121+
122+
@Override
123+
public void onComplete() {
124+
items.add("Done");
125+
}
126+
});
127+
128+
assertTrue(items.toString(), items.size() == 1);
129+
assertTrue(items.toString(), items.get(0) instanceof IllegalArgumentException);
130+
assertTrue(items.toString(), items.get(0).toString().contains("§3.9"));
131+
}
132+
}
133+
134+
@Test
135+
public void doubleOnSubscribe() {
136+
final BooleanSubscription bs1 = new BooleanSubscription();
137+
final BooleanSubscription bs2 = new BooleanSubscription();
138+
139+
TestSubscriber<Object> ts = Flowable.fromPublisher(new Publisher<Object>() {
140+
@Override
141+
public void subscribe(Subscriber<? super Object> p) {
142+
p.onSubscribe(bs1);
143+
p.onSubscribe(bs2);
144+
}
145+
})
146+
.strict()
147+
.test()
148+
.assertFailure(IllegalStateException.class);
149+
150+
assertTrue(bs1.isCancelled());
151+
assertTrue(bs2.isCancelled());
152+
153+
String es = ts.errors().get(0).toString();
154+
assertTrue(es, es.contains("§2.12"));
155+
}
156+
157+
@Test
158+
public void noCancelOnComplete() {
159+
final BooleanSubscription bs = new BooleanSubscription();
160+
161+
Flowable.fromPublisher(new Publisher<Object>() {
162+
@Override
163+
public void subscribe(Subscriber<? super Object> p) {
164+
p.onSubscribe(bs);
165+
p.onComplete();
166+
}
167+
})
168+
.strict()
169+
.subscribe(new Subscriber<Object>() {
170+
171+
Subscription s;
172+
173+
@Override
174+
public void onSubscribe(Subscription s) {
175+
this.s = s;
176+
}
177+
178+
@Override
179+
public void onNext(Object t) {
180+
// not called
181+
}
182+
183+
@Override
184+
public void onError(Throwable t) {
185+
// not called
186+
}
187+
188+
@Override
189+
public void onComplete() {
190+
s.cancel();
191+
}
192+
});
193+
194+
assertFalse(bs.isCancelled());
195+
}
196+
197+
@Test
198+
public void noCancelOnError() {
199+
final BooleanSubscription bs = new BooleanSubscription();
200+
201+
Flowable.fromPublisher(new Publisher<Object>() {
202+
@Override
203+
public void subscribe(Subscriber<? super Object> p) {
204+
p.onSubscribe(bs);
205+
p.onError(new TestException());
206+
}
207+
})
208+
.strict()
209+
.subscribe(new Subscriber<Object>() {
210+
211+
Subscription s;
212+
213+
@Override
214+
public void onSubscribe(Subscription s) {
215+
this.s = s;
216+
}
217+
218+
@Override
219+
public void onNext(Object t) {
220+
// not called
221+
}
222+
223+
@Override
224+
public void onError(Throwable t) {
225+
s.cancel();
226+
}
227+
228+
@Override
229+
public void onComplete() {
230+
// not called
231+
}
232+
});
233+
234+
assertFalse(bs.isCancelled());
235+
}
236+
}

0 commit comments

Comments
 (0)