Skip to content

1.x: fix timed replay() replaying old data for late subscribers #4023

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions src/main/java/rx/internal/operators/OperatorReplay.java
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,16 @@ final void setFirst(Node n) {
set(n);
}

/**
* Returns the current head for initializing the replay location
* for a new subscriber.
* Override it to consider linked but outdated elements.
* @return the current head
*/
Node getInitialHead() {
return get();
}

@Override
public final void next(T value) {
Object o = enterTransform(nl.next(value));
Expand Down Expand Up @@ -1049,7 +1059,7 @@ public final void replay(InnerProducer<T> output) {

Node node = output.index();
if (node == null) {
node = get();
node = getInitialHead();
output.index = node;

/*
Expand Down Expand Up @@ -1143,7 +1153,7 @@ void truncateFinal() {

}
/* test */ final void collect(Collection<? super T> output) {
Node n = get();
Node n = getInitialHead();
for (;;) {
Node next = n.get();
if (next != null) {
Expand Down Expand Up @@ -1219,6 +1229,20 @@ Object leaveTransform(Object value) {
return ((Timestamped<?>)value).getValue();
}

@Override
Node getInitialHead() {
long timeLimit = scheduler.now() - maxAgeInMillis;
Node prev = get();

Node next = prev.get();
while (next != null && ((Timestamped<?>)next.value).getTimestampMillis() <= timeLimit) {
prev = next;
next = next.get();
}

return prev;
}

@Override
void truncate() {
long timeLimit = scheduler.now() - maxAgeInMillis;
Expand Down
46 changes: 43 additions & 3 deletions src/test/java/rx/internal/operators/OperatorReplayTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ public void testWindowedReplay() {
InOrder inOrder = inOrder(observer1);

co.subscribe(observer1);
inOrder.verify(observer1, times(1)).onNext(3);
// since onComplete is also delayed, value 3 becomes too old for replay.
inOrder.verify(observer1, never()).onNext(3);

inOrder.verify(observer1, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
Expand Down Expand Up @@ -479,7 +480,8 @@ public void testWindowedReplayError() {
InOrder inOrder = inOrder(observer1);

co.subscribe(observer1);
inOrder.verify(observer1, times(1)).onNext(3);
// since onError is also delayed, value 3 becomes too old for replay.
inOrder.verify(observer1, never()).onNext(3);

inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class));
inOrder.verifyNoMoreInteractions();
Expand Down Expand Up @@ -788,10 +790,16 @@ public void testTimedAndSizedTruncation() {
buf.next(1);
test.advanceTimeBy(1, TimeUnit.SECONDS);
buf.next(2);
test.advanceTimeBy(1, TimeUnit.SECONDS);
// exact 1 second makes value 1 too old
test.advanceTimeBy(900, TimeUnit.MILLISECONDS);
buf.collect(values);
Assert.assertEquals(Arrays.asList(1, 2), values);

values.clear();
test.advanceTimeBy(100, TimeUnit.MILLISECONDS);
buf.collect(values);
Assert.assertEquals(Arrays.asList(2), values);

buf.next(3);
buf.next(4);
values.clear();
Expand Down Expand Up @@ -1257,4 +1265,36 @@ public void testSubscribersComeAndGoAtRequestBoundaries2() {
ts3.assertValues(2, 3, 4, 5, 6, 7, 8, 9, 10);
ts3.assertCompleted();
}

@Test
public void dontReplayOldValues() {

PublishSubject<Integer> ps = PublishSubject.create();

TestScheduler scheduler = new TestScheduler();

ConnectableObservable<Integer> co = ps.replay(1, TimeUnit.SECONDS, scheduler);

co.subscribe(); // make sure replay runs in unbounded mode

co.connect();

ps.onNext(1);

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);

ps.onNext(2);

scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);

ps.onNext(3);

scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);

TestSubscriber<Integer> ts = TestSubscriber.create();

co.subscribe(ts);

ts.assertValue(3);
}
}