Skip to content

Commit e3cef6c

Browse files
Cache operator
Cache operator as discussed in ReactiveX/RxJava#209 Similar to `replay()` except that this auto-subscribes to the source sequence. This comes with the same cautions as `toList` when dealing with infinite or very large sequences.
1 parent 634cea3 commit e3cef6c

File tree

2 files changed

+214
-0
lines changed

2 files changed

+214
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import rx.observables.ConnectableObservable;
4040
import rx.observables.GroupedObservable;
4141
import rx.operators.OperationAll;
42+
import rx.operators.OperationCache;
4243
import rx.operators.OperationConcat;
4344
import rx.operators.OperationDefer;
4445
import rx.operators.OperationDematerialize;
@@ -1678,6 +1679,22 @@ public static <T> ConnectableObservable<T> replay(final Observable<T> that) {
16781679
return OperationMulticast.multicast(that, ReplaySubject.<T> create());
16791680
}
16801681

1682+
/**
1683+
* Similar to {@link #replay()} except that this auto-subscribes to the source sequence.
1684+
* <p>
1685+
* This is useful when returning an Observable that you wish to cache responses but can't control the
1686+
* subscribe/unsubscribe behavior of all the Observers.
1687+
* <p>
1688+
* NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not
1689+
* use this on infinite or very large sequences that will use up memory. This is similar to
1690+
* the {@link Observable#toList()} operator in this caution.
1691+
*
1692+
* @return an observable sequence that upon first subscription caches all events for subsequent subscriptions.
1693+
*/
1694+
public static <T> Observable<T> cache(final Observable<T> that) {
1695+
return create(OperationCache.cache(that));
1696+
}
1697+
16811698
/**
16821699
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
16831700
*
@@ -3260,6 +3277,22 @@ public ConnectableObservable<T> replay() {
32603277
return replay(this);
32613278
}
32623279

3280+
/**
3281+
* Similar to {@link #replay()} except that this auto-subscribes to the source sequence.
3282+
* <p>
3283+
* This is useful when returning an Observable that you wish to cache responses but can't control the
3284+
* subscribe/unsubscribe behavior of all the Observers.
3285+
* <p>
3286+
* NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not
3287+
* use this on infinite or very large sequences that will use up memory. This is similar to
3288+
* the {@link Observable#toList()} operator in this caution.
3289+
*
3290+
* @return an observable sequence that upon first subscription caches all events for subsequent subscriptions.
3291+
*/
3292+
public Observable<T> cache() {
3293+
return cache(this);
3294+
}
3295+
32633296
/**
32643297
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
32653298
*
@@ -4371,6 +4404,59 @@ public void call(String v) {
43714404
}
43724405
}
43734406

4407+
@Test
4408+
public void testCache() throws InterruptedException {
4409+
final AtomicInteger counter = new AtomicInteger();
4410+
Observable<String> o = Observable.create(new Func1<Observer<String>, Subscription>() {
4411+
4412+
@Override
4413+
public Subscription call(final Observer<String> observer) {
4414+
final BooleanSubscription subscription = new BooleanSubscription();
4415+
new Thread(new Runnable() {
4416+
4417+
@Override
4418+
public void run() {
4419+
System.out.println("published observable being executed");
4420+
observer.onNext("one");
4421+
observer.onCompleted();
4422+
counter.incrementAndGet();
4423+
}
4424+
}).start();
4425+
return subscription;
4426+
}
4427+
}).cache();
4428+
4429+
// we then expect the following 2 subscriptions to get that same value
4430+
final CountDownLatch latch = new CountDownLatch(2);
4431+
4432+
// subscribe once
4433+
o.subscribe(new Action1<String>() {
4434+
4435+
@Override
4436+
public void call(String v) {
4437+
assertEquals("one", v);
4438+
System.out.println("v: " + v);
4439+
latch.countDown();
4440+
}
4441+
});
4442+
4443+
// subscribe again
4444+
o.subscribe(new Action1<String>() {
4445+
4446+
@Override
4447+
public void call(String v) {
4448+
assertEquals("one", v);
4449+
System.out.println("v: " + v);
4450+
latch.countDown();
4451+
}
4452+
});
4453+
4454+
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
4455+
fail("subscriptions did not receive values");
4456+
}
4457+
assertEquals(1, counter.get());
4458+
}
4459+
43744460
private static class TestException extends RuntimeException {
43754461
private static final long serialVersionUID = 1L;
43764462
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.junit.Assert.*;
19+
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
25+
import org.junit.Test;
26+
27+
import rx.Observable;
28+
import rx.Observer;
29+
import rx.Subscription;
30+
import rx.subjects.ReplaySubject;
31+
import rx.subscriptions.BooleanSubscription;
32+
import rx.util.functions.Action1;
33+
import rx.util.functions.Func1;
34+
35+
/**
36+
* Similar to {@link Observable#replay()} except that this auto-subscribes to the source sequence.
37+
* <p>
38+
* This is useful when returning an Observable that you wish to cache responses but can't control the
39+
* subscribe/unsubscribe behavior of all the Observers.
40+
* <p>
41+
* NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not
42+
* use this on infinite or very large sequences that will use up memory. This is similar to
43+
* the {@link Observable#toList()} operator in this caution.
44+
*
45+
*/
46+
public class OperationCache {
47+
48+
public static <T> Func1<Observer<T>, Subscription> cache(final Observable<T> source) {
49+
return new Func1<Observer<T>, Subscription>() {
50+
51+
final AtomicBoolean subscribed = new AtomicBoolean(false);
52+
private final ReplaySubject<T> cache = ReplaySubject.create();
53+
54+
@Override
55+
public Subscription call(Observer<T> observer) {
56+
if (subscribed.compareAndSet(false, true)) {
57+
// subscribe to the source once
58+
source.subscribe(cache);
59+
/*
60+
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.
61+
*
62+
* This means this should never be used on an infinite or very large sequence, similar to toList().
63+
*/
64+
}
65+
66+
return cache.subscribe(observer);
67+
}
68+
69+
};
70+
}
71+
72+
public static class UnitTest {
73+
74+
@Test
75+
public void testCache() throws InterruptedException {
76+
final AtomicInteger counter = new AtomicInteger();
77+
Observable<String> o = Observable.create(cache(Observable.create(new Func1<Observer<String>, Subscription>() {
78+
79+
@Override
80+
public Subscription call(final Observer<String> observer) {
81+
final BooleanSubscription subscription = new BooleanSubscription();
82+
new Thread(new Runnable() {
83+
84+
@Override
85+
public void run() {
86+
System.out.println("published observable being executed");
87+
observer.onNext("one");
88+
observer.onCompleted();
89+
counter.incrementAndGet();
90+
}
91+
}).start();
92+
return subscription;
93+
}
94+
})));
95+
96+
// we then expect the following 2 subscriptions to get that same value
97+
final CountDownLatch latch = new CountDownLatch(2);
98+
99+
// subscribe once
100+
o.subscribe(new Action1<String>() {
101+
102+
@Override
103+
public void call(String v) {
104+
assertEquals("one", v);
105+
System.out.println("v: " + v);
106+
latch.countDown();
107+
}
108+
});
109+
110+
// subscribe again
111+
o.subscribe(new Action1<String>() {
112+
113+
@Override
114+
public void call(String v) {
115+
assertEquals("one", v);
116+
System.out.println("v: " + v);
117+
latch.countDown();
118+
}
119+
});
120+
121+
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
122+
fail("subscriptions did not receive values");
123+
}
124+
assertEquals(1, counter.get());
125+
}
126+
}
127+
128+
}

0 commit comments

Comments
 (0)