Skip to content

Commit e588446

Browse files
committed
Merge pull request #3167 from akarnokd/ReplayManageRequestsFix
Fixed negative request due to unsubscription of a large requester
2 parents 7a77072 + 9d7bd8b commit e588446

File tree

2 files changed

+26
-1
lines changed

2 files changed

+26
-1
lines changed

src/main/java/rx/internal/operators/OperatorReplay.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ void manageRequests() {
501501
InnerProducer<T>[] a = producers.get();
502502

503503
long ri = maxChildRequested;
504-
long maxTotalRequests = 0;
504+
long maxTotalRequests = ri;
505505

506506
for (InnerProducer<T> rp : a) {
507507
maxTotalRequests = Math.max(maxTotalRequests, rp.totalRequested.get());

src/test/java/rx/internal/operators/OperatorReplayTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,4 +1120,29 @@ public void onNext(Integer t) {
11201120
ts.assertNotCompleted();
11211121
ts.assertError(TestException.class);
11221122
}
1123+
1124+
@Test
1125+
public void unboundedLeavesEarly() {
1126+
PublishSubject<Integer> source = PublishSubject.create();
1127+
1128+
final List<Long> requests = new ArrayList<Long>();
1129+
1130+
Observable<Integer> out = source
1131+
.doOnRequest(new Action1<Long>() {
1132+
@Override
1133+
public void call(Long t) {
1134+
requests.add(t);
1135+
}
1136+
}).replay().autoConnect();
1137+
1138+
TestSubscriber<Integer> ts1 = TestSubscriber.create(5);
1139+
TestSubscriber<Integer> ts2 = TestSubscriber.create(10);
1140+
1141+
out.subscribe(ts1);
1142+
out.subscribe(ts2);
1143+
ts2.unsubscribe();
1144+
1145+
Assert.assertEquals(Arrays.asList(5L, 5L), requests);
1146+
}
1147+
11231148
}

0 commit comments

Comments
 (0)