Skip to content

Commit 95342ad

Browse files
akarnokdakarnokd
akarnokd
authored and
akarnokd
committed
Fixed upstream delivery counting and timer cancelling concurrency bug.
1 parent 6399270 commit 95342ad

File tree

2 files changed

+53
-4
lines changed

2 files changed

+53
-4
lines changed

src/main/java/rx/internal/operators/OperatorBufferWithTimeAndSize.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ static final class BufferSubscriber<T> extends Subscriber<T> {
6969
/** Tracks the downstream requested amounts. */
7070
final AtomicLong requested;
7171

72+
/** Tracks the upstream requested amounts. */
73+
final AtomicLong upstreamRequested;
74+
7275
/** Holds onto the current timer. */
7376
final SerialSubscription timer;
7477

@@ -92,6 +95,7 @@ public BufferSubscriber(Subscriber<? super List<T>> child, int size,
9295
this.worker = worker;
9396
this.timeRemaining = unit.toMillis(time);
9497
this.requested = new AtomicLong();
98+
this.upstreamRequested = new AtomicLong();
9599
this.timer = new SerialSubscription();
96100
this.add(timer);
97101
}
@@ -103,10 +107,12 @@ public void setProducer(Producer producer) {
103107

104108
@Override
105109
public void onNext(T t) {
106-
if (requested.get() == 0) {
110+
if (upstreamRequested.get() == 0) {
107111
onError(new MissingBackpressureException());
108112
return;
109113
}
114+
upstreamRequested.decrementAndGet();
115+
110116
List<T> list;
111117
long r;
112118
long id;
@@ -187,9 +193,6 @@ void timeout() {
187193
void scheduleTimer(long r, long id, long delay) {
188194
if (r > 0 && r < Long.MAX_VALUE) {
189195
timer.set(worker.schedule(new TimerAction(id), delay, unit));
190-
} else
191-
if (r == 0) {
192-
timer.set(Subscriptions.unsubscribed());
193196
}
194197
}
195198

@@ -297,6 +300,17 @@ void handleRequested(long before, long request) {
297300
timer.set(worker.schedulePeriodically(new PeriodicAction(), time, time, unit));
298301
}
299302
}
303+
for (;;) {
304+
long r2 = upstreamRequested.get();
305+
long u2 = r2 + elements;
306+
if (u2 < 0) {
307+
u2 = Long.MAX_VALUE;
308+
}
309+
if (upstreamRequested.compareAndSet(r2, u2)) {
310+
break;
311+
}
312+
}
313+
300314
Producer p = producer;
301315
if (p != null) {
302316
p.request(elements);

src/test/java/rx/internal/operators/OperatorBufferWithTimeAndSizeTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,5 +103,40 @@ public void testSingleRequestWithTimeout() {
103103

104104
ts.assertValue(Arrays.asList(1));
105105
ts.assertNoTerminalEvent();
106+
107+
ps.onNext(2);
108+
ps.onNext(3);
109+
110+
ts.assertValue(Arrays.asList(1));
111+
ts.assertNoTerminalEvent();
112+
}
113+
@Test
114+
public void testUpstreamOverflowBuffer() {
115+
TestScheduler scheduler = new TestScheduler();
116+
PublishSubject<Integer> ps = PublishSubject.create();
117+
Observable<List<Integer>> source = ps.buffer(100, TimeUnit.MILLISECONDS, 10, scheduler);
118+
119+
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>(0);
120+
121+
source.subscribe(ts);
122+
123+
ts.requestMore(1);
124+
125+
ps.onNext(1);
126+
127+
ts.assertNoValues();
128+
ts.assertNoTerminalEvent();
129+
130+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
131+
132+
ts.assertValue(Arrays.asList(1));
133+
ts.assertNoTerminalEvent();
134+
135+
for (int i = 2; i <= 11; i++) {
136+
ps.onNext(i);
137+
}
138+
139+
ts.assertValue(Arrays.asList(1));
140+
ts.assertError(MissingBackpressureException.class);
106141
}
107142
}

0 commit comments

Comments
 (0)