Skip to content

Commit 0099a1a

Browse files
committed
add groupBy overload with evictingMapFactory
1 parent d43c05c commit 0099a1a

File tree

3 files changed

+187
-9
lines changed

3 files changed

+187
-9
lines changed

src/main/java/rx/Observable.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5885,6 +5885,60 @@ public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? su
58855885
return lift(new OperatorGroupBy<T, K, R>(keySelector, elementSelector));
58865886
}
58875887

5888+
/**
5889+
* Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these
5890+
* grouped items as {@link GroupedObservable}s. The emitted {@code GroupedObservable} allows only a single
5891+
* {@link Subscriber} during its lifetime and if this {@code Subscriber} unsubscribes before the
5892+
* source terminates, the next emission by the source having the same key will trigger a new
5893+
* {@code GroupedObservable} emission.
5894+
* <p>
5895+
* <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupBy.png" alt="">
5896+
* <p>
5897+
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
5898+
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
5899+
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
5900+
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
5901+
* <dl>
5902+
* <dt><b>Scheduler:</b></dt>
5903+
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
5904+
* </dl>
5905+
*
5906+
* @param keySelector
5907+
* a function that extracts the key for each item
5908+
* @param elementSelector
5909+
* a function that extracts the return element for each item
5910+
* @param evictingMapFactory
5911+
* a function that given an eviction action returns a {@link Map} instance that will be used to assign
5912+
* items to the appropriate {@code GroupedObservable}s. The {@code Map} instance must be thread-safe
5913+
* and any eviction must trigger a call to the supplied action (synchronously or asynchronously).
5914+
* This can be used to limit the size of the map by evicting keys by maximum size or access time for
5915+
* instance. If {@code evictingMapFactory} is null then no eviction strategy will be applied (and a suitable default thread-safe
5916+
* implementation of {@code Map} will be supplied). Here's an example using Guava's {@code CacheBuilder} from v19.0:
5917+
* <pre>
5918+
* {@code
5919+
* Func1<Action1<K>, Map<K, Object>> mapFactory
5920+
* = action -> CacheBuilder.newBuilder()
5921+
* .maximumSize(1000)
5922+
* .expireAfterAccess(12, TimeUnit.HOUR)
5923+
* .removalListener(key -> action.call(key))
5924+
* .<K, Object> build().asMap();
5925+
* }
5926+
* </pre>
5927+
*
5928+
* @param <K>
5929+
* the key type
5930+
* @param <R>
5931+
* the element type
5932+
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a
5933+
* unique key value and each of which emits those items from the source Observable that share that
5934+
* key value
5935+
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
5936+
*/
5937+
@Experimental
5938+
public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector, final Func1<Action1<K>, Map<K, Object>> evictingMapFactory) {
5939+
return lift(new OperatorGroupBy<T, K, R>(keySelector, elementSelector, evictingMapFactory));
5940+
}
5941+
58885942
/**
58895943
* Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these
58905944
* grouped items as {@link GroupedObservable}s. The emitted {@code GroupedObservable} allows only a single

src/main/java/rx/internal/operators/OperatorGroupBy.java

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121

2222
import rx.*;
2323
import rx.Observable.*;
24+
import rx.exceptions.Exceptions;
2425
import rx.functions.*;
2526
import rx.internal.producers.ProducerArbiter;
2627
import rx.internal.util.*;
2728
import rx.observables.GroupedObservable;
29+
import rx.observers.Subscribers;
2830
import rx.plugins.RxJavaPlugins;
2931
import rx.subscriptions.Subscriptions;
3032

@@ -46,35 +48,50 @@ public final class OperatorGroupBy<T, K, V> implements Operator<GroupedObservabl
4648
final Func1<? super T, ? extends V> valueSelector;
4749
final int bufferSize;
4850
final boolean delayError;
51+
final Func1<Action1<K>, Map<K, Object>> mapFactory; //nullable
4952

5053
@SuppressWarnings({ "unchecked", "rawtypes" })
5154
public OperatorGroupBy(Func1<? super T, ? extends K> keySelector) {
52-
this(keySelector, (Func1)UtilityFunctions.<T>identity(), RxRingBuffer.SIZE, false);
55+
this(keySelector, (Func1)UtilityFunctions.<T>identity(), RxRingBuffer.SIZE, false, null);
5356
}
5457

5558
public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector) {
56-
this(keySelector, valueSelector, RxRingBuffer.SIZE, false);
59+
this(keySelector, valueSelector, RxRingBuffer.SIZE, false, null);
60+
}
61+
62+
public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, Func1<Action1<K>, Map<K, Object>> mapFactory) {
63+
this(keySelector, valueSelector, RxRingBuffer.SIZE, false, mapFactory);
5764
}
5865

59-
public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError) {
66+
public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError, Func1<Action1<K>, Map<K, Object>> mapFactory) {
6067
this.keySelector = keySelector;
6168
this.valueSelector = valueSelector;
6269
this.bufferSize = bufferSize;
6370
this.delayError = delayError;
71+
this.mapFactory = mapFactory;
6472
}
6573

6674
@Override
67-
public Subscriber<? super T> call(Subscriber<? super GroupedObservable<K, V>> t) {
68-
final GroupBySubscriber<T, K, V> parent = new GroupBySubscriber<T, K, V>(t, keySelector, valueSelector, bufferSize, delayError);
75+
public Subscriber<? super T> call(Subscriber<? super GroupedObservable<K, V>> child) {
76+
final GroupBySubscriber<T, K, V> parent;
77+
try {
78+
parent = new GroupBySubscriber<T, K, V>(child, keySelector, valueSelector, bufferSize, delayError, mapFactory);
79+
} catch (Throwable ex) {
80+
//Can reach here because mapFactory.call() may throw in constructor of GroupBySubscriber
81+
Exceptions.throwOrReport(ex, child);
82+
Subscriber<? super T> parent2 = Subscribers.empty();
83+
parent2.unsubscribe();
84+
return parent2;
85+
}
6986

70-
t.add(Subscriptions.create(new Action0() {
87+
child.add(Subscriptions.create(new Action0() {
7188
@Override
7289
public void call() {
7390
parent.cancel();
7491
}
7592
}));
7693

77-
t.setProducer(parent.producer);
94+
child.setProducer(parent.producer);
7895

7996
return parent;
8097
}
@@ -101,6 +118,7 @@ public static final class GroupBySubscriber<T, K, V>
101118
final Map<Object, GroupedUnicast<K, V>> groups;
102119
final Queue<GroupedObservable<K, V>> queue;
103120
final GroupByProducer producer;
121+
final Queue<K> evictedKeys;
104122

105123
static final Object NULL_KEY = new Object();
106124

@@ -129,18 +147,47 @@ public static final class GroupBySubscriber<T, K, V>
129147
static final AtomicIntegerFieldUpdater<GroupBySubscriber> WIP =
130148
AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "wip");
131149

132-
public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError) {
150+
public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Func1<? super T, ? extends K> keySelector,
151+
Func1<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError,
152+
Func1<Action1<K>, Map<K, Object>> mapFactory) {
133153
this.actual = actual;
134154
this.keySelector = keySelector;
135155
this.valueSelector = valueSelector;
136156
this.bufferSize = bufferSize;
137157
this.delayError = delayError;
138-
this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
139158
this.queue = new ConcurrentLinkedQueue<GroupedObservable<K, V>>();
140159
GROUP_COUNT.lazySet(this, 1);
141160
this.s = new ProducerArbiter();
142161
this.s.request(bufferSize);
143162
this.producer = new GroupByProducer(this);
163+
if (mapFactory == null) {
164+
this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
165+
this.evictedKeys = null;
166+
} else {
167+
this.evictedKeys = new ConcurrentLinkedQueue<K>();
168+
this.groups = createMap(mapFactory, new EvictionAction<K>(evictedKeys));
169+
}
170+
}
171+
172+
//declare a class instead of using anonymous class to
173+
//limit enclosing scope. Is this of value?
174+
static class EvictionAction<K> implements Action1<K> {
175+
176+
final Queue<K> evictedKeys;
177+
178+
EvictionAction(Queue<K> evictedKeys) {
179+
this.evictedKeys = evictedKeys;
180+
}
181+
182+
@Override
183+
public void call(K key) {
184+
evictedKeys.offer(key);
185+
}
186+
}
187+
188+
@SuppressWarnings("unchecked")
189+
private Map<Object, GroupedUnicast<K, V>> createMap(Func1<Action1<K>, Map<K, Object>> mapFactory, Action1<K> evictionAction) {
190+
return (Map<Object, GroupedUnicast<K,V>>)(Map<Object, ?>) mapFactory.call(evictionAction);
144191
}
145192

146193
@Override
@@ -196,6 +243,16 @@ public void onNext(T t) {
196243
}
197244

198245
group.onNext(v);
246+
247+
if (evictedKeys != null) {
248+
K evictedKey;
249+
while ((evictedKey = evictedKeys.poll()) != null) {
250+
GroupedUnicast<K, V> g = groups.get(evictedKey);
251+
if (g != null) {
252+
g.onComplete();
253+
}
254+
}
255+
}
199256

200257
if (notNew) {
201258
s.request(1);
@@ -224,6 +281,9 @@ public void onCompleted() {
224281
e.onComplete();
225282
}
226283
groups.clear();
284+
if (evictedKeys != null) {
285+
evictedKeys.clear();
286+
}
227287

228288
done = true;
229289
GROUP_COUNT.decrementAndGet(this);
@@ -317,6 +377,9 @@ void errorAll(Subscriber<? super GroupedObservable<K, V>> a, Queue<?> q, Throwab
317377
q.clear();
318378
List<GroupedUnicast<K, V>> list = new ArrayList<GroupedUnicast<K, V>>(groups.values());
319379
groups.clear();
380+
if (evictedKeys != null) {
381+
evictedKeys.clear();
382+
}
320383

321384
for (GroupedUnicast<K, V> e : list) {
322385
e.onError(ex);

src/test/java/rx/internal/operators/OperatorGroupByTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1802,4 +1802,65 @@ public Integer call(Integer pair) {
18021802
outer.assertValueCount(2);
18031803

18041804
}
1805+
1806+
@Test
1807+
public void mapFactoryEvictionWorks() {
1808+
Func1<Integer, Integer> keySelector = new Func1<Integer, Integer> (){
1809+
@Override
1810+
public Integer call(Integer t) {
1811+
return t /10;
1812+
}};
1813+
Func1<Integer, Integer> elementSelector = UtilityFunctions.identity();
1814+
final List<Integer> evictedKeys = new ArrayList<Integer>();
1815+
//normally we would use Guava CacheBuilder for instance but to save a test dependency
1816+
//we make something custom
1817+
Func1<Action1<Integer>, Map<Integer, Object>> mapFactory = new Func1<Action1<Integer>, Map<Integer, Object>>() {
1818+
@Override
1819+
public Map<Integer, Object> call(final Action1<Integer> evicted) {
1820+
// is a bit risky to override the put method because
1821+
// of possible side-effects (e.g. remove could call put and we did not know it)
1822+
// to fix just need to use composition but needs a verbose implementation of Map
1823+
// interface
1824+
return new ConcurrentHashMap<Integer,Object>() {
1825+
private static final long serialVersionUID = -7519109652858021153L;
1826+
1827+
Integer lastKey = null;
1828+
1829+
@Override
1830+
public Object put(Integer key, Object value) {
1831+
if (this.size() >= 5) {
1832+
super.remove(lastKey);
1833+
evicted.call(lastKey);
1834+
evictedKeys.add(lastKey);
1835+
}
1836+
Object result = super.put(key, value);
1837+
lastKey = key;
1838+
return result;
1839+
}};
1840+
}};
1841+
TestSubscriber<String> ts = TestSubscriber.create();
1842+
Observable
1843+
.range(1, 100)
1844+
.groupBy(keySelector,elementSelector, mapFactory)
1845+
.flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<String>>() {
1846+
@Override
1847+
public Observable<String> call(final GroupedObservable<Integer, Integer> g) {
1848+
return g.map(new Func1<Integer, String>() {
1849+
@Override
1850+
public String call(Integer x) {
1851+
return g.getKey() + ":" + x;
1852+
}
1853+
});
1854+
}
1855+
})
1856+
.subscribe(ts);
1857+
assertEquals(Arrays.asList(4, 5, 6, 7, 8, 9), evictedKeys);
1858+
List<String> expected = Observable.range(1, 100).map(new Func1<Integer, String>() {
1859+
@Override
1860+
public String call(Integer x) {
1861+
return (x /10) + ":" + x;
1862+
}
1863+
}).toList().toBlocking().single();
1864+
assertEquals(expected, ts.getOnNextEvents());
1865+
}
18051866
}

0 commit comments

Comments
 (0)