-
Notifications
You must be signed in to change notification settings - Fork 7.6k
1.x: Add vararg of Subscriptions to CompositeSubscription. #3720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,20 +15,19 @@ | |
*/ | ||
package rx.subscriptions; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertFalse; | ||
import static org.junit.Assert.assertTrue; | ||
import static org.junit.Assert.fail; | ||
import org.junit.Test; | ||
import rx.Subscription; | ||
import rx.exceptions.CompositeException; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
import org.junit.Test; | ||
|
||
import rx.Subscription; | ||
import rx.exceptions.CompositeException; | ||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertFalse; | ||
import static org.junit.Assert.assertTrue; | ||
import static org.junit.Assert.fail; | ||
|
||
public class CompositeSubscriptionTest { | ||
|
||
|
@@ -324,17 +323,18 @@ public void run() { | |
// we should have only unsubscribed once | ||
assertEquals(1, counter.get()); | ||
} | ||
|
||
@Test | ||
public void testTryRemoveIfNotIn() { | ||
CompositeSubscription csub = new CompositeSubscription(); | ||
|
||
CompositeSubscription csub1 = new CompositeSubscription(); | ||
CompositeSubscription csub2 = new CompositeSubscription(); | ||
|
||
csub.add(csub1); | ||
csub.remove(csub1); | ||
csub.add(csub2); | ||
|
||
csub.remove(csub1); // try removing again | ||
} | ||
|
||
|
@@ -344,4 +344,121 @@ public void testAddingNullSubscriptionIllegal() { | |
csub.add(null); | ||
} | ||
|
||
@Test | ||
public void testAddAll() { | ||
final AtomicInteger counter = new AtomicInteger(); | ||
CompositeSubscription s = new CompositeSubscription(); | ||
s.addAll(new Subscription() { | ||
@Override | ||
public void unsubscribe() { | ||
counter.incrementAndGet(); | ||
} | ||
|
||
@Override | ||
public boolean isUnsubscribed() { | ||
return false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not sure if it's correct behavior to always return |
||
} | ||
}, new Subscription() { | ||
@Override | ||
public void unsubscribe() { | ||
counter.incrementAndGet(); | ||
} | ||
|
||
@Override | ||
public boolean isUnsubscribed() { | ||
return false; | ||
} | ||
}, new Subscription() { | ||
@Override | ||
public void unsubscribe() { | ||
counter.incrementAndGet(); | ||
} | ||
|
||
@Override | ||
public boolean isUnsubscribed() { | ||
return false; | ||
} | ||
}, new Subscription() { | ||
@Override | ||
public void unsubscribe() { | ||
counter.incrementAndGet(); | ||
} | ||
|
||
@Override | ||
public boolean isUnsubscribed() { | ||
return false; | ||
} | ||
}); | ||
|
||
s.unsubscribe(); | ||
|
||
assertEquals(4, counter.get()); | ||
} | ||
|
||
@Test(timeout = 1000) | ||
public void testAddAllConcurrent() throws InterruptedException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure about this test since it gives no exact guarantees about thread-safety of the target method, depends on the test environment. Also, current timeout may give flakiness on Travis (sometimes it has significant delays in threads scheduling) |
||
final AtomicInteger counter = new AtomicInteger(); | ||
final CompositeSubscription s = new CompositeSubscription(); | ||
|
||
final int count = 10; | ||
final CountDownLatch start = new CountDownLatch(1); | ||
final CountDownLatch end = new CountDownLatch(10); | ||
final List<Thread> threads = new ArrayList<Thread>(); | ||
for (int i = 0; i < count; i++) { | ||
final Thread t = new Thread() { | ||
@Override | ||
public void run() { | ||
try { | ||
start.await(); | ||
s.addAll(new Subscription() { | ||
@Override | ||
public void unsubscribe() { | ||
counter.incrementAndGet(); | ||
} | ||
|
||
@Override | ||
public boolean isUnsubscribed() { | ||
return false; | ||
} | ||
}, new Subscription() { | ||
@Override | ||
public void unsubscribe() { | ||
counter.incrementAndGet(); | ||
} | ||
|
||
@Override | ||
public boolean isUnsubscribed() { | ||
return false; | ||
} | ||
}, new Subscription() { | ||
@Override | ||
public void unsubscribe() { | ||
counter.incrementAndGet(); | ||
} | ||
|
||
@Override | ||
public boolean isUnsubscribed() { | ||
return false; | ||
} | ||
}); | ||
end.countDown(); | ||
} catch (final InterruptedException e) { | ||
fail(e.getMessage()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will throw exception on non-main thread and it won't fail the test. |
||
} | ||
} | ||
}; | ||
t.start(); | ||
threads.add(t); | ||
} | ||
|
||
start.countDown(); | ||
end.await(); | ||
s.unsubscribe(); | ||
for (final Thread t : threads) { | ||
t.join(); | ||
} | ||
|
||
assertEquals(30, counter.get()); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: test prefix is not needed, same for test below