Skip to content

Manual Merge of Pull Request #407 #425

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Oct 9, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.subjects.AsyncSubject;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
Expand Down Expand Up @@ -3657,6 +3658,14 @@ public ConnectableObservable<T> publish() {
return OperationMulticast.multicast(this, PublishSubject.<T> create());
}

/**
* Returns a {@link ConnectableObservable} that shares a single subscription that contains the last notification only.
* @return a {@link ConnectableObservable}
*/
public ConnectableObservable<T> publishLast() {
return OperationMulticast.multicast(this, AsyncSubject.<T> create());
}

/**
* Synonymous with <code>reduce()</code>.
* <p>
Expand Down Expand Up @@ -4414,7 +4423,7 @@ public Boolean call(T t) {
* For why this is being used see https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*
* NOTE: If strong reasons for not depending on package names comes up then the implementation of this method can change to looking for a marker interface.
*
*
* @param o
* @return {@code true} if the given function is an internal implementation, and {@code false} otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.operators.OperationRefCount;
import rx.util.functions.Func1;

/**
* A ConnectableObservable resembles an ordinary {@link Observable}, except that it does not begin
Expand Down Expand Up @@ -46,4 +48,12 @@ protected ConnectableObservable(OnSubscribeFunc<T> onSubscribe) {
*/
public abstract Subscription connect();

/**
* Returns an observable sequence that stays connected to the source as long
* as there is at least one subscription to the observable sequence.
* @return a {@link Observable}
*/
public Observable<T> refCount() {
return Observable.create(OperationRefCount.refCount(this));
}
}
66 changes: 66 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationRefCount.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observables.ConnectableObservable;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;

/**
* Returns an observable sequence that stays connected to the source as long
* as there is at least one subscription to the observable sequence.
*/
public final class OperationRefCount<T> {
public static <T> Observable.OnSubscribeFunc<T> refCount(ConnectableObservable<T> connectableObservable) {
return new RefCount<T>(connectableObservable);
}

private static class RefCount<T> implements Observable.OnSubscribeFunc<T> {
private final ConnectableObservable<T> innerConnectableObservable;
private final Object gate = new Object();
private int count = 0;
private Subscription connection = null;

public RefCount(ConnectableObservable<T> innerConnectableObservable) {
this.innerConnectableObservable = innerConnectableObservable;
}

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
final Subscription subscription = innerConnectableObservable.subscribe(observer);
synchronized (gate) {
if (count++ == 0) {
connection = innerConnectableObservable.connect();
}
}
return Subscriptions.create(new Action0() {
@Override
public void call() {
synchronized (gate) {
if (--count == 0) {
connection.unsubscribe();
connection = null;
}
}
subscription.unsubscribe();
}
});
}
}
}
42 changes: 42 additions & 0 deletions rxjava-core/src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,48 @@ public void call(String v) {
}
}

@Test
public void testPublishLast() throws InterruptedException {
final AtomicInteger count = new AtomicInteger();
ConnectableObservable<String> connectable = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(final Observer<? super String> observer) {
count.incrementAndGet();
final BooleanSubscription subscription = new BooleanSubscription();
new Thread(new Runnable() {
@Override
public void run() {
observer.onNext("first");
observer.onNext("last");
observer.onCompleted();
}
}).start();
return subscription;
}
}).publishLast();

// subscribe once
final CountDownLatch latch = new CountDownLatch(1);
connectable.subscribe(new Action1<String>() {
@Override
public void call(String value) {
assertEquals("last", value);
latch.countDown();
}
});

// subscribe twice
connectable.subscribe(new Action1<String>() {
@Override
public void call(String _) {}
});

Subscription subscription = connectable.connect();
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
assertEquals(1, count.get());
subscription.unsubscribe();
}

@Test
public void testReplay() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
Expand Down
103 changes: 103 additions & 0 deletions rxjava-core/src/test/java/rx/RefCountTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package rx;

