Skip to content

ChainedSubscription -> SubscriptionList #1308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions rxjava-core/src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package rx;

import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.SubscriptionList;
import rx.subscriptions.CompositeSubscription;

/**
Expand All @@ -32,9 +32,9 @@
*/
public abstract class Subscriber<T> implements Observer<T>, Subscription {

private final ChainedSubscription cs;
private final SubscriptionList cs;

protected Subscriber(ChainedSubscription cs) {
protected Subscriber(SubscriptionList cs) {
if (cs == null) {
throw new IllegalArgumentException("The CompositeSubscription can not be null");
}
Expand All @@ -43,12 +43,12 @@ protected Subscriber(ChainedSubscription cs) {

@Deprecated
protected Subscriber(CompositeSubscription cs) {
this(new ChainedSubscription());
this(new SubscriptionList());
add(cs);
}

protected Subscriber() {
this(new ChainedSubscription());
this(new SubscriptionList());
}

protected Subscriber(Subscriber<?> op) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import rx.functions.Action0;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.SubscriptionList;
import rx.subscriptions.Subscriptions;

/**
Expand Down Expand Up @@ -55,7 +55,7 @@ static final class GroupBySubscriber<K, T> extends Subscriber<T> {
public GroupBySubscriber(Func1<? super T, ? extends K> keySelector, Subscriber<? super GroupedObservable<K, T>> child) {
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
// and will unsubscribe on this parent if they are all unsubscribed
super(new ChainedSubscription());
super(new SubscriptionList());
this.keySelector = keySelector;
this.child = child;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
import rx.Subscriber;
import rx.functions.Action0;
import rx.observables.GroupedObservable;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.SubscriptionList;
import rx.subscriptions.Subscriptions;

public final class OperatorPivot<K1, K2, T> implements Operator<GroupedObservable<K2, GroupedObservable<K1, T>>, GroupedObservable<K1, GroupedObservable<K2, T>>> {

@Override
public Subscriber<? super GroupedObservable<K1, GroupedObservable<K2, T>>> call(final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
final AtomicReference<State> state = new AtomicReference<State>(State.create());
final PivotSubscriber<K1, K2, T> pivotSubscriber = new PivotSubscriber<K1, K2, T>(new ChainedSubscription(), child, state);
final PivotSubscriber<K1, K2, T> pivotSubscriber = new PivotSubscriber<K1, K2, T>(new SubscriptionList(), child, state);
child.add(Subscriptions.create(new Action0() {

@Override
Expand Down Expand Up @@ -65,12 +65,12 @@ private static final class PivotSubscriber<K1, K2, T> extends Subscriber<Grouped
* needs to decouple the subscription as the inner subscriptions need a separate lifecycle
* and will unsubscribe on this parent if they are all unsubscribed
*/
private final ChainedSubscription parentSubscription;
private final SubscriptionList parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
private final AtomicReference<State> state;
private final GroupState<K1, K2, T> groups;

private PivotSubscriber(ChainedSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
private PivotSubscriber(SubscriptionList parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
super(parentSubscription);
this.parentSubscription = parentSubscription;
this.child = child;
Expand Down Expand Up @@ -158,7 +158,7 @@ public void onNext(T t) {
private static final class GroupState<K1, K2, T> {
private final ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>> innerSubjects = new ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>>();
private final ConcurrentHashMap<K2, Outer<K1, K2, T>> outerSubjects = new ConcurrentHashMap<K2, Outer<K1, K2, T>>();
private final ChainedSubscription parentSubscription;
private final SubscriptionList parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
/** Indicates a terminal state. */
volatile int completed;
Expand All @@ -167,7 +167,7 @@ private static final class GroupState<K1, K2, T> {
static final AtomicIntegerFieldUpdater<GroupState> COMPLETED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(GroupState.class, "completed");

public GroupState(ChainedSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
public GroupState(SubscriptionList parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
this.parentSubscription = parentSubscription;
this.child = child;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import rx.Observable.Operator;
import rx.Subscriber;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.SubscriptionList;

/**
* Returns an Observable that emits the first <code>num</code> items emitted by the source
Expand All @@ -40,7 +40,7 @@ public OperatorTake(int limit) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final ChainedSubscription parent = new ChainedSubscription();
final SubscriptionList parent = new SubscriptionList();
if (limit == 0) {
child.onCompleted();
parent.unsubscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.SubscriptionList;
import rx.subscriptions.Subscriptions;

/**
Expand All @@ -36,7 +36,7 @@ public OperatorUnsubscribeOn(Scheduler scheduler) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
final ChainedSubscription parentSubscription = new ChainedSubscription();
final SubscriptionList parentSubscription = new SubscriptionList();
subscriber.add(Subscriptions.create(new Action0() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
*/
public final class ChainedSubscription implements Subscription {
public final class SubscriptionList implements Subscription {

private List<Subscription> subscriptions;
private boolean unsubscribed = false;

public ChainedSubscription() {
public SubscriptionList() {
}

public ChainedSubscription(final Subscription... subscriptions) {
public SubscriptionList(final Subscription... subscriptions) {
this.subscriptions = new LinkedList<Subscription>(Arrays.asList(subscriptions));
}

Expand All @@ -47,8 +47,8 @@ public synchronized boolean isUnsubscribed() {
}

/**
* Adds a new {@link Subscription} to this {@code ChainedSubscription} if the {@code ChainedSubscription} is
* not yet unsubscribed. If the {@code ChainedSubscription} <em>is</em> unsubscribed, {@code add} will
* Adds a new {@link Subscription} to this {@code SubscriptionList} if the {@code SubscriptionList} is
* not yet unsubscribed. If the {@code SubscriptionList} <em>is</em> unsubscribed, {@code add} will
* indicate this by explicitly unsubscribing the new {@code Subscription} as well.
*
* @param s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
import rx.Subscription;
import rx.exceptions.CompositeException;

public class ChainedSubscriptionTest {
public class SubscriptionListTest {

@Test
public void testSuccess() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
Expand Down Expand Up @@ -70,7 +70,7 @@ public boolean isUnsubscribed() {
@Test(timeout = 1000)
public void shouldUnsubscribeAll() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
final ChainedSubscription s = new ChainedSubscription();
final SubscriptionList s = new SubscriptionList();

final int count = 10;
final CountDownLatch start = new CountDownLatch(1);
Expand Down Expand Up @@ -117,7 +117,7 @@ public void run() {
@Test
public void testException() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
Expand Down Expand Up @@ -159,7 +159,7 @@ public boolean isUnsubscribed() {
@Test
public void testCompositeException() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
Expand Down Expand Up @@ -215,7 +215,7 @@ public boolean isUnsubscribed() {
@Test
public void testUnsubscribeIdempotence() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
Expand All @@ -241,7 +241,7 @@ public boolean isUnsubscribed() {
public void testUnsubscribeIdempotenceConcurrently()
throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
final ChainedSubscription s = new ChainedSubscription();
final SubscriptionList s = new SubscriptionList();

final int count = 10;
final CountDownLatch start = new CountDownLatch(1);
Expand Down