Skip to content

Commit 161dbef

Browse files
authored
1.x: fix timed replay() replaying old data for late subscribers (#4023)
1 parent 519c38b commit 161dbef

File tree

2 files changed

+69
-5
lines changed

2 files changed

+69
-5
lines changed

src/main/java/rx/internal/operators/OperatorReplay.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,6 +1005,16 @@ final void setFirst(Node n) {
10051005
set(n);
10061006
}
10071007

1008+
/**
1009+
* Returns the current head for initializing the replay location
1010+
* for a new subscriber.
1011+
* Override it to consider linked but outdated elements.
1012+
* @return the current head
1013+
*/
1014+
Node getInitialHead() {
1015+
return get();
1016+
}
1017+
10081018
@Override
10091019
public final void next(T value) {
10101020
Object o = enterTransform(nl.next(value));
@@ -1049,7 +1059,7 @@ public final void replay(InnerProducer<T> output) {
10491059

10501060
Node node = output.index();
10511061
if (node == null) {
1052-
node = get();
1062+
node = getInitialHead();
10531063
output.index = node;
10541064

10551065
/*
@@ -1143,7 +1153,7 @@ void truncateFinal() {
11431153

11441154
}
11451155
/* test */ final void collect(Collection<? super T> output) {
1146-
Node n = get();
1156+
Node n = getInitialHead();
11471157
for (;;) {
11481158
Node next = n.get();
11491159
if (next != null) {
@@ -1219,6 +1229,20 @@ Object leaveTransform(Object value) {
12191229
return ((Timestamped<?>)value).getValue();
12201230
}
12211231

1232+
@Override
1233+
Node getInitialHead() {
1234+
long timeLimit = scheduler.now() - maxAgeInMillis;
1235+
Node prev = get();
1236+
1237+
Node next = prev.get();
1238+
while (next != null && ((Timestamped<?>)next.value).getTimestampMillis() <= timeLimit) {
1239+
prev = next;
1240+
next = next.get();
1241+
}
1242+
1243+
return prev;
1244+
}
1245+
12221246
@Override
12231247
void truncate() {
12241248
long timeLimit = scheduler.now() - maxAgeInMillis;

src/test/java/rx/internal/operators/OperatorReplayTest.java

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ public void testWindowedReplay() {
193193
InOrder inOrder = inOrder(observer1);
194194

195195
co.subscribe(observer1);
196-
inOrder.verify(observer1, times(1)).onNext(3);
196+
// since onComplete is also delayed, value 3 becomes too old for replay.
197+
inOrder.verify(observer1, never()).onNext(3);
197198

198199
inOrder.verify(observer1, times(1)).onCompleted();
199200
inOrder.verifyNoMoreInteractions();
@@ -479,7 +480,8 @@ public void testWindowedReplayError() {
479480
InOrder inOrder = inOrder(observer1);
480481

481482
co.subscribe(observer1);
482-
inOrder.verify(observer1, times(1)).onNext(3);
483+
// since onError is also delayed, value 3 becomes too old for replay.
484+
inOrder.verify(observer1, never()).onNext(3);
483485

484486
inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class));
485487
inOrder.verifyNoMoreInteractions();
@@ -788,10 +790,16 @@ public void testTimedAndSizedTruncation() {
788790
buf.next(1);
789791
test.advanceTimeBy(1, TimeUnit.SECONDS);
790792
buf.next(2);
791-
test.advanceTimeBy(1, TimeUnit.SECONDS);
793+
// exact 1 second makes value 1 too old
794+
test.advanceTimeBy(900, TimeUnit.MILLISECONDS);
792795
buf.collect(values);
793796
Assert.assertEquals(Arrays.asList(1, 2), values);
794797

798+
values.clear();
799+
test.advanceTimeBy(100, TimeUnit.MILLISECONDS);
800+
buf.collect(values);
801+
Assert.assertEquals(Arrays.asList(2), values);
802+
795803
buf.next(3);
796804
buf.next(4);
797805
values.clear();
@@ -1257,4 +1265,36 @@ public void testSubscribersComeAndGoAtRequestBoundaries2() {
12571265
ts3.assertValues(2, 3, 4, 5, 6, 7, 8, 9, 10);
12581266
ts3.assertCompleted();
12591267
}
1268+
1269+
@Test
1270+
public void dontReplayOldValues() {
1271+
1272+
PublishSubject<Integer> ps = PublishSubject.create();
1273+
1274+
TestScheduler scheduler = new TestScheduler();
1275+
1276+
ConnectableObservable<Integer> co = ps.replay(1, TimeUnit.SECONDS, scheduler);
1277+
1278+
co.subscribe(); // make sure replay runs in unbounded mode
1279+
1280+
co.connect();
1281+
1282+
ps.onNext(1);
1283+
1284+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
1285+
1286+
ps.onNext(2);
1287+
1288+
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
1289+
1290+
ps.onNext(3);
1291+
1292+
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
1293+
1294+
TestSubscriber<Integer> ts = TestSubscriber.create();
1295+
1296+
co.subscribe(ts);
1297+
1298+
ts.assertValue(3);
1299+
}
12601300
}

0 commit comments

Comments
 (0)