Skip to content

Commit c868ba9

Browse files
committed
Merge pull request #3028 from akarnokd/DelayErrorCutaheadFix
Delay: error cut ahead was not properly serialized
2 parents 7fdbcbf + 6f259eb commit c868ba9

File tree

2 files changed

+44
-5
lines changed

2 files changed

+44
-5
lines changed

src/main/java/rx/internal/operators/OperatorDelay.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,36 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
4949
final Worker worker = scheduler.createWorker();
5050
child.add(worker);
5151
return new Subscriber<T>(child) {
52-
52+
// indicates an error cut ahead
53+
// accessed from the worker thread only
54+
boolean done;
5355
@Override
5456
public void onCompleted() {
5557
worker.schedule(new Action0() {
5658

5759
@Override
5860
public void call() {
59-
child.onCompleted();
61+
if (!done) {
62+
done = true;
63+
child.onCompleted();
64+
}
6065
}
6166

6267
}, delay, unit);
6368
}
6469

6570
@Override
66-
public void onError(Throwable e) {
67-
child.onError(e);
71+
public void onError(final Throwable e) {
72+
worker.schedule(new Action0() {
73+
@Override
74+
public void call() {
75+
if (!done) {
76+
done = true;
77+
child.onError(e);
78+
worker.unsubscribe();
79+
}
80+
}
81+
});
6882
}
6983

7084
@Override
@@ -73,7 +87,9 @@ public void onNext(final T t) {
7387

7488
@Override
7589
public void call() {
76-
child.onNext(t);
90+
if (!done) {
91+
child.onNext(t);
92+
}
7793
}
7894

7995
}, delay, unit);

src/test/java/rx/internal/operators/OperatorDelayTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,4 +798,27 @@ public Integer call(Integer t) {
798798
ts.assertNoErrors();
799799
assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size());
800800
}
801+
802+
@Test
803+
public void testErrorRunsBeforeOnNext() {
804+
TestScheduler test = Schedulers.test();
805+
806+
PublishSubject<Integer> ps = PublishSubject.create();
807+
808+
TestSubscriber<Integer> ts = TestSubscriber.create();
809+
810+
ps.delay(1, TimeUnit.SECONDS, test).subscribe(ts);
811+
812+
ps.onNext(1);
813+
814+
test.advanceTimeBy(500, TimeUnit.MILLISECONDS);
815+
816+
ps.onError(new TestException());
817+
818+
test.advanceTimeBy(1, TimeUnit.SECONDS);
819+
820+
ts.assertNoValues();
821+
ts.assertError(TestException.class);
822+
ts.assertNotCompleted();
823+
}
801824
}

0 commit comments

Comments
 (0)