Skip to content

Commit dd3d7e0

Browse files
akarnokdakarnokd
akarnokd
authored and
akarnokd
committed
Backpressure support for buffer(time) and buffer(time, size)
1 parent 8758fdd commit dd3d7e0

File tree

5 files changed

+695
-49
lines changed

5 files changed

+695
-49
lines changed

src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3289,7 +3289,7 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count)
32893289
* @see <a href="http://reactivex.io/documentation/operators/buffer.html">ReactiveX operators documentation: Buffer</a>
32903290
*/
32913291
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count, Scheduler scheduler) {
3292-
return lift(new OperatorBufferWithTime<T>(timespan, timespan, unit, count, scheduler));
3292+
return lift(new OperatorBufferWithTimeAndSize<T>(timespan, unit, count, scheduler));
32933293
}
32943294

32953295
/**
@@ -3320,7 +3320,7 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count,
33203320
* @see <a href="http://reactivex.io/documentation/operators/buffer.html">ReactiveX operators documentation: Buffer</a>
33213321
*/
33223322
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler) {
3323-
return buffer(timespan, timespan, unit, scheduler);
3323+
return buffer(timespan, unit, Integer.MAX_VALUE, scheduler);
33243324
}
33253325

33263326
/**
Lines changed: 361 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,361 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import java.util.*;
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.atomic.AtomicLong;
21+
22+
import rx.Observable.Operator;
23+
import rx.*;
24+
import rx.Scheduler.Worker;
25+
import rx.exceptions.*;
26+
import rx.functions.Action0;
27+
import rx.subscriptions.*;
28+
29+
/**
30+
* Buffers the source into Lists with maximum size or emission duration, respecting backpressure.
31+
*/
32+
public final class OperatorBufferWithTimeAndSize<T> implements Operator<List<T>, T> {
33+
final int size;
34+
final long time;
35+
final TimeUnit unit;
36+
final Scheduler scheduler;
37+
public OperatorBufferWithTimeAndSize(long time, TimeUnit unit, int size, Scheduler scheduler) {
38+
this.size = size;
39+
this.time = time;
40+
this.unit = unit;
41+
this.scheduler = scheduler;
42+
}
43+
@Override
44+
public Subscriber<? super T> call(Subscriber<? super List<T>> child) {
45+
Scheduler.Worker worker = scheduler.createWorker();
46+
child.add(worker);
47+
48+
final BufferSubscriber<T> bs = new BufferSubscriber<T>(child, size, time, unit, worker);
49+
50+
child.add(bs);
51+
52+
child.setProducer(new BufferProducer<T>(bs));
53+
54+
return bs;
55+
}
56+
57+
/** The buffering subscriber for the upstream. */
58+
static final class BufferSubscriber<T> extends Subscriber<T> {
59+
60+
final Subscriber<? super List<T>> child;
61+
final int size;
62+
final long time;
63+
final TimeUnit unit;
64+
final Worker worker;
65+
66+
/** The producer of the upstream. */
67+
Producer producer;
68+
69+
/** Tracks the downstream requested amounts. */
70+
final AtomicLong requested;
71+
72+
/** Tracks the upstream requested amounts. */
73+
final AtomicLong upstreamRequested;
74+
75+
/** Holds onto the current timer. */
76+
final SerialSubscription timer;
77+
78+
/** The buffer holding the elements or null for a replaced buffer. Guarded by this. */
79+
List<T> buffer;
80+
81+
/** The current buffer identifier so timer doesn't emit an old buffer. Guarded by this. */
82+
long bufferId;
83+
84+
/** Captures how much time was remaining in the last timeout, in milliseconds. Guarded by this. */
85+
long timeRemaining;
86+
/** Stores the Worker.now()-relative value where the timer should fire, in milliseconds. Guarded by this. */
87+
long timeScheduled;
88+
89+
public BufferSubscriber(Subscriber<? super List<T>> child, int size,
90+
long time, TimeUnit unit, Worker worker) {
91+
this.child = child;
92+
this.size = size;
93+
this.time = time;
94+
this.unit = unit;
95+
this.worker = worker;
96+
this.timeRemaining = unit.toMillis(time);
97+
this.requested = new AtomicLong();
98+
this.upstreamRequested = new AtomicLong();
99+
this.timer = new SerialSubscription();
100+
this.add(timer);
101+
}
102+
103+
@Override
104+
public void setProducer(Producer producer) {
105+
this.producer = producer;
106+
}
107+
108+
@Override
109+
public void onNext(T t) {
110+
long ur = upstreamRequested.get();
111+
if (ur == 0) {
112+
onError(new MissingBackpressureException());
113+
return;
114+
} else
115+
if (ur != Long.MAX_VALUE) {
116+
upstreamRequested.decrementAndGet();
117+
}
118+
119+
List<T> list;
120+
long r;
121+
long id;
122+
long delay;
123+
synchronized (this) {
124+
List<T> b = buffer;
125+
if (b == null) {
126+
b = new ArrayList<T>();
127+
buffer = b;
128+
}
129+
b.add(t);
130+
if (b.size() == size) {
131+
id = ++bufferId;
132+
133+
list = buffer;
134+
buffer = null;
135+
136+
r = requested.get();
137+
if (r != Long.MAX_VALUE) {
138+
delay = calculateNextDelay();
139+
r = requested.decrementAndGet();
140+
} else {
141+
delay = -1; // irrelevant in unbounded mode
142+
}
143+
} else {
144+
return;
145+
}
146+
}
147+
scheduleTimer(r, id, delay);
148+
child.onNext(list);
149+
}
150+
151+
/** Timeout when run in backpressure mode. */
152+
void timeout(long id) {
153+
List<T> list;
154+
long r;
155+
long delay;
156+
synchronized (this) {
157+
if (id == bufferId) {
158+
list = buffer;
159+
buffer = null;
160+
161+
id = ++bufferId;
162+
163+
if (list == null) {
164+
list = new ArrayList<T>();
165+
}
166+
r = requested.get();
167+
if (r != Long.MAX_VALUE) {
168+
delay = calculateNextDelay();
169+
r = requested.decrementAndGet();
170+
} else {
171+
delay = -1; // irrelevant in unbounded mode
172+
}
173+
} else {
174+
return;
175+
}
176+
}
177+
scheduleTimer(r, id, delay);
178+
child.onNext(list);
179+
}
180+
/** Timeout in unbounded mode. */
181+
void timeout() {
182+
List<T> list;
183+
synchronized (this) {
184+
list = buffer;
185+
buffer = null;
186+
187+
++bufferId;
188+
189+
if (list == null) {
190+
list = new ArrayList<T>();
191+
}
192+
}
193+
child.onNext(list);
194+
}
195+
196+
void scheduleTimer(long r, long id, long delay) {
197+
if (r > 0 && r < Long.MAX_VALUE) {
198+
timer.set(worker.schedule(new TimerAction(id), delay, unit));
199+
}
200+
}
201+
202+
/** Calculates the next delay in the unit accounting how much time was left from the previous timer. */
203+
long calculateNextDelay() {
204+
long delay = timeScheduled - worker.now();
205+
if (delay <= 0) {
206+
delay = time;
207+
timeScheduled = worker.now() + unit.toMillis(time);
208+
} else {
209+
timeScheduled = worker.now() + delay;
210+
delay = unit.convert(delay, TimeUnit.MILLISECONDS);
211+
}
212+
return delay;
213+
}
214+
215+
@Override
216+
public void onError(Throwable e) {
217+
timer.unsubscribe();
218+
try {
219+
synchronized (this) {
220+
buffer = null;
221+
bufferId++;
222+
}
223+
requested.getAndSet(-1); // indicate to the downstream requester there won't be anything to request
224+
225+
child.onError(e);
226+
} finally {
227+
unsubscribe();
228+
}
229+
}
230+
231+
@Override
232+
public void onCompleted() {
233+
timer.unsubscribe();
234+
try {
235+
// either we win and emit the current buffer or the timer in which case
236+
// there is no point in emitting an empty buffer
237+
List<T> list;
238+
synchronized (this) {
239+
list = buffer;
240+
bufferId++;
241+
}
242+
requested.getAndSet(-1); // indicate to the downstream requester there won't be anything to request
243+
if (list != null) {
244+
try {
245+
child.onNext(list);
246+
} catch (Throwable t) {
247+
Exceptions.throwIfFatal(t);
248+
child.onError(t);
249+
return;
250+
}
251+
}
252+
child.onCompleted();
253+
} finally {
254+
unsubscribe();
255+
}
256+
}
257+
public void downstreamRequest(long n) {
258+
if (n < 0) {
259+
throw new IllegalArgumentException("Request is negative");
260+
}
261+
if (n == 0) {
262+
return;
263+
}
264+
for (;;) {
265+
long r = requested.get();
266+
if (r < 0) {
267+
return;
268+
}
269+
long u = r + n;
270+
if (u < 0) {
271+
u = Long.MAX_VALUE;
272+
}
273+
if (requested.compareAndSet(r, u)) {
274+
handleRequested(r, n);
275+
return;
276+
}
277+
}
278+
}
279+
/**
280+
* Handles the change in the request amount.
281+
* @param before the value before the request
282+
* @param request the requested amount
283+
*/
284+
void handleRequested(long before, long request) {
285+
long s = size;
286+
long elements = request * s;
287+
// s != 0 and request != 0
288+
if ((request >>> 31) != 0 && (elements / request != s)) {
289+
elements = Long.MAX_VALUE;
290+
}
291+
if (before == 0) {
292+
if (request != Long.MAX_VALUE) {
293+
long id;
294+
long delay;
295+
296+
synchronized (this) {
297+
id = bufferId;
298+
delay = calculateNextDelay();
299+
}
300+
301+
timer.set(worker.schedule(new TimerAction(id), delay, unit));
302+
} else {
303+
timer.set(worker.schedulePeriodically(new PeriodicAction(), time, time, unit));
304+
}
305+
}
306+
for (;;) {
307+
long r2 = upstreamRequested.get();
308+
long u2 = r2 + elements;
309+
if (u2 < 0) {
310+
u2 = Long.MAX_VALUE;
311+
}
312+
if (upstreamRequested.compareAndSet(r2, u2)) {
313+
break;
314+
}
315+
}
316+
317+
Producer p = producer;
318+
if (p != null) {
319+
p.request(elements);
320+
}
321+
}
322+
/**
323+
* The timer action trying to emit the buffer contents.
324+
*/
325+
class TimerAction implements Action0 {
326+
final long id;
327+
private TimerAction(long id) {
328+
this.id = id;
329+
}
330+
@Override
331+
public void call() {
332+
timeout(id);
333+
}
334+
}
335+
/**
336+
* The timer action trying to emit the buffer contents.
337+
*/
338+
class PeriodicAction implements Action0 {
339+
@Override
340+
public void call() {
341+
timeout();
342+
}
343+
}
344+
}
345+
346+
/**
347+
* The producer forwarding request calls to a BufferSubscriber.
348+
*
349+
* @param <T> the emitted value type
350+
*/
351+
static final class BufferProducer<T> implements Producer {
352+
final BufferSubscriber<T> bs;
353+
public BufferProducer(BufferSubscriber<T> bs) {
354+
this.bs = bs;
355+
}
356+
@Override
357+
public void request(long n) {
358+
bs.downstreamRequest(n);
359+
}
360+
}
361+
}

0 commit comments

Comments
 (0)