import static org.junit.Assert.*;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import rx.concurrency.TestScheduler;
import rx.util.functions.Action1;

public class RefCountTest {

@Test
public void testRefCount() {
TestScheduler s = new TestScheduler();
Observable<Long> interval = Observable.interval(100, TimeUnit.MILLISECONDS, s).publish().refCount();

// subscribe list1
final List<Long> list1 = new ArrayList<Long>();
Subscription s1 = interval.subscribe(new Action1<Long>() {

@Override
public void call(Long t1) {
list1.add(t1);
}

});
s.advanceTimeBy(200, TimeUnit.MILLISECONDS);

assertEquals(2, list1.size());
assertEquals(0L, list1.get(0).longValue());
assertEquals(1L, list1.get(1).longValue());

// subscribe list2
final List<Long> list2 = new ArrayList<Long>();
Subscription s2 = interval.subscribe(new Action1<Long>() {

@Override
public void call(Long t1) {
list2.add(t1);
}

});
s.advanceTimeBy(300, TimeUnit.MILLISECONDS);

// list 1 should have 5 items
assertEquals(5, list1.size());
assertEquals(2L, list1.get(2).longValue());
assertEquals(3L, list1.get(3).longValue());
assertEquals(4L, list1.get(4).longValue());

// list 2 should only have 3 items
assertEquals(3, list2.size());
assertEquals(2L, list2.get(0).longValue());
assertEquals(3L, list2.get(1).longValue());
assertEquals(4L, list2.get(2).longValue());

// unsubscribe list1
s1.unsubscribe();

// advance further
s.advanceTimeBy(300, TimeUnit.MILLISECONDS);

// list 1 should still have 5 items
assertEquals(5, list1.size());

// list 2 should have 6 items
assertEquals(6, list2.size());
assertEquals(5L, list2.get(3).longValue());
assertEquals(6L, list2.get(4).longValue());
assertEquals(7L, list2.get(5).longValue());

// unsubscribe list2
s2.unsubscribe();

// advance further
s.advanceTimeBy(1000, TimeUnit.MILLISECONDS);

// the following is not working as it seems the PublishSubject does not allow re-subscribing. TODO fix that in subsequent pull request


// // subscribing a new one should start over because the source should have been unsubscribed
// // subscribe list1
// final List<Long> list3 = new ArrayList<Long>();
// Subscription s3 = interval.subscribe(new Action1<Long>() {
//
// @Override
// public void call(Long t1) {
// list3.add(t1);
// }
//
// });
// s.advanceTimeBy(200, TimeUnit.MILLISECONDS);
//
// assertEquals(2, list3.size());
// assertEquals(0L, list3.get(0).longValue());
// assertEquals(1L, list3.get(1).longValue());

}
}
48 changes: 48 additions & 0 deletions rxjava-core/src/test/java/rx/RefCountTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package rx;

import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;

import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

public class RefCountTests {

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}

@Test
public void onlyFirstShouldSubscribeAndLastUnsubscribe() {
final AtomicInteger subscriptionCount = new AtomicInteger();
final AtomicInteger unsubscriptionCount = new AtomicInteger();
Observable<Integer> observable = Observable.create(new Observable.OnSubscribeFunc<Integer>() {
@Override
public Subscription onSubscribe(Observer<? super Integer> observer) {
subscriptionCount.incrementAndGet();
return Subscriptions.create(new Action0() {
@Override
public void call() {
unsubscriptionCount.incrementAndGet();
}
});
}
});
Observable<Integer> refCounted = observable.publish().refCount();
Observer<Integer> observer = mock(Observer.class);
Subscription first = refCounted.subscribe(observer);
assertEquals(1, subscriptionCount.get());
Subscription second = refCounted.subscribe(observer);
assertEquals(1, subscriptionCount.get());
first.unsubscribe();
assertEquals(0, unsubscriptionCount.get());
second.unsubscribe();
assertEquals(1, unsubscriptionCount.get());
}
}