Skip to content

Commit 4210577

Browse files
authored
1.x: increase coverage of producers (#4117)
1 parent a6d6ba9 commit 4210577

File tree

5 files changed

+416
-0
lines changed

5 files changed

+416
-0
lines changed

src/main/java/rx/internal/producers/ProducerObserverArbiter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public void onNext(T t) {
6969
q.add(t);
7070
return;
7171
}
72+
emitting = true;
7273
}
7374
boolean skipFinal = false;
7475
try {
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.producers;
17+
18+
import org.junit.*;
19+
20+
import rx.Producer;
21+
import rx.exceptions.TestException;
22+
23+
public class ProducerArbiterTest {
24+
25+
@Test
26+
public void negativeRequestThrows() {
27+
ProducerArbiter pa = new ProducerArbiter();
28+
try {
29+
pa.request(-99);
30+
Assert.fail("Failed to throw on invalid request amount");
31+
} catch (IllegalArgumentException ex) {
32+
Assert.assertEquals("n >= 0 required", ex.getMessage());
33+
}
34+
}
35+
36+
@Test
37+
public void negativeProducedThrows() {
38+
ProducerArbiter pa = new ProducerArbiter();
39+
try {
40+
pa.produced(-99);
41+
Assert.fail("Failed to throw on invalid produced amount");
42+
} catch (IllegalArgumentException ex) {
43+
Assert.assertEquals("n > 0 required", ex.getMessage());
44+
}
45+
}
46+
47+
@Test
48+
public void overproductionThrows() {
49+
ProducerArbiter pa = new ProducerArbiter();
50+
try {
51+
pa.produced(1);
52+
Assert.fail("Failed to throw on overproduction amount");
53+
} catch (IllegalStateException ex) {
54+
Assert.assertEquals("more items arrived than were requested", ex.getMessage());
55+
}
56+
}
57+
58+
@Test
59+
public void nullProducerAccepted() {
60+
ProducerArbiter pa = new ProducerArbiter();
61+
pa.setProducer(null);
62+
}
63+
64+
@Test
65+
public void failedRequestUnlocksEmitting() {
66+
ProducerArbiter pa = new ProducerArbiter();
67+
pa.setProducer(new Producer() {
68+
@Override
69+
public void request(long n) {
70+
if (n != 0) {
71+
throw new TestException("Forced failure");
72+
}
73+
}
74+
});
75+
try {
76+
pa.request(1);
77+
Assert.fail("Failed to throw on overproduction amount");
78+
} catch (TestException ex) {
79+
Assert.assertEquals("Forced failure", ex.getMessage());
80+
Assert.assertFalse("Still emitting?!", pa.emitting);
81+
}
82+
}
83+
84+
@Test
85+
public void reentrantSetProducerNull() {
86+
final ProducerArbiter pa = new ProducerArbiter();
87+
pa.setProducer(new Producer() {
88+
@Override
89+
public void request(long n) {
90+
pa.setProducer(null);
91+
}
92+
});
93+
}
94+
95+
@Test
96+
public void reentrantSetProducer() {
97+
final ProducerArbiter pa = new ProducerArbiter();
98+
pa.setProducer(new Producer() {
99+
@Override
100+
public void request(long n) {
101+
pa.setProducer(new ProducerArbiter());
102+
}
103+
});
104+
}
105+
106+
@Test
107+
public void overproductionReentrantThrows() {
108+
final ProducerArbiter pa = new ProducerArbiter();
109+
try {
110+
pa.setProducer(new Producer() {
111+
@Override
112+
public void request(long n) {
113+
if (n != 0) {
114+
pa.produced(2);
115+
}
116+
}
117+
});
118+
pa.request(1);
119+
Assert.fail("Failed to throw on overproduction amount");
120+
} catch (IllegalStateException ex) {
121+
Assert.assertEquals("more produced than requested", ex.getMessage());
122+
}
123+
}
124+
125+
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.producers;
17+
18+
import org.junit.*;
19+
20+
import rx.*;
21+
import rx.exceptions.TestException;
22+
import rx.observers.*;
23+
24+
public class ProducerObserverArbiterTest {
25+
26+
@Test
27+
public void negativeRequestThrows() {
28+
ProducerObserverArbiter<Integer> pa = new ProducerObserverArbiter<Integer>(Subscribers.empty());
29+
try {
30+
pa.request(-99);
31+
Assert.fail("Failed to throw on invalid request amount");
32+
} catch (IllegalArgumentException ex) {
33+
Assert.assertEquals("n >= 0 required", ex.getMessage());
34+
}
35+
}
36+
37+
@Test
38+
public void nullProducerAccepted() {
39+
ProducerObserverArbiter<Integer> pa = new ProducerObserverArbiter<Integer>(Subscribers.empty());
40+
pa.setProducer(null);
41+
}
42+
43+
public void failedRequestUnlocksEmitting() {
44+
ProducerObserverArbiter<Integer> pa = new ProducerObserverArbiter<Integer>(Subscribers.empty());
45+
pa.setProducer(new Producer() {
46+
@Override
47+
public void request(long n) {
48+
throw new TestException("Forced failure");
49+
}
50+
});
51+
try {
52+
pa.request(1);
53+
Assert.fail("Failed to throw on overproduction amount");
54+
} catch (TestException ex) {
55+
Assert.assertEquals("Forced failure", ex.getMessage());
56+
Assert.assertFalse("Still emitting?!", pa.emitting);
57+
}
58+
}
59+
60+
@Test
61+
public void reentrantSetProducerNull() {
62+
final ProducerObserverArbiter<Integer> pa = new ProducerObserverArbiter<Integer>(Subscribers.empty());
63+
pa.setProducer(new Producer() {
64+
@Override
65+
public void request(long n) {
66+
pa.setProducer(null);
67+
}
68+
});
69+
}
70+
71+
@Test
72+
public void reentrantSetProducer() {
73+
final ProducerObserverArbiter<Integer> pa = new ProducerObserverArbiter<Integer>(Subscribers.empty());
74+
pa.setProducer(new Producer() {
75+
@Override
76+
public void request(long n) {
77+
pa.setProducer(new ProducerArbiter());
78+
}
79+
});
80+
}
81+
82+
@Test
83+
public void reentrantOnNext() {
84+
@SuppressWarnings("rawtypes")
85+
final Observer[] o = { null };
86+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
87+
@SuppressWarnings("unchecked")
88+
@Override
89+
public void onNext(Integer t) {
90+
if (t == 1) {
91+
o[0].onNext(2);
92+
}
93+
super.onNext(t);
94+
}
95+
};
96+
ProducerObserverArbiter<Integer> poa = new ProducerObserverArbiter<Integer>(ts);
97+
o[0] = poa;
98+
poa.request(2);
99+
poa.onNext(1);
100+
ts.assertValues(1, 2);
101+
}
102+
103+
@Test
104+
public void reentrantOnError() {
105+
@SuppressWarnings("rawtypes")
106+
final Observer[] o = { null };
107+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
108+
@Override
109+
public void onNext(Integer t) {
110+
if (t == 1) {
111+
o[0].onError(new TestException());
112+
}
113+
super.onNext(t);
114+
}
115+
};
116+
ProducerObserverArbiter<Integer> poa = new ProducerObserverArbiter<Integer>(ts);
117+
o[0] = poa;
118+
poa.onNext(1);
119+
ts.assertValue(1);
120+
ts.assertError(TestException.class);
121+
}
122+
123+
@Test
124+
public void reentrantOnCompleted() {
125+
@SuppressWarnings("rawtypes")
126+
final Observer[] o = { null };
127+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
128+
@Override
129+
public void onNext(Integer t) {
130+
if (t == 1) {
131+
o[0].onCompleted();
132+
}
133+
super.onNext(t);
134+
}
135+
};
136+
ProducerObserverArbiter<Integer> poa = new ProducerObserverArbiter<Integer>(ts);
137+
o[0] = poa;
138+
poa.onNext(1);
139+
ts.assertValue(1);
140+
ts.assertCompleted();
141+
}
142+
143+
@Test
144+
public void onNextThrows() {
145+
@SuppressWarnings("rawtypes")
146+
final Observer[] o = { null };
147+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
148+
@Override
149+
public void onNext(Integer t) {
150+
throw new TestException();
151+
}
152+
};
153+
ProducerObserverArbiter<Integer> poa = new ProducerObserverArbiter<Integer>(ts);
154+
o[0] = poa;
155+
try {
156+
poa.onNext(1);
157+
Assert.fail("Arbiter didn't throw");
158+
} catch (TestException ex) {
159+
// expected
160+
}
161+
}
162+
163+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package rx.internal.producers;
2+
3+
import java.util.concurrent.*;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
import org.junit.*;
7+
8+
import rx.Scheduler;
9+
import rx.functions.Action0;
10+
import rx.observers.*;
11+
import rx.schedulers.Schedulers;
12+
13+
public class SingleDelayedProducerTest {
14+
15+
@Test
16+
public void negativeRequestThrows() {
17+
SingleDelayedProducer<Integer> pa = new SingleDelayedProducer<Integer>(Subscribers.empty());
18+
try {
19+
pa.request(-99);
20+
Assert.fail("Failed to throw on invalid request amount");
21+
} catch (IllegalArgumentException ex) {
22+
Assert.assertEquals("n >= 0 required", ex.getMessage());
23+
}
24+
}
25+
26+
@Test
27+
public void requestCompleteRace() throws Exception {
28+
Scheduler.Worker w = Schedulers.computation().createWorker();
29+
try {
30+
for (int i = 0; i < 10000; i++) {
31+
final AtomicInteger waiter = new AtomicInteger(2);
32+
33+
TestSubscriber<Integer> ts = TestSubscriber.create();
34+
35+
final SingleDelayedProducer<Integer> pa = new SingleDelayedProducer<Integer>(ts);
36+
37+
final CountDownLatch cdl = new CountDownLatch(1);
38+
39+
w.schedule(new Action0() {
40+
@Override
41+
public void call() {
42+
waiter.decrementAndGet();
43+
while (waiter.get() != 0) ;
44+
pa.request(1);
45+
cdl.countDown();
46+
}
47+
});
48+
49+
waiter.decrementAndGet();
50+
while (waiter.get() != 0) ;
51+
pa.setValue(1);
52+
if (!cdl.await(5, TimeUnit.SECONDS)) {
53+
Assert.fail("The wait for completion timed out");
54+
}
55+
56+
ts.assertValue(1);
57+
ts.assertCompleted();
58+
}
59+
} finally {
60+
w.unsubscribe();
61+
}
62+
}
63+
64+
}

0 commit comments

Comments
 (0)