Skip to content

Commit 2e5b51c

Browse files
author
Klemen Kresnik
committed
Add vararg of Subscriptions to composite subscription.
1 parent 62383bd commit 2e5b51c

File tree

2 files changed

+237
-86
lines changed

2 files changed

+237
-86
lines changed

src/main/java/rx/subscriptions/CompositeSubscription.java

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,46 +15,46 @@
1515
*/
1616
package rx.subscriptions;
1717

18+
import rx.Subscription;
19+
import rx.exceptions.Exceptions;
20+
1821
import java.util.ArrayList;
1922
import java.util.Arrays;
2023
import java.util.Collection;
2124
import java.util.HashSet;
2225
import java.util.List;
2326
import java.util.Set;
2427

25-
import rx.Subscription;
26-
import rx.exceptions.*;
27-
2828
/**
2929
* Subscription that represents a group of Subscriptions that are unsubscribed together.
3030
* <p>
3131
* All methods of this class are thread-safe.
3232
*/
3333
public final class CompositeSubscription implements Subscription {
34-
34+
3535
private Set<Subscription> subscriptions;
3636
private volatile boolean unsubscribed;
37-
37+
3838
public CompositeSubscription() {
3939
}
40-
40+
4141
public CompositeSubscription(final Subscription... subscriptions) {
4242
this.subscriptions = new HashSet<Subscription>(Arrays.asList(subscriptions));
4343
}
44-
44+
4545
@Override
4646
public boolean isUnsubscribed() {
4747
return unsubscribed;
4848
}
49-
49+
5050
/**
5151
* Adds a new {@link Subscription} to this {@code CompositeSubscription} if the
5252
* {@code CompositeSubscription} is not yet unsubscribed. If the {@code CompositeSubscription} <em>is</em>
5353
* unsubscribed, {@code add} will indicate this by explicitly unsubscribing the new {@code Subscription} as
5454
* well.
5555
*
5656
* @param s
57-
* the {@link Subscription} to add
57+
* the {@link Subscription} to add
5858
*/
5959
public void add(final Subscription s) {
6060
if (s.isUnsubscribed()) {
@@ -74,13 +74,47 @@ public void add(final Subscription s) {
7474
// call after leaving the synchronized block so we're not holding a lock while executing this
7575
s.unsubscribe();
7676
}
77-
77+
78+
/**
79+
* Adds collection of {@link Subscription} to this {@code CompositeSubscription} if the
80+
* {@code CompositeSubscription} is not yet unsubscribed. If the {@code CompositeSubscription} <em>is</em>
81+
* unsubscribed, {@code addAll} will indicate this by explicitly unsubscribing all {@code Subscription} in collection as
82+
* well.
83+
*
84+
* @param subscriptions
85+
* the collection of {@link Subscription} to add
86+
*/
87+
public void addAll(final Subscription... subscriptions) {
88+
List<Subscription> subscriptionList = new ArrayList<Subscription>(4);
89+
for (Subscription s : subscriptions) {
90+
if (!s.isUnsubscribed()) {
91+
subscriptionList.add(s);
92+
}
93+
}
94+
95+
if (!unsubscribed) {
96+
synchronized (this) {
97+
if (!unsubscribed) {
98+
if (this.subscriptions == null) {
99+
this.subscriptions = new HashSet<Subscription>(4);
100+
}
101+
this.subscriptions.addAll(subscriptionList);
102+
return;
103+
}
104+
}
105+
}
106+
107+
for (Subscription s : subscriptionList) {
108+
s.unsubscribe();
109+
}
110+
}
111+
78112
/**
79113
* Removes a {@link Subscription} from this {@code CompositeSubscription}, and unsubscribes the
80114
* {@link Subscription}.
81115
*
82116
* @param s
83-
* the {@link Subscription} to remove
117+
* the {@link Subscription} to remove
84118
*/
85119
public void remove(final Subscription s) {
86120
if (!unsubscribed) {
@@ -97,7 +131,7 @@ public void remove(final Subscription s) {
97131
}
98132
}
99133
}
100-
134+
101135
/**
102136
* Unsubscribes any subscriptions that are currently part of this {@code CompositeSubscription} and remove
103137
* them from the {@code CompositeSubscription} so that the {@code CompositeSubscription} is empty and
@@ -117,7 +151,7 @@ public void clear() {
117151
unsubscribeFromAll(unsubscribe);
118152
}
119153
}
120-
154+
121155
/**
122156
* Unsubscribes itself and all inner subscriptions.
123157
* <p>After call of this method, new {@code Subscription}s added to {@link CompositeSubscription}
@@ -139,7 +173,7 @@ public void unsubscribe() {
139173
unsubscribeFromAll(unsubscribe);
140174
}
141175
}
142-
176+
143177
private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
144178
if (subscriptions == null) {
145179
return;
@@ -157,7 +191,7 @@ private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
157191
}
158192
Exceptions.throwIfAny(es);
159193
}
160-
194+
161195
/**
162196
* Returns true if this composite is not unsubscribed and contains subscriptions.
163197
*

0 commit comments

Comments
 (0)