Skip to content

1.x: Add vararg of Subscriptions to CompositeSubscription. #3720

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions src/main/java/rx/subscriptions/CompositeSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
*/
package rx.subscriptions;

import rx.Subscription;
import rx.exceptions.Exceptions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import rx.Subscription;
import rx.exceptions.*;

/**
* Subscription that represents a group of Subscriptions that are unsubscribed together.
* <p>
Expand Down Expand Up @@ -54,7 +54,7 @@ public boolean isUnsubscribed() {
* well.
*
* @param s
* the {@link Subscription} to add
* the {@link Subscription} to add
*/
public void add(final Subscription s) {
if (s.isUnsubscribed()) {
Expand All @@ -75,12 +75,44 @@ public void add(final Subscription s) {
s.unsubscribe();
}

/**
* Adds collection of {@link Subscription} to this {@code CompositeSubscription} if the
* {@code CompositeSubscription} is not yet unsubscribed. If the {@code CompositeSubscription} <em>is</em>
* unsubscribed, {@code addAll} will indicate this by explicitly unsubscribing all {@code Subscription} in collection as
* well.
*
* @param subscriptions
* the collection of {@link Subscription} to add
*/
public void addAll(final Subscription... subscriptions) {
if (!unsubscribed) {
synchronized (this) {
if (!unsubscribed) {
if (this.subscriptions == null) {
this.subscriptions = new HashSet<Subscription>(subscriptions.length);
}

for (Subscription s : subscriptions) {
if (!s.isUnsubscribed()) {
this.subscriptions.add(s);
}
}
return;
}
}
}

for (Subscription s : subscriptions) {
s.unsubscribe();
}
}

/**
* Removes a {@link Subscription} from this {@code CompositeSubscription}, and unsubscribes the
* {@link Subscription}.
*
* @param s
* the {@link Subscription} to remove
* the {@link Subscription} to remove
*/
public void remove(final Subscription s) {
if (!unsubscribed) {
Expand Down
139 changes: 128 additions & 11 deletions src/test/java/rx/subscriptions/CompositeSubscriptionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,19 @@
*/
package rx.subscriptions;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Test;
import rx.Subscription;
import rx.exceptions.CompositeException;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Subscription;
import rx.exceptions.CompositeException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class CompositeSubscriptionTest {

Expand Down Expand Up @@ -324,17 +323,18 @@ public void run() {
// we should have only unsubscribed once
assertEquals(1, counter.get());
}

@Test
public void testTryRemoveIfNotIn() {
CompositeSubscription csub = new CompositeSubscription();

CompositeSubscription csub1 = new CompositeSubscription();
CompositeSubscription csub2 = new CompositeSubscription();

csub.add(csub1);
csub.remove(csub1);
csub.add(csub2);

csub.remove(csub1); // try removing again
}

Expand All @@ -344,4 +344,121 @@ public void testAddingNullSubscriptionIllegal() {
csub.add(null);
}

@Test
public void testAddAll() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: test prefix is not needed, same for test below

final AtomicInteger counter = new AtomicInteger();
CompositeSubscription s = new CompositeSubscription();
s.addAll(new Subscription() {
@Override
public void unsubscribe() {
counter.incrementAndGet();
}

@Override
public boolean isUnsubscribed() {
return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if it's correct behavior to always return false here. Maybe create TestSubscription with AtomicBoolean unsubscribed?

}
}, new Subscription() {
@Override
public void unsubscribe() {
counter.incrementAndGet();
}

@Override
public boolean isUnsubscribed() {
return false;
}
}, new Subscription() {
@Override
public void unsubscribe() {
counter.incrementAndGet();
}

@Override
public boolean isUnsubscribed() {
return false;
}
}, new Subscription() {
@Override
public void unsubscribe() {
counter.incrementAndGet();
}

@Override
public boolean isUnsubscribed() {
return false;
}
});

s.unsubscribe();

assertEquals(4, counter.get());
}

@Test(timeout = 1000)
public void testAddAllConcurrent() throws InterruptedException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this test since it gives no exact guarantees about thread-safety of the target method, depends on the test environment.

Also, current timeout may give flakiness on Travis (sometimes it has significant delays in threads scheduling)

final AtomicInteger counter = new AtomicInteger();
final CompositeSubscription s = new CompositeSubscription();

final int count = 10;
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch end = new CountDownLatch(10);
final List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < count; i++) {
final Thread t = new Thread() {
@Override
public void run() {
try {
start.await();
s.addAll(new Subscription() {
@Override
public void unsubscribe() {
counter.incrementAndGet();
}

@Override
public boolean isUnsubscribed() {
return false;
}
}, new Subscription() {
@Override
public void unsubscribe() {
counter.incrementAndGet();
}

@Override
public boolean isUnsubscribed() {
return false;
}
}, new Subscription() {
@Override
public void unsubscribe() {
counter.incrementAndGet();
}

@Override
public boolean isUnsubscribed() {
return false;
}
});
end.countDown();
} catch (final InterruptedException e) {
fail(e.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will throw exception on non-main thread and it won't fail the test.

}
}
};
t.start();
threads.add(t);
}

start.countDown();
end.await();
s.unsubscribe();
for (final Thread t : threads) {
t.join();
}

assertEquals(30, counter.get());
}

}