Skip to content

Commit 169f8f3

Browse files
committed
Merge branch 'master' into multisubscribe
Conflicts: rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java
1 parent 2151f90 commit 169f8f3

File tree

1 file changed

+25
-19
lines changed

1 file changed

+25
-19
lines changed

rxjava-core/src/main/java/rx/operators/OperationGroupBy.java

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -56,37 +56,43 @@ public static <K, T> Func1<Observer<GroupedObservable<K, T>>, Subscription> grou
5656

5757
private static class GroupBy<K, V> implements Func1<Observer<GroupedObservable<K, V>>, Subscription> {
5858
private final Observable<KeyValue<K, V>> source;
59-
private final ConcurrentHashMap<K, Boolean> keys = new ConcurrentHashMap<K, Boolean>();
6059

6160
private GroupBy(Observable<KeyValue<K, V>> source) {
6261
this.source = source;
6362
}
6463

6564
@Override
6665
public Subscription call(final Observer<GroupedObservable<K, V>> observer) {
66+
return source.subscribe(new GroupByObserver(observer));
67+
}
6768

68-
return source.subscribe(new Observer<KeyValue<K, V>>() {
69+
private class GroupByObserver implements Observer<KeyValue<K, V>> {
70+
private final Observer<GroupedObservable<K, V>> underlying;
6971

70-
@Override
71-
public void onCompleted() {
72-
observer.onCompleted();
73-
}
72+
private final ConcurrentHashMap<K, Boolean> keys = new ConcurrentHashMap<K, Boolean>();
7473

75-
@Override
76-
public void onError(Exception e) {
77-
observer.onError(e);
78-
}
74+
private GroupByObserver(Observer<GroupedObservable<K, V>> underlying) {
75+
this.underlying = underlying;
76+
}
7977

80-
@Override
81-
public void onNext(final KeyValue<K, V> args) {
82-
K key = args.key;
83-
boolean newGroup = keys.putIfAbsent(key, true) == null;
84-
if (newGroup) {
85-
observer.onNext(buildObservableFor(source, key));
86-
}
87-
}
78+
@Override
79+
public void onCompleted() {
80+
underlying.onCompleted();
81+
}
82+
83+
@Override
84+
public void onError(Exception e) {
85+
underlying.onError(e);
86+
}
8887

89-
});
88+
@Override
89+
public void onNext(final KeyValue<K, V> args) {
90+
K key = args.key;
91+
boolean newGroup = keys.putIfAbsent(key, true) == null;
92+
if (newGroup) {
93+
underlying.onNext(buildObservableFor(source, key));
94+
}
95+
}
9096
}
9197
}
9298

0 commit comments

Comments
 (0)