Skip to content

Commit d86bc3b

Browse files
committed
Add an operator to throttle data via controlling the requests going upstream.
1 parent ab6dbc1 commit d86bc3b

File tree

3 files changed

+420
-33
lines changed

3 files changed

+420
-33
lines changed

src/main/java/rx/Observable.java

Lines changed: 53 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,8 @@ static final class OnSubscribeExtend<T> implements OnSubscribe<T> {
208208
OnSubscribeExtend(Observable<T> parent) {
209209
this.parent = parent;
210210
}
211-
@Override
212-
public void call(Subscriber<? super T> subscriber) {
211+
@Override
212+
public void call(Subscriber<? super T> subscriber) {
213213
subscriber.add(subscribe(subscriber, parent));
214214
}
215215
}
@@ -239,7 +239,7 @@ public void call(Subscriber<? super T> subscriber) {
239239
*/
240240
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
241241
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
242-
}
242+
}
243243

244244
/**
245245
* Transform an Observable by applying a particular Transformer function to it.
@@ -2807,8 +2807,8 @@ public static Observable<Integer> range(int start, int count, Scheduler schedule
28072807
*/
28082808
public static <T> Observable<Boolean> sequenceEqual(Observable<? extends T> first, Observable<? extends T> second) {
28092809
return sequenceEqual(first, second, InternalObservableUtils.OBJECT_EQUALS);
2810-
}
2811-
2810+
}
2811+
28122812
/**
28132813
* Returns an Observable that emits a Boolean value that indicates whether two Observable sequences are the
28142814
* same by comparing the items emitted by each Observable pairwise based on the results of a specified
@@ -3132,7 +3132,7 @@ public static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<
31323132
public static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction) {
31333133
return ws.toList().map(InternalObservableUtils.TO_ARRAY).lift(new OperatorZip<R>(zipFunction));
31343134
}
3135-
3135+
31363136
/**
31373137
* Returns an Observable that emits the results of a specified combiner function applied to combinations of
31383138
* two items emitted, in sequence, by two other Observables.
@@ -3994,7 +3994,7 @@ public final <R> Observable<R> cast(final Class<R> klass) {
39943994
*/
39953995
public final <R> Observable<R> collect(Func0<R> stateFactory, final Action2<R, ? super T> collector) {
39963996
Func2<R, T, R> accumulator = InternalObservableUtils.createCollectorCaller(collector);
3997-
3997+
39983998
/*
39993999
* Discussion and confirmation of implementation at
40004000
* https://github.com/ReactiveX/RxJava/issues/423#issuecomment-27642532
@@ -4117,7 +4117,7 @@ public final Observable<T> concatWith(Observable<? extends T> t1) {
41174117
*/
41184118
public final Observable<Boolean> contains(final Object element) {
41194119
return exists(InternalObservableUtils.equalsWith(element));
4120-
}
4120+
}
41214121

41224122
/**
41234123
* Returns an Observable that emits the count of the total number of items emitted by the source Observable.
@@ -4782,7 +4782,7 @@ public final Observable<T> doOnSubscribe(final Action0 subscribe) {
47824782
public final Observable<T> doOnTerminate(final Action0 onTerminate) {
47834783
Action1<T> onNext = Actions.empty();
47844784
Action1<Throwable> onError = Actions.toAction1(onTerminate);
4785-
4785+
47864786
Observer<T> observer = new ActionSubscriber<T>(onNext, onError, onTerminate);
47874787

47884788
return lift(new OperatorDoOnEach<T>(observer));
@@ -6698,6 +6698,26 @@ public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Ob
66986698
return OperatorPublish.create(this, selector);
66996699
}
67006700

6701+
/**
6702+
* Allow the an external signal control the amount of data being set through this Observable chain.
6703+
* When the control Observable emits false (closes the valve) requests upstream are stopped and any
6704+
* requests from downstream for more data are buffered until the control Observable emits a true
6705+
* (opens the valve). Should the control Observable error or complete while closed (last control
6706+
* emition was a false) an error is sent down the data stream. The granularity breaks up large requests
6707+
* from downstream to limit the number of onNexts that are possible after the control valve has closed.
6708+
* The smaller the number the tighter the control on the flow but the more overhead there will be in
6709+
* managing the requests.
6710+
*
6711+
* @param control
6712+
* an Observable that dictates if request signals propagate upstream
6713+
* @param granularity
6714+
* the maximum number of outstanding requests.
6715+
* @returns an Observable that mostly stops emiting after the control Observable emits a false.
6716+
*/
6717+
public final Observable<T> pressureValve(Observable<Boolean> control, long granularity) {
6718+
return lift(new OperatorValve<T>(control, granularity));
6719+
}
6720+
67016721
/**
67026722
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source
67036723
* Observable, then feeds the result of that function along with the second item emitted by the source
@@ -6874,7 +6894,7 @@ public final Observable<T> repeat(final long count, Scheduler scheduler) {
68746894
*/
68756895
public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
68766896
return OnSubscribeRedo.repeat(this, InternalObservableUtils.createRepeatDematerializer(notificationHandler), scheduler);
6877-
}
6897+
}
68786898

68796899
/**
68806900
* Returns an Observable that emits the same values as the source Observable with the exception of an
@@ -6897,7 +6917,7 @@ public final Observable<T> repeatWhen(final Func1<? super Observable<? extends V
68976917
*/
68986918
public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler) {
68996919
return OnSubscribeRedo.repeat(this, InternalObservableUtils.createRepeatDematerializer(notificationHandler));
6900-
}
6920+
}
69016921

