Skip to content

Commit 791379d

Browse files
davidmotenakarnokd
authored andcommitted
save allocation in OnSubscribeAutoConnect (#4233)
1 parent 99abbf9 commit 791379d

File tree

1 file changed

+7
-5
lines changed

1 file changed

+7
-5
lines changed

src/main/java/rx/internal/operators/OnSubscribeAutoConnect.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@
2929
*
3030
* @param <T> the value type of the chain
3131
*/
32-
public final class OnSubscribeAutoConnect<T> implements OnSubscribe<T> {
33-
final ConnectableObservable<? extends T> source;
32+
@SuppressWarnings("serial")
33+
public final class OnSubscribeAutoConnect<T> extends AtomicInteger implements OnSubscribe<T> {
34+
// AtomicInteger aspect of `this` represents the number of clients
35+
36+
final ConnectableObservable<? extends T> source;
3437
final int numberOfSubscribers;
3538
final Action1<? super Subscription> connection;
36-
final AtomicInteger clients;
3739

3840
public OnSubscribeAutoConnect(ConnectableObservable<? extends T> source,
3941
int numberOfSubscribers,
@@ -44,12 +46,12 @@ public OnSubscribeAutoConnect(ConnectableObservable<? extends T> source,
4446
this.source = source;
4547
this.numberOfSubscribers = numberOfSubscribers;
4648
this.connection = connection;
47-
this.clients = new AtomicInteger();
4849
}
4950
@Override
5051
public void call(Subscriber<? super T> child) {
5152
source.unsafeSubscribe(Subscribers.wrap(child));
52-
if (clients.incrementAndGet() == numberOfSubscribers) {
53+
//this.get() represents the number of clients
54+
if (this.incrementAndGet() == numberOfSubscribers) {
5355
source.connect(connection);
5456
}
5557
}

0 commit comments

Comments
 (0)