Skip to content

Commit 7beceef

Browse files
committed
add tests
1 parent 94f6cc4 commit 7beceef

File tree

2 files changed

+98
-16
lines changed

2 files changed

+98
-16
lines changed

src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6391,8 +6391,8 @@ public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? su
63916391
* Func1<Action1<K>, Map<K, Object>> mapFactory
63926392
* = action -> CacheBuilder.newBuilder()
63936393
* .maximumSize(1000)
6394-
* .expireAfterAccess(12, TimeUnit.HOUR)
6395-
* .removalListener(key -> action.call(key))
6394+
* .expireAfterAccess(12, TimeUnit.HOURS)
6395+
* .removalListener(notification -> action.call(notification.getKey()))
63966396
* .<K, Object> build().asMap();
63976397
* }
63986398
* </pre>

src/test/java/rx/internal/operators/OperatorGroupByTest.java

Lines changed: 96 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,16 @@
2626
import org.junit.*;
2727
import org.mockito.*;
2828

29+
import com.google.common.cache.CacheBuilder;
30+
import com.google.common.cache.RemovalListener;
31+
import com.google.common.cache.RemovalNotification;
32+
2933
import rx.*;
3034
import rx.Observable;
3135
import rx.Observable.OnSubscribe;
3236
import rx.Observer;
3337
import rx.exceptions.Exceptions;
38+
import rx.exceptions.OnErrorNotImplementedException;
3439
import rx.exceptions.TestException;
3540
import rx.functions.*;
3641
import rx.internal.util.*;
@@ -1858,29 +1863,62 @@ public String call(Integer x) {
18581863
})
18591864
.subscribe(ts);
18601865
assertEquals(Arrays.asList(4, 5, 6, 7, 8, 9), evictedKeys);
1861-
List<String> expected = Observable.range(1, 100).map(new Func1<Integer, String>() {
1862-
@Override
1863-
public String call(Integer x) {
1864-
return (x /10) + ":" + x;
1865-
}
1866-
}).toList().toBlocking().single();
1866+
List<String> expected = Observable
1867+
.range(1, 100)
1868+
.map(new Func1<Integer, String>() {
1869+
@Override
1870+
public String call(Integer x) {
1871+
return (x /10) + ":" + x;
1872+
}
1873+
})
1874+
.toList().toBlocking().single();
18671875
assertEquals(expected, ts.getOnNextEvents());
18681876
}
18691877

1878+
1879+
Func1<Integer, Integer> elementSelector = UtilityFunctions.identity();
18701880
private static final Func1<Integer, Integer> EVICTING_MAP_KEY_SELECTOR = new Func1<Integer, Integer> (){
18711881
@Override
18721882
public Integer call(Integer t) {
18731883
return t /10;
18741884
}};
18751885

