Skip to content

1.x: increase timeout of some tests #5471

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/rx/SingleEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* <p>
* All methods are thread-safe; calling onSuccess or onError twice or one after the other has
* no effect.
* <p>History: 1.2.3 - experimental
* <p>History: 1.2.3 - experimental
* @param <T> the success value type
* @since 1.3
*/
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/rx/observables/AsyncOnSubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -532,9 +532,9 @@ boolean tryEmit(long n) {
onNextCalled = false;
expectedDelivery = n;
nextIteration(n);
//hasTerminated will be true when onCompleted was already emitted from the request callback
//even if the the observer has not seen onCompleted from the requested observable,

//hasTerminated will be true when onCompleted was already emitted from the request callback
//even if the the observer has not seen onCompleted from the requested observable,
//so we should not clean up while there are active subscriptions
if (hasTerminated && !subscriptions.hasSubscriptions() || isUnsubscribed()) {
cleanup();
Expand Down
105 changes: 58 additions & 47 deletions src/test/java/rx/internal/operators/BlockingOperatorNextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,69 +234,80 @@ public void testNextWithCallingHasNextMultipleTimes() {
*/
@Test
public void testNoBufferingOrBlockingOfSequence() throws Throwable {
final CountDownLatch finished = new CountDownLatch(1);
final int COUNT = 30;
final CountDownLatch timeHasPassed = new CountDownLatch(COUNT);
final AtomicBoolean running = new AtomicBoolean(true);
final AtomicInteger count = new AtomicInteger(0);
final Observable<Integer> obs = Observable.unsafeCreate(new Observable.OnSubscribe<Integer>() {
int retries = 10;

@Override
public void call(final Subscriber<? super Integer> o) {
new Thread(new Runnable() {
for (;;) {
try {
final CountDownLatch finished = new CountDownLatch(1);
final int COUNT = 30;
final CountDownLatch timeHasPassed = new CountDownLatch(COUNT);
final AtomicBoolean running = new AtomicBoolean(true);
final AtomicInteger count = new AtomicInteger(0);
final Observable<Integer> obs = Observable.unsafeCreate(new Observable.OnSubscribe<Integer>() {

@Override
public void run() {
try {
while (running.get()) {
o.onNext(count.incrementAndGet());
timeHasPassed.countDown();
public void call(final Subscriber<? super Integer> o) {
new Thread(new Runnable() {

@Override
public void run() {
try {
while (running.get()) {
o.onNext(count.incrementAndGet());
timeHasPassed.countDown();
}
o.onCompleted();
} catch (Throwable e) {
o.onError(e);
} finally {
finished.countDown();
}
}
o.onCompleted();
} catch (Throwable e) {
o.onError(e);
} finally {
finished.countDown();
}
}).start();
}
}).start();
}

});
});

try {
Iterator<Integer> it = next(obs).iterator();
try {
Iterator<Integer> it = next(obs).iterator();

assertTrue(it.hasNext());
int a = it.next();
assertTrue(it.hasNext());
int b = it.next();
// we should have a different value
assertTrue("a and b should be different", a != b);
assertTrue(it.hasNext());
int a = it.next();
assertTrue(it.hasNext());
int b = it.next();
// we should have a different value
assertTrue("a and b should be different", a != b);

// wait for some time (if times out we are blocked somewhere so fail ... set very high for very slow, constrained machines)
timeHasPassed.await(8000, TimeUnit.MILLISECONDS);
// wait for some time (if times out we are blocked somewhere so fail ... set very high for very slow, constrained machines)
timeHasPassed.await(8000, TimeUnit.MILLISECONDS);

assertTrue(it.hasNext());
int c = it.next();
assertTrue(it.hasNext());
int c = it.next();

assertTrue("c should not just be the next in sequence", c != (b + 1));
assertTrue("expected that c [" + c + "] is higher than or equal to " + COUNT, c >= COUNT);
assertTrue("c should not just be the next in sequence", c != (b + 1));
assertTrue("expected that c [" + c + "] is higher than or equal to " + COUNT, c >= COUNT);

assertTrue(it.hasNext());
int d = it.next();
assertTrue(d > c);
assertTrue(it.hasNext());
int d = it.next();
assertTrue(d > c);

// shut down the thread
running.set(false);
// shut down the thread
running.set(false);

finished.await();
finished.await();

assertFalse(it.hasNext());
assertFalse(it.hasNext());

System.out.println("a: " + a + " b: " + b + " c: " + c);
} finally {
running.set(false); // don't let the thread spin indefinitely
System.out.println("a: " + a + " b: " + b + " c: " + c);
} finally {
running.set(false); // don't let the thread spin indefinitely
}
return;
} catch (AssertionError ex) {
if (retries-- == 0) {
throw ex;
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class CompletableOnErrorXTest {
@Test
public void nextUnsubscribe() {
PublishSubject<Integer> ps = PublishSubject.create();

AssertableSubscriber<Void> as = ps.toCompletable()
.onErrorResumeNext(new Func1<Throwable, Completable>() {
@Override
Expand All @@ -49,7 +49,7 @@ public Completable call(Throwable e) {
@Test
public void completeUnsubscribe() {
PublishSubject<Integer> ps = PublishSubject.create();

AssertableSubscriber<Void> as = ps.toCompletable()
.onErrorComplete()
.test();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ public Observable<Integer> call(Integer t) {
}
}
}
@Test(timeout = 30000)
@Test(timeout = 60000)
public void flatMapRangeMixedAsyncLoop() {
for (int i = 0; i < 2000; i++) {
if (i % 10 == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void testSimpleAsyncLoop() {
testSimpleAsync();
}
}
@Test(timeout = 10000)
@Test(timeout = 30000)
public void testSimpleAsync() {
for (int i = 1; i < 50; i++) {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Expand All @@ -217,7 +217,7 @@ public void testSimpleAsync() {
assertEquals(expected, actual);
}
}
@Test(timeout = 10000)
@Test(timeout = 30000)
public void testSimpleOneLessAsyncLoop() {
int max = 200;
if (PlatformDependent.isAndroid()) {
Expand All @@ -227,7 +227,7 @@ public void testSimpleOneLessAsyncLoop() {
testSimpleOneLessAsync();
}
}
@Test(timeout = 10000)
@Test(timeout = 30000)
public void testSimpleOneLessAsync() {
long t = System.currentTimeMillis();
for (int i = 2; i < 50; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ public void asyncInner() throws Throwable {
.switchMap(UtilityFunctions.<Observable<Integer>>identity())
.observeOn(Schedulers.computation())
.ignoreElements()
.timeout(5, TimeUnit.SECONDS)
.timeout(15, TimeUnit.SECONDS)
.toBlocking()
.subscribe(Actions.empty(), new Action1<Throwable>() {
@Override
Expand Down