69026922
/**
69036923
* Returns a {@link ConnectableObservable} that shares a single subscription to the underlying Observable
@@ -6948,7 +6968,7 @@ public final ConnectableObservable<T> replay() {
69486968
*/
69496969
public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
69506970
return OperatorReplay.multicastSelector(InternalObservableUtils.createReplaySupplier(this), selector);
6951-
}
6971+
}
69526972

69536973
/**
69546974
* Returns an Observable that emits items that are the results of invoking a specified selector on items
@@ -7059,7 +7079,7 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
70597079
}
70607080
return OperatorReplay.multicastSelector(
70617081
InternalObservableUtils.createReplaySupplier(this, bufferSize, time, unit, scheduler), selector);
7062-
}
7082+
}
70637083

70647084
/**
70657085
* Returns an Observable that emits items that are the results of invoking a specified selector on items
@@ -7093,7 +7113,7 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
70937113
public final <R> Observable<R> replay(final Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize, final Scheduler scheduler) {
70947114
return OperatorReplay.multicastSelector(InternalObservableUtils.createReplaySupplier(this, bufferSize),
70957115
InternalObservableUtils.createReplaySelectorAndObserveOn(selector, scheduler));
7096-
}
7116+
}
70977117

70987118
/**
70997119
* Returns an Observable that emits items that are the results of invoking a specified selector on items
@@ -7162,7 +7182,7 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
71627182
public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final long time, final TimeUnit unit, final Scheduler scheduler) {
71637183
return OperatorReplay.multicastSelector(
71647184
InternalObservableUtils.createReplaySupplier(this, time, unit, scheduler), selector);
7165-
}
7185+
}
71667186

71677187
/**
71687188
* Returns an Observable that emits items that are the results of invoking a specified selector on items
@@ -7194,7 +7214,7 @@ public final <R> Observable<R> replay(final Func1<? super Observable<T>, ? exten
71947214
return OperatorReplay.multicastSelector(
71957215
InternalObservableUtils.createReplaySupplier(this),
71967216
InternalObservableUtils.createReplaySelectorAndObserveOn(selector, scheduler));
7197-
}
7217+
}
71987218

71997219
/**
72007220
* Returns a {@link ConnectableObservable} that shares a single subscription to the source Observable that
@@ -7530,7 +7550,7 @@ public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate) {
75307550
*/
75317551
public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler) {
75327552
return OnSubscribeRedo.<T>retry(this, InternalObservableUtils.createRetryDematerializer(notificationHandler));
7533-
}
7553+
}
75347554

75357555
/**
75367556
* Returns an Observable that emits the same values as the source observable with the exception of an
@@ -7557,7 +7577,7 @@ public final Observable<T> retryWhen(final Func1<? super Observable<? extends Th
75577577
*/
75587578
public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
75597579
return OnSubscribeRedo.<T> retry(this, InternalObservableUtils.createRetryDematerializer(notificationHandler), scheduler);
7560-
}
7580+
}
75617581

75627582
/**
75637583
* Returns an Observable that emits the most recently emitted item (if any) emitted by the source Observable
@@ -8361,7 +8381,7 @@ public final Subscription subscribe(final Action1<? super T> onNext) {
83618381
Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
83628382
Action0 onCompleted = Actions.empty();
83638383
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
8364-
}
8384+
}
83658385

83668386
/**
83678387
* Subscribes to an Observable and provides callbacks to handle the items it emits and any error
@@ -8393,7 +8413,7 @@ public final Subscription subscribe(final Action1<? super T> onNext, final Actio
83938413

83948414
Action0 onCompleted = Actions.empty();
83958415
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
8396-
}
8416+
}
83978417

83988418
/**
83998419
* Subscribes to an Observable and provides callbacks to handle the items it emits and any error or
@@ -8452,7 +8472,7 @@ public final Subscription subscribe(final Observer<? super T> observer) {
84528472
return subscribe((Subscriber<? super T>)observer);
84538473
}
84548474
return subscribe(new ObserverSubscriber<T>(observer));
8455-
}
8475+
}
84568476

84578477
/**
84588478
* Subscribes to an Observable and invokes {@link OnSubscribe} function without any contract protection,
@@ -8579,19 +8599,19 @@ static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T
85798599
if (subscriber.isUnsubscribed()) {
85808600
RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
85818601
} else {
8582-
// if an unhandled error occurs executing the onSubscribe we will propagate it
8583-
try {
8584-
subscriber.onError(hook.onSubscribeError(e));
8585-
} catch (Throwable e2) {
8586-
Exceptions.throwIfFatal(e2);
8587-
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
8588-
// so we are unable to propagate the error correctly and will just throw
8602+
// if an unhandled error occurs executing the onSubscribe we will propagate it
8603+
try {
8604+
subscriber.onError(hook.onSubscribeError(e));
8605+
} catch (Throwable e2) {
8606+
Exceptions.throwIfFatal(e2);
8607+
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
8608+
// so we are unable to propagate the error correctly and will just throw
85898609
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
8590-
// TODO could the hook be the cause of the error in the on error handling.
8591-
hook.onSubscribeError(r);
8592-
// TODO why aren't we throwing the hook's return value.
8593-
throw r;
8594-
}
8610+
// TODO could the hook be the cause of the error in the on error handling.
8611+
hook.onSubscribeError(r);
8612+
// TODO why aren't we throwing the hook's return value.
8613+
throw r;
8614+
}
85958615
}
85968616
return Subscriptions.unsubscribed();
85978617
}

0 commit comments

Comments
 (0)