Skip to content

Commit 6399270

Browse files
committed
Operator buffer(time) and buffer(time, size) now support backpressure.
1 parent 8758fdd commit 6399270

File tree

5 files changed

+643
-49
lines changed

5 files changed

+643
-49
lines changed

src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3289,7 +3289,7 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count)
32893289
* @see <a href="http://reactivex.io/documentation/operators/buffer.html">ReactiveX operators documentation: Buffer</a>
32903290
*/
32913291
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count, Scheduler scheduler) {
3292-
return lift(new OperatorBufferWithTime<T>(timespan, timespan, unit, count, scheduler));
3292+
return lift(new OperatorBufferWithTimeAndSize<T>(timespan, unit, count, scheduler));
32933293
}
32943294

32953295
/**
@@ -3320,7 +3320,7 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count,
33203320
* @see <a href="http://reactivex.io/documentation/operators/buffer.html">ReactiveX operators documentation: Buffer</a>
33213321
*/
33223322
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler) {
3323-
return buffer(timespan, timespan, unit, scheduler);
3323+
return buffer(timespan, unit, Integer.MAX_VALUE, scheduler);
33243324
}
33253325

33263326
/**
Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,344 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import java.util.*;
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.atomic.AtomicLong;
21+
22+
import rx.Observable.Operator;
23+
import rx.*;
24+
import rx.Scheduler.Worker;
25+
import rx.exceptions.*;
26+
import rx.functions.Action0;
27+
import rx.subscriptions.*;
28+
29+
/**
30+
* Buffers the source into Lists with maximum size or emission duration, respecting backpressure.
31+
*/
32+
public final class OperatorBufferWithTimeAndSize<T> implements Operator<List<T>, T> {
33+
final int size;
34+
final long time;
35+
final TimeUnit unit;
36+
final Scheduler scheduler;
37+
public OperatorBufferWithTimeAndSize(long time, TimeUnit unit, int size, Scheduler scheduler) {
38+
this.size = size;
39+
this.time = time;
40+
this.unit = unit;
41+
this.scheduler = scheduler;
42+
}
43+
@Override
44+
public Subscriber<? super T> call(Subscriber<? super List<T>> child) {
45+
Scheduler.Worker worker = scheduler.createWorker();
46+
child.add(worker);
47+
48+
final BufferSubscriber<T> bs = new BufferSubscriber<T>(child, size, time, unit, worker);
49+
50+
child.add(bs);
51+
52+
child.setProducer(new BufferProducer<T>(bs));
53+
54+
return bs;
55+
}
56+
57+
/** The buffering subscriber for the upstream. */
58+
static final class BufferSubscriber<T> extends Subscriber<T> {
59+
60+
final Subscriber<? super List<T>> child;
61+
final int size;
62+
final long time;
63+
final TimeUnit unit;
64+
final Worker worker;
65+
66+
/** The producer of the upstream. */
67+
Producer producer;
68+
69+
/** Tracks the downstream requested amounts. */
70+
final AtomicLong requested;
71+
72+
/** Holds onto the current timer. */
73+
final SerialSubscription timer;
74+
75+
/** The buffer holding the elements or null for a replaced buffer. Guarded by this. */
76+
List<T> buffer;
77+
78+
/** The current buffer identifier so timer doesn't emit an old buffer. Guarded by this. */
79+
long bufferId;
80+
81+
/** Captures how much time was remaining in the last timeout, in milliseconds. Guarded by this. */
82+
long timeRemaining;
83+
/** Stores the Worker.now()-relative value where the timer should fire, in milliseconds. Guarded by this. */
84+
long timeScheduled;
85+
86+
public BufferSubscriber(Subscriber<? super List<T>> child, int size,
87+
long time, TimeUnit unit, Worker worker) {
88+
this.child = child;
89+
this.size = size;
90+
this.time = time;
91+
this.unit = unit;
92+
this.worker = worker;
93+
this.timeRemaining = unit.toMillis(time);
94+
this.requested = new AtomicLong();
95+
this.timer = new SerialSubscription();
96+
this.add(timer);
97+
}
98+
99+
@Override
100+
public void setProducer(Producer producer) {
101+
this.producer = producer;
102+
}
103+
104+
@Override
105+
public void onNext(T t) {
106+
if (requested.get() == 0) {
107+
onError(new MissingBackpressureException());
108+
return;
109+
}
110+
List<T> list;
111+
long r;
112+
long id;
113+
long delay;
114+
synchronized (this) {
115+
List<T> b = buffer;
116+
if (b == null) {
117+
b = new ArrayList<T>();
118+
buffer = b;
119+
}
120+
b.add(t);
121+
if (b.size() == size) {
122+
id = ++bufferId;
123+
124+
list = buffer;
125+
buffer = null;
126+
127+
r = requested.get();
128+
if (r != Long.MAX_VALUE) {
129+
delay = calculateNextDelay();
130+
r = requested.decrementAndGet();
131+
} else {
132+
delay = -1; // irrelevant in unbounded mode
133+
}
134+
} else {
135+
return;
136+
}
137+
}
138+
scheduleTimer(r, id, delay);
139+
child.onNext(list);
140+
}
141+
142+
/** Timeout when run in backpressure mode. */
143+
void timeout(long id) {
144+
List<T> list;
145+
long r;
146+
long delay;
147+
synchronized (this) {
148+
if (id == bufferId) {
149+
list = buffer;
150+
buffer = null;
151+
152+
id = ++bufferId;
153+
154+
if (list == null) {
155+
list = new ArrayList<T>();
156+
}
157+
r = requested.get();
158+
if (r != Long.MAX_VALUE) {
159+
delay = calculateNextDelay();
160+
r = requested.decrementAndGet();
161+
} else {
162+
delay = -1; // irrelevant in unbounded mode
163+
}
164+
} else {
165+
return;
166+
}
167+
}
168+
scheduleTimer(r, id, delay);
169+
child.onNext(list);
170+
}
171+
/** Timeout in unbounded mode. */
172+
void timeout() {
173+
List<T> list;
174+
synchronized (this) {
175+
list = buffer;
176+
buffer = null;
177+
178+
++bufferId;
179+
180+
if (list == null) {
181+
list = new ArrayList<T>();
182+
}
183+
}
184+
child.onNext(list);
185+
}
186+
187+
void scheduleTimer(long r, long id, long delay) {
188+
if (r > 0 && r < Long.MAX_VALUE) {
189+
timer.set(worker.schedule(new TimerAction(id), delay, unit));
190+
} else
191+
if (r == 0) {
192+
timer.set(Subscriptions.unsubscribed());
193+
}
194+
}
195+
196+
/** Calculates the next delay in the unit accounting how much time was left from the previous timer. */
197+
long calculateNextDelay() {
198+
long delay = timeScheduled - worker.now();
199+
if (delay <= 0) {
200+
delay = time;
201+
timeScheduled = worker.now() + unit.toMillis(time);
202+
} else {
203+
timeScheduled = worker.now() + delay;
204+
delay = unit.convert(delay, TimeUnit.MILLISECONDS);
205+
}
206+
return delay;
207+
}
208+
209+
@Override
210+
public void onError(Throwable e) {
211+
timer.unsubscribe();
212+
try {
213+
synchronized (this) {
214+
buffer = null;
215+
bufferId++;
216+
}
217+
requested.getAndSet(-1); // indicate to the downstream requester there won't be anything to request
218+
219+
child.onError(e);
220+
} finally {
221+
unsubscribe();
222+
}
223+
}
224+
225+
@Override
226+
public void onCompleted() {
227+
timer.unsubscribe();
228+
try {
229+
// either we win and emit the current buffer or the timer in which case
230+
// there is no point in emitting an empty buffer
231+
List<T> list;
232+
synchronized (this) {
233+
list = buffer;
234+
bufferId++;
235+
}
236+
requested.getAndSet(-1); // indicate to the downstream requester there won't be anything to request
237+
if (list != null) {
238+
try {
239+
child.onNext(list);
240+
} catch (Throwable t) {
241+
Exceptions.throwIfFatal(t);
242+
child.onError(t);
243+
return;
244+
}
245+
}
246+
child.onCompleted();
247+
} finally {
248+
unsubscribe();
249+
}
250+
}
251+
public void downstreamRequest(long n) {
252+
if (n < 0) {
253+
throw new IllegalArgumentException("Request is negative");
254+
}
255+
if (n == 0) {
256+
return;
257+
}
258+
for (;;) {
259+
long r = requested.get();
260+
if (r < 0) {
261+
return;
262+
}
263+
long u = r + n;
264+
if (u < 0) {
265+
u = Long.MAX_VALUE;
266+
}
267+
if (requested.compareAndSet(r, u)) {
268+
handleRequested(r, n);
269+
return;
270+
}
271+
}
272+
}
273+
/**
274+
* Handles the change in the request amount.
275+
* @param before the value before the request
276+
* @param request the requested amount
277+
*/
278+
void handleRequested(long before, long request) {
279+
long s = size;
280+
long elements = request * s;
281+
// s != 0 and request != 0
282+
if ((request >>> 31) != 0 && (elements / request != s)) {
283+
elements = Long.MAX_VALUE;
284+
}
285+
if (before == 0) {
286+
if (request != Long.MAX_VALUE) {
287+
long id;
288+
long delay;
289+
290+
synchronized (this) {
291+
id = bufferId;
292+
delay = calculateNextDelay();
293+
}
294+
295+
timer.set(worker.schedule(new TimerAction(id), delay, unit));
296+
} else {
297+
timer.set(worker.schedulePeriodically(new PeriodicAction(), time, time, unit));
298+
}
299+
}
300+
Producer p = producer;
301+
if (p != null) {
302+
p.request(elements);
303+
}
304+
}
305+
/**
306+
* The timer action trying to emit the buffer contents.
307+
*/
308+
class TimerAction implements Action0 {
309+
final long id;
310+
private TimerAction(long id) {
311+
this.id = id;
312+
}
313+
@Override
314+
public void call() {
315+
timeout(id);
316+
}
317+
}
318+
/**
319+
* The timer action trying to emit the buffer contents.
320+
*/
321+
class PeriodicAction implements Action0 {
322+
@Override
323+
public void call() {
324+
timeout();
325+
}
326+
}
327+
}
328+
329+
/**
330+
* The producer forwarding request calls to a BufferSubscriber.
331+
*
332+
* @param <T> the emitted value type
333+
*/
334+
static final class BufferProducer<T> implements Producer {
335+
final BufferSubscriber<T> bs;
336+
public BufferProducer(BufferSubscriber<T> bs) {
337+
this.bs = bs;
338+
}
339+
@Override
340+
public void request(long n) {
341+
bs.downstreamRequest(n);
342+
}
343+
}
344+
}

0 commit comments

Comments
 (0)