18761886
@Test
1877-
public void testEvictingMapFactoryIfMapPutThrowsThen() {
1887+
public void testEvictingMapFactoryIfMapPutThrowsRuntimeExceptionThenErrorEmittedByStream() {
18781888
Func1<Integer, Integer> elementSelector = UtilityFunctions.identity();
18791889
Exceptions.throwIfFatal(null);
18801890
final RuntimeException exception = new RuntimeException("boo");
18811891
//normally we would use Guava CacheBuilder for instance but to save a test dependency
18821892
//we make something custom
1883-
Func1<Action1<Integer>, Map<Integer, Object>> mapFactory = new Func1<Action1<Integer>, Map<Integer, Object>>() {
1893+
Func1<Action1<Integer>, Map<Integer, Object>> mapFactory = createMapFactoryThatThrowsOnPut(exception);
1894+
TestSubscriber<Object> ts = TestSubscriber.create();
1895+
Observable
1896+
.range(1, 100)
1897+
.groupBy(EVICTING_MAP_KEY_SELECTOR, elementSelector, mapFactory)
1898+
.flatMap(UtilityFunctions.<Observable<Integer>>identity())
1899+
.subscribe(ts);
1900+
ts.assertError(exception);
1901+
}
1902+
1903+
@Test(expected = OnErrorNotImplementedException.class)
1904+
public void testEvictingMapFactoryIfMapPutThrowsOnErrorNotImplementedExceptionThenErrorEmittedByStream() {
1905+
1906+
Exceptions.throwIfFatal(null);
1907+
final RuntimeException exception = new OnErrorNotImplementedException("boo", new RuntimeException());
1908+
//normally we would use Guava CacheBuilder for instance but to save a test dependency
1909+
//we make something custom
1910+
Func1<Action1<Integer>, Map<Integer, Object>> mapFactory = createMapFactoryThatThrowsOnPut(exception);
1911+
TestSubscriber<Object> ts = TestSubscriber.create();
1912+
Observable
1913+
.range(1, 100)
1914+
.groupBy(EVICTING_MAP_KEY_SELECTOR, elementSelector, mapFactory)
1915+
.flatMap(UtilityFunctions.<Observable<Integer>>identity())
1916+
.subscribe(ts);
1917+
}
1918+
1919+
private Func1<Action1<Integer>, Map<Integer, Object>> createMapFactoryThatThrowsOnPut(
1920+
final RuntimeException exception) {
1921+
return new Func1<Action1<Integer>, Map<Integer, Object>>() {
18841922
@SuppressWarnings("serial")
18851923
@Override
18861924
public Map<Integer, Object> call(final Action1<Integer> evicted) {
@@ -1895,17 +1933,61 @@ public Object put(Integer key, Object value) {
18951933
throw exception;
18961934
}};
18971935
}};
1898-
TestSubscriber<Object> ts = TestSubscriber.create();
1936+
}
1937+
1938+
@Test
1939+
public void mapFactoryEvictionWorksWithGuavaCache() {
1940+
final List<Integer> evictedKeys = new ArrayList<Integer>();
1941+
Func1<Action1<Integer>, Map<Integer, Object>> mapFactory =
1942+
new Func1<Action1<Integer>, Map<Integer, Object>>() {
1943+
@Override
1944+
public Map<Integer, Object> call(final Action1<Integer> action) {
1945+
return CacheBuilder.newBuilder()
1946+
.maximumSize(5)
1947+
.removalListener(new RemovalListener<Integer, Object>() {
1948+
@Override
1949+
public void onRemoval(RemovalNotification<Integer, Object> notification) {
1950+
action.call(notification.getKey());
1951+
evictedKeys.add(notification.getKey());
1952+
}
1953+
})
1954+
.build().asMap();
1955+
}
1956+
};
1957+
Func1<Integer, Integer> elementSelector = UtilityFunctions.identity();
1958+
TestSubscriber<String> ts = TestSubscriber.create();
18991959
Observable
19001960
.range(1, 100)
19011961
.groupBy(EVICTING_MAP_KEY_SELECTOR, elementSelector, mapFactory)
1902-
.flatMap(UtilityFunctions.<Observable<Integer>>identity())
1962+
.flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<String>>() {
1963+
@Override
1964+
public Observable<String> call(final GroupedObservable<Integer, Integer> g) {
1965+
return g.map(new Func1<Integer, String>() {
1966+
@Override
1967+
public String call(Integer x) {
1968+
return g.getKey() + ":" + x;
1969+
}
1970+
});
1971+
}
1972+
})
19031973
.subscribe(ts);
1904-
ts.assertError(exception);
1974+
assertEquals(Arrays.asList(0, 1, 2, 3, 4), evictedKeys.subList(0, 5));
1975+
List<String> expected = Observable
1976+
.range(1, 100)
1977+
.map(new Func1<Integer, String>() {
1978+
@Override
1979+
public String call(Integer x) {
1980+
return (x /10) + ":" + x;
1981+
}
1982+
})
1983+
.toList().toBlocking().single();
1984+
assertEquals(expected, ts.getOnNextEvents());
19051985
}
19061986

1907-
@Test
1908-
public void mapFactoryEvictionWorksWithGuavaCache() {
1909-
1987+
@Test(expected = NullPointerException.class)
1988+
public void testGroupByThrowsNpeIfEvictingMapFactoryNull() {
1989+
Observable
1990+
.range(1, 100)
1991+
.groupBy(EVICTING_MAP_KEY_SELECTOR, elementSelector, null);
19101992
}
19111993
}

0 commit comments

Comments
 (0)