Skip to content

Commit 19ba993

Browse files
authored
2.x: fix timed replay-like components replaying outdated items (#5140)
1 parent 3356444 commit 19ba993

File tree

8 files changed

+188
-50
lines changed

8 files changed

+188
-50
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -947,7 +947,7 @@ public final void replay(InnerSubscription<T> output) {
947947

948948
Node node = output.index();
949949
if (node == null) {
950-
node = get();
950+
node = getHead();
951951
output.index = node;
952952

953953
BackpressureHelper.add(output.totalRequested, node.index);
@@ -1033,7 +1033,7 @@ void truncateFinal() {
10331033

10341034
}
10351035
/* test */ final void collect(Collection<? super T> output) {
1036-
Node n = get();
1036+
Node n = getHead();
10371037
for (;;) {
10381038
Node next = n.get();
10391039
if (next != null) {
@@ -1055,6 +1055,10 @@ void truncateFinal() {
10551055
/* test */ boolean hasCompleted() {
10561056
return tail.value != null && NotificationLite.isComplete(leaveTransform(tail.value));
10571057
}
1058+
1059+
Node getHead() {
1060+
return get();
1061+
}
10581062
}
10591063

10601064
/**
@@ -1172,5 +1176,28 @@ void truncateFinal() {
11721176
setFirst(prev);
11731177
}
11741178
}
1179+
1180+
@Override
1181+
Node getHead() {
1182+
long timeLimit = scheduler.now(unit) - maxAge;
1183+
Node prev = get();
1184+
Node next = prev.get();
1185+
for (;;) {
1186+
if (next == null) {
1187+
break;
1188+
}
1189+
Timed<?> v = (Timed<?>)next.value;
1190+
if (NotificationLite.isComplete(v.value()) || NotificationLite.isError(v.value())) {
1191+
break;
1192+
}
1193+
if (v.time() <= timeLimit) {
1194+
prev = next;
1195+
next = next.get();
1196+
} else {
1197+
break;
1198+
}
1199+
}
1200+
return prev;
1201+
}
11751202
}
11761203
}

src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -757,7 +757,7 @@ public final void replay(InnerDisposable<T> output) {
757757
for (;;) {
758758
Node node = output.index();
759759
if (node == null) {
760-
node = get();
760+
node = getHead();
761761
output.index = node;
762762
}
763763

@@ -821,7 +821,7 @@ void truncateFinal() {
821821

822822
}
823823
/* test */ final void collect(Collection<? super T> output) {
824-
Node n = get();
824+
Node n = getHead();
825825
for (;;) {
826826
Node next = n.get();
827827
if (next != null) {
@@ -843,6 +843,10 @@ void truncateFinal() {
843843
/* test */ boolean hasCompleted() {
844844
return tail.value != null && NotificationLite.isComplete(leaveTransform(tail.value));
845845
}
846+
847+
Node getHead() {
848+
return get();
849+
}
846850
}
847851

848852
/**
@@ -960,5 +964,28 @@ void truncateFinal() {
960964
setFirst(prev);
961965
}
962966
}
967+
968+
@Override
969+
Node getHead() {
970+
long timeLimit = scheduler.now(unit) - maxAge;
971+
Node prev = get();
972+
Node next = prev.get();
973+
for (;;) {
974+
if (next == null) {
975+
break;
976+
}
977+
Timed<?> v = (Timed<?>)next.value;
978+
if (NotificationLite.isComplete(v.value()) || NotificationLite.isError(v.value())) {
979+
break;
980+
}
981+
if (v.time() <= timeLimit) {
982+
prev = next;
983+
next = next.get();
984+
} else {
985+
break;
986+
}
987+
}
988+
return prev;
989+
}
963990
}
964991
}

src/main/java/io/reactivex/processors/ReplayProcessor.java

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,8 +1066,8 @@ public T getValue() {
10661066
@Override
10671067
@SuppressWarnings("unchecked")
10681068
public T[] getValues(T[] array) {
1069-
TimedNode<Object> h = head;
1070-
int s = size();
1069+
TimedNode<Object> h = getHead();
1070+
int s = size(h);
10711071

10721072
if (s == 0) {
10731073
if (array.length != 0) {
@@ -1093,6 +1093,22 @@ public T[] getValues(T[] array) {
10931093
return array;
10941094
}
10951095

1096+
TimedNode<Object> getHead() {
1097+
TimedNode<Object> index = head;
1098+
// skip old entries
1099+
long limit = scheduler.now(unit) - maxAge;
1100+
TimedNode<Object> next = index.get();
1101+
while (next != null) {
1102+
long ts = next.time;
1103+
if (ts > limit) {
1104+
break;
1105+
}
1106+
index = next;
1107+
next = index.get();
1108+
}
1109+
return index;
1110+
}
1111+
10961112
@Override
10971113
@SuppressWarnings("unchecked")
10981114
public void replay(ReplaySubscription<T> rs) {
@@ -1105,20 +1121,7 @@ public void replay(ReplaySubscription<T> rs) {
11051121

11061122
TimedNode<Object> index = (TimedNode<Object>)rs.index;
11071123
if (index == null) {
1108-
index = head;
1109-
if (!done) {
1110-
// skip old entries
1111-
long limit = scheduler.now(unit) - maxAge;
1112-
TimedNode<Object> next = index.get();
1113-
while (next != null) {
1114-
long ts = next.time;
1115-
if (ts > limit) {
1116-
break;
1117-
}
1118-
index = next;
1119-
next = index.get();
1120-
}
1121-
}
1124+
index = getHead();
11221125
}
11231126

11241127
for (;;) {
@@ -1185,8 +1188,11 @@ public void replay(ReplaySubscription<T> rs) {
11851188

11861189
@Override
11871190
public int size() {
1191+
return size(getHead());
1192+
}
1193+
1194+
int size(TimedNode<Object> h) {
11881195
int s = 0;
1189-
TimedNode<Object> h = head;
11901196
while (s != Integer.MAX_VALUE) {
11911197
TimedNode<Object> next = h.get();
11921198
if (next == null) {

src/main/java/io/reactivex/subjects/ReplaySubject.java

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,11 +1029,27 @@ public T getValue() {
10291029
return (T)v;
10301030
}
10311031

1032+
TimedNode<Object> getHead() {
1033+
TimedNode<Object> index = head;
1034+
// skip old entries
1035+
long limit = scheduler.now(unit) - maxAge;
1036+
TimedNode<Object> next = index.get();
1037+
while (next != null) {
1038+
long ts = next.time;
1039+
if (ts > limit) {
1040+
break;
1041+
}
1042+
index = next;
1043+
next = index.get();
1044+
}
1045+
return index;
1046+
}
1047+
10321048
@Override
10331049
@SuppressWarnings("unchecked")
10341050
public T[] getValues(T[] array) {
1035-
TimedNode<Object> h = head;
1036-
int s = size();
1051+
TimedNode<Object> h = getHead();
1052+
int s = size(h);
10371053

10381054
if (s == 0) {
10391055
if (array.length != 0) {
@@ -1071,20 +1087,7 @@ public void replay(ReplayDisposable<T> rs) {
10711087

10721088
TimedNode<Object> index = (TimedNode<Object>)rs.index;
10731089
if (index == null) {
1074-
index = head;
1075-
if (!done) {
1076-
// skip old entries
1077-
long limit = scheduler.now(unit) - maxAge;
1078-
TimedNode<Object> next = index.get();
1079-
while (next != null) {
1080-
long ts = next.time;
1081-
if (ts > limit) {
1082-
break;
1083-
}
1084-
index = next;
1085-
next = index.get();
1086-
}
1087-
}
1090+
index = getHead();
10881091
}
10891092

10901093
for (;;) {
@@ -1142,8 +1145,11 @@ public void replay(ReplayDisposable<T> rs) {
11421145

11431146
@Override
11441147
public int size() {
1148+
return size(getHead());
1149+
}
1150+
1151+
int size(TimedNode<Object> h) {
11451152
int s = 0;
1146-
TimedNode<Object> h = head;
11471153
while (s != Integer.MAX_VALUE) {
11481154
TimedNode<Object> next = h.get();
11491155
if (next == null) {

src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ public void testWindowedReplay() {
175175
InOrder inOrder = inOrder(observer1);
176176

177177
co.subscribe(observer1);
178-
inOrder.verify(observer1, times(1)).onNext(3);
178+
inOrder.verify(observer1, never()).onNext(3);
179179

180180
inOrder.verify(observer1, times(1)).onComplete();
181181
inOrder.verifyNoMoreInteractions();
@@ -451,7 +451,7 @@ public void testWindowedReplayError() {
451451
InOrder inOrder = inOrder(observer1);
452452

453453
co.subscribe(observer1);
454-
inOrder.verify(observer1, times(1)).onNext(3);
454+
inOrder.verify(observer1, never()).onNext(3);
455455

456456
inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class));
457457
inOrder.verifyNoMoreInteractions();
@@ -775,7 +775,7 @@ public void testTimedAndSizedTruncation() {
775775
buf.next(2);
776776
test.advanceTimeBy(1, TimeUnit.SECONDS);
777777
buf.collect(values);
778-
Assert.assertEquals(Arrays.asList(1, 2), values);
778+
Assert.assertEquals(Arrays.asList(2), values);
779779

780780
buf.next(3);
781781
buf.next(4);
@@ -1648,7 +1648,7 @@ public void testTimedAndSizedTruncationError() {
16481648
buf.next(2);
16491649
test.advanceTimeBy(1, TimeUnit.SECONDS);
16501650
buf.collect(values);
1651-
Assert.assertEquals(Arrays.asList(1, 2), values);
1651+
Assert.assertEquals(Arrays.asList(2), values);
16521652

16531653
buf.next(3);
16541654
buf.next(4);
@@ -1731,4 +1731,21 @@ protected void subscribeActual(Subscriber<? super Integer> s) {
17311731

17321732
assertTrue(bs.isCancelled());
17331733
}
1734+
1735+
@Test
1736+
public void timedNoOutdatedData() {
1737+
TestScheduler scheduler = new TestScheduler();
1738+
1739+
Flowable<Integer> source = Flowable.just(1)
1740+
.replay(2, TimeUnit.SECONDS, scheduler)
1741+
.autoConnect();
1742+
1743+
source.test().assertResult(1);
1744+
1745+
source.test().assertResult(1);
1746+
1747+
scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
1748+
1749+
source.test().assertResult();
1750+
}
17341751
}

src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ public void testWindowedReplay() {
175175
InOrder inOrder = inOrder(observer1);
176176

177177
co.subscribe(observer1);
178-
inOrder.verify(observer1, times(1)).onNext(3);
178+
inOrder.verify(observer1, never()).onNext(3);
179179

180180
inOrder.verify(observer1, times(1)).onComplete();
181181
inOrder.verifyNoMoreInteractions();
@@ -451,7 +451,7 @@ public void testWindowedReplayError() {
451451
InOrder inOrder = inOrder(observer1);
452452

453453
co.subscribe(observer1);
454-
inOrder.verify(observer1, times(1)).onNext(3);
454+
inOrder.verify(observer1, never()).onNext(3);
455455

456456
inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class));
457457
inOrder.verifyNoMoreInteractions();
@@ -762,7 +762,7 @@ public void testTimedAndSizedTruncation() {
762762
buf.next(2);
763763
test.advanceTimeBy(1, TimeUnit.SECONDS);
764764
buf.collect(values);
765-
Assert.assertEquals(Arrays.asList(1, 2), values);
765+
Assert.assertEquals(Arrays.asList(2), values);
766766

767767
buf.next(3);
768768
buf.next(4);
@@ -805,7 +805,7 @@ public void testTimedAndSizedTruncationError() {
805805
buf.next(2);
806806
test.advanceTimeBy(1, TimeUnit.SECONDS);
807807
buf.collect(values);
808-
Assert.assertEquals(Arrays.asList(1, 2), values);
808+
Assert.assertEquals(Arrays.asList(2), values);
809809

810810
buf.next(3);
811811
buf.next(4);
@@ -1511,4 +1511,21 @@ protected void subscribeActual(Observer<? super Integer> s) {
15111511

15121512
assertTrue(bs.isDisposed());
15131513
}
1514+
1515+
@Test
1516+
public void timedNoOutdatedData() {
1517+
TestScheduler scheduler = new TestScheduler();
1518+
1519+
Observable<Integer> source = Observable.just(1)
1520+
.replay(2, TimeUnit.SECONDS, scheduler)
1521+
.autoConnect();
1522+
1523+
source.test().assertResult(1);
1524+
1525+
source.test().assertResult(1);
1526+
1527+
scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
1528+
1529+
source.test().assertResult();
1530+
}
15141531
}

0 commit comments

Comments
 (0)