Skip to content

GroupBy/GroupByUntil Changes #1727

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 0 additions & 89 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4708,15 +4708,6 @@ public final void forEach(final Action1<? super T> onNext, final Action1<Throwab
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure as splitting a stream effectively turns it into a "hot
* observable" and blocking any one group would block the entire parent stream. If you need
* backpressure on individual groups then you should use operators such as {@link #onBackpressureDrop}
* or {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param keySelector
* a function that extracts the key for each item
Expand Down Expand Up @@ -4746,15 +4737,6 @@ public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? su
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure as splitting a stream effectively turns it into a "hot
* observable" and blocking any one group would block the entire parent stream. If you need
* backpressure on individual groups then you should use operators such as {@link #onBackpressureDrop}
* or {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param keySelector
* a function that extracts the key for each item
Expand All @@ -4770,77 +4752,6 @@ public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super
return lift(new OperatorGroupBy<T, K, T>(keySelector));
}

/**
* Groups the items emitted by an {@code Observable} according to a specified key selector function until
* the duration {@code Observable} expires for the key.
* <p>
* <img width="640" height="375" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupByUntil.png" alt="">
* <p>
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure as splitting a stream effectively turns it into a "hot
* observable" and blocking any one group would block the entire parent stream. If you need
* backpressure on individual groups then you should use operators such as {@link #onBackpressureDrop}
* or {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupByUntil} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param keySelector
* a function to extract the key for each item
* @param durationSelector
* a function to signal the expiration of a group
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a key
* value and each of which emits all items emitted by the source {@code Observable} during that
* key's duration that share that same key value
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava wiki: groupByUntil</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211932.aspx">MSDN: Observable.GroupByUntil</a>
*/
public final <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super GroupedObservable<TKey, T>, ? extends Observable<? extends TDuration>> durationSelector) {
return groupByUntil(keySelector, Functions.<T> identity(), durationSelector);
}

/**
* Groups the items emitted by an {@code Observable} (transformed by a selector) according to a specified
* key selector function until the duration Observable expires for the key.
* <p>
* <img width="640" height="375" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupByUntil.png" alt="">
* <p>
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure as splitting a stream effectively turns it into a "hot
* observable" and blocking any one group would block the entire parent stream. If you need
* backpressure on individual groups then you should use operators such as {@link #onBackpressureDrop}
* or {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupByUntil} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param keySelector
* a function to extract the key for each item
* @param valueSelector
* a function to map each item emitted by the source {@code Observable} to an item emitted by one
* of the resulting {@link GroupedObservable}s
* @param durationSelector
* a function to signal the expiration of a group
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a key
* value and each of which emits all items emitted by the source {@code Observable} during that
* key's duration that share that same key value, transformed by the value selector
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava wiki: groupByUntil</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229433.aspx">MSDN: Observable.GroupByUntil</a>
*/
public final <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super T, ? extends TValue> valueSelector, Func1<? super GroupedObservable<TKey, TValue>, ? extends Observable<? extends TDuration>> durationSelector) {
return lift(new OperatorGroupByUntil<T, TKey, TValue, TDuration>(keySelector, valueSelector, durationSelector));
}

/**
* Returns an Observable that correlates two Observables when they overlap in time and groups the results.
* <p>
Expand Down
Loading