Skip to content

Commit 114f50c

Browse files
Klemen KresnikKlemen Kresnik
Klemen Kresnik
authored and
Klemen Kresnik
committed
Add vararg of Subscriptions to composite subscription.
1 parent c8e1b03 commit 114f50c

File tree

2 files changed

+165
-16
lines changed

2 files changed

+165
-16
lines changed

src/main/java/rx/subscriptions/CompositeSubscription.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@
1515
*/
1616
package rx.subscriptions;
1717

18+
import rx.Subscription;
19+
import rx.exceptions.Exceptions;
20+
1821
import java.util.ArrayList;
1922
import java.util.Arrays;
2023
import java.util.Collection;
2124
import java.util.HashSet;
2225
import java.util.List;
2326
import java.util.Set;
2427

25-
import rx.Subscription;
26-
import rx.exceptions.*;
27-
2828
/**
2929
* Subscription that represents a group of Subscriptions that are unsubscribed together.
3030
* <p>
@@ -54,7 +54,7 @@ public boolean isUnsubscribed() {
5454
* well.
5555
*
5656
* @param s
57-
* the {@link Subscription} to add
57+
* the {@link Subscription} to add
5858
*/
5959
public void add(final Subscription s) {
6060
if (s.isUnsubscribed()) {
@@ -75,12 +75,44 @@ public void add(final Subscription s) {
7575
s.unsubscribe();
7676
}
7777

78+
/**
79+
* Adds collection of {@link Subscription} to this {@code CompositeSubscription} if the
80+
* {@code CompositeSubscription} is not yet unsubscribed. If the {@code CompositeSubscription} <em>is</em>
81+
* unsubscribed, {@code addAll} will indicate this by explicitly unsubscribing all {@code Subscription} in collection as
82+
* well.
83+
*
84+
* @param subscriptions
85+
* the collection of {@link Subscription} to add
86+
*/
87+
public void addAll(final Subscription... subscriptions) {
88+
if (!unsubscribed) {
89+
synchronized (this) {
90+
if (!unsubscribed) {
91+
if (this.subscriptions == null) {
92+
this.subscriptions = new HashSet<Subscription>(subscriptions.length);
93+
}
94+
95+
for (Subscription s : subscriptions) {
96+
if (!s.isUnsubscribed()) {
97+
this.subscriptions.add(s);
98+
}
99+
}
100+
return;
101+
}
102+
}
103+
}
104+
105+
for (Subscription s : subscriptions) {
106+
s.unsubscribe();
107+
}
108+
}
109+
78110
/**
79111
* Removes a {@link Subscription} from this {@code CompositeSubscription}, and unsubscribes the
80112
* {@link Subscription}.
81113
*
82114
* @param s
83-
* the {@link Subscription} to remove
115+
* the {@link Subscription} to remove
84116
*/
85117
public void remove(final Subscription s) {
86118
if (!unsubscribed) {

src/test/java/rx/subscriptions/CompositeSubscriptionTest.java

Lines changed: 128 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,19 @@
1515
*/
1616
package rx.subscriptions;
1717

18-
import static org.junit.Assert.assertEquals;
19-
import static org.junit.Assert.assertFalse;
20-
import static org.junit.Assert.assertTrue;
21-
import static org.junit.Assert.fail;
18+
import org.junit.Test;
19+
import rx.Subscription;
20+
import rx.exceptions.CompositeException;
2221

2322
import java.util.ArrayList;
2423
import java.util.List;
2524
import java.util.concurrent.CountDownLatch;
2625
import java.util.concurrent.atomic.AtomicInteger;
2726

28-
import org.junit.Test;
29-
30-
import rx.Subscription;
31-
import rx.exceptions.CompositeException;
27+
import static org.junit.Assert.assertEquals;
28+
import static org.junit.Assert.assertFalse;
29+
import static org.junit.Assert.assertTrue;
30+
import static org.junit.Assert.fail;
3231

3332
public class CompositeSubscriptionTest {
3433

@@ -324,17 +323,18 @@ public void run() {
324323
// we should have only unsubscribed once
325324
assertEquals(1, counter.get());
326325
}
326+
327327
@Test
328328
public void testTryRemoveIfNotIn() {
329329
CompositeSubscription csub = new CompositeSubscription();
330-
330+
331331
CompositeSubscription csub1 = new CompositeSubscription();
332332
CompositeSubscription csub2 = new CompositeSubscription();
333-
333+
334334
csub.add(csub1);
335335
csub.remove(csub1);
336336
csub.add(csub2);
337-
337+
338338
csub.remove(csub1); // try removing again
339339
}
340340

@@ -344,4 +344,121 @@ public void testAddingNullSubscriptionIllegal() {
344344
csub.add(null);
345345
}
346346

347+
@Test
348+
public void testAddAll() {
349+
final AtomicInteger counter = new AtomicInteger();
350+
CompositeSubscription s = new CompositeSubscription();
351+
s.addAll(new Subscription() {
352+
@Override
353+
public void unsubscribe() {
354+
counter.incrementAndGet();
355+
}
356+
357+
@Override
358+
public boolean isUnsubscribed() {
359+
return false;
360+
}
361+
}, new Subscription() {
362+
@Override
363+
public void unsubscribe() {
364+
counter.incrementAndGet();
365+
}
366+
367+
@Override
368+
public boolean isUnsubscribed() {
369+
return false;
370+
}
371+
}, new Subscription() {
372+
@Override
373+
public void unsubscribe() {
374+
counter.incrementAndGet();
375+
}
376+
377+
@Override
378+
public boolean isUnsubscribed() {
379+
return false;
380+
}
381+
}, new Subscription() {
382+
@Override
383+
public void unsubscribe() {
384+
counter.incrementAndGet();
385+
}
386+
387+
@Override
388+
public boolean isUnsubscribed() {
389+
return false;
390+
}
391+
});
392+
393+
s.unsubscribe();
394+
395+
assertEquals(4, counter.get());
396+
}
397+
398+
@Test(timeout = 1000)
399+
public void testAddAllConcurrent() throws InterruptedException {
400+
final AtomicInteger counter = new AtomicInteger();
401+
final CompositeSubscription s = new CompositeSubscription();
402+
403+
final int count = 10;
404+
final CountDownLatch start = new CountDownLatch(1);
405+
final CountDownLatch end = new CountDownLatch(10);
406+
final List<Thread> threads = new ArrayList<Thread>();
407+
for (int i = 0; i < count; i++) {
408+
final Thread t = new Thread() {
409+
@Override
410+
public void run() {
411+
try {
412+
start.await();
413+
s.addAll(new Subscription() {
414+
@Override
415+
public void unsubscribe() {
416+
counter.incrementAndGet();
417+
}
418+
419+
@Override
420+
public boolean isUnsubscribed() {
421+
return false;
422+
}
423+
}, new Subscription() {
424+
@Override
425+
public void unsubscribe() {
426+
counter.incrementAndGet();
427+
}
428+
429+
@Override
430+
public boolean isUnsubscribed() {
431+
return false;
432+
}
433+
}, new Subscription() {
434+
@Override
435+
public void unsubscribe() {
436+
counter.incrementAndGet();
437+
}
438+
439+
@Override
440+
public boolean isUnsubscribed() {
441+
return false;
442+
}
443+
});
444+
end.countDown();
445+
} catch (final InterruptedException e) {
446+
fail(e.getMessage());
447+
}
448+
}
449+
};
450+
t.start();
451+
threads.add(t);
452+
}
453+
454+
start.countDown();
455+
end.await();
456+
s.unsubscribe();
457+
for (final Thread t : threads) {
458+
t.join();
459+
}
460+
461+
assertEquals(30, counter.get());
462+
}
463+
347464
}

0 commit comments

Comments
 (0)