Skip to content

GroupedObservable.from/create #1789

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void onNext(T t) {
private GroupState<K, T> createNewGroup(final K key) {
final GroupState<K, T> groupState = new GroupState<K, T>();

GroupedObservable<K, R> go = new GroupedObservable<K, R>(key, new OnSubscribe<R>() {
GroupedObservable<K, R> go = GroupedObservable.create(key, new OnSubscribe<R>() {

@Override
public void call(final Subscriber<? super R> o) {
Expand Down
62 changes: 57 additions & 5 deletions src/main/java/rx/observables/GroupedObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@
package rx.observables;

import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Func1;

/**
* An {@link Observable} that has been grouped by key, the value of which can be obtained with
* {@link #getKey()}.
* An {@link Observable} that has been grouped by key, the value of which can be obtained with {@link #getKey()}.
* <p>
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they
* may
* discard their buffers by applying an operator like {@link Observable#take}{@code (0)} to them.
*
* @param <K>
Expand All @@ -37,7 +38,58 @@
public class GroupedObservable<K, T> extends Observable<T> {
private final K key;

public GroupedObservable(K key, OnSubscribe<T> onSubscribe) {
/**
* Converts an {@link Observable} into a {@code GroupedObservable} with a particular key.
*
* @param key
* the key to identify the group of items emitted by this {@code GroupedObservable}
* @param o
* the {@link Observable} to convert
* @return a {@code GroupedObservable} representation of {@code o}, with key {@code key}
*/
public static <K, T> GroupedObservable<K, T> from(K key, final Observable<T> o) {
return new GroupedObservable<K, T>(key, new OnSubscribe<T>() {

@Override
public void call(Subscriber<? super T> s) {
o.unsafeSubscribe(s);
}
});
}

/**
* Returns an Observable that will execute the specified function when a {@link Subscriber} subscribes to
* it.
* <p>
* <img width="640" height="200" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/create.png" alt="">
* <p>
* Write the function you pass to {@code create} so that it behaves as an Observable: It should invoke the
* Subscriber's {@link Subscriber#onNext onNext}, {@link Subscriber#onError onError}, and {@link Subscriber#onCompleted onCompleted} methods appropriately.
* <p>
* A well-formed Observable must invoke either the Subscriber's {@code onCompleted} method exactly once or
* its {@code onError} method exactly once.
* <p>
* See <a href="http://go.microsoft.com/fwlink/?LinkID=205219">Rx Design Guidelines (PDF)</a> for detailed
* information.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code create} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <K>
* the type of the key
* @param <T>
* the type of the items that this Observable emits
* @param f
* a function that accepts an {@code Subscriber<T>}, and invokes its {@code onNext}, {@code onError}, and {@code onCompleted} methods as appropriate
* @return a GroupedObservable that, when a {@link Subscriber} subscribes to it, will execute the specified
* function
*/
public final static <K, T> GroupedObservable<K, T> create(K key, OnSubscribe<T> f) {
return new GroupedObservable<K, T>(key, f);
}

protected GroupedObservable(K key, OnSubscribe<T> onSubscribe) {
super(onSubscribe);
this.key = key;
}
Expand Down