Skip to content

Commit 29e764a

Browse files
Merge pull request #1966 from zsxwing/fix-groupby-null-key
Fix NPE when the key is null in GroupBy
2 parents f1b9253 + d5b4d6f commit 29e764a

File tree

2 files changed

+43
-6
lines changed

2 files changed

+43
-6
lines changed

src/main/java/rx/internal/operators/OperatorGroupBy.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public Observer<T> getObserver() {
102102

103103
}
104104

105-
private final ConcurrentHashMap<K, GroupState<K, T>> groups = new ConcurrentHashMap<K, GroupState<K, T>>();
105+
private final ConcurrentHashMap<Object, GroupState<K, T>> groups = new ConcurrentHashMap<Object, GroupState<K, T>>();
106106

107107
private static final NotificationLite<Object> nl = NotificationLite.instance();
108108

@@ -166,10 +166,18 @@ void requestFromGroupedObservable(long n, GroupState<K, T> group) {
166166
}
167167
}
168168

169+
private Object groupedKey(K key) {
170+
return key == null ? NULL_KEY : key;
171+
}
172+
173+
private K getKey(Object groupedKey) {
174+
return groupedKey == NULL_KEY ? null : (K) groupedKey;
175+
}
176+
169177
@Override
170178
public void onNext(T t) {
171179
try {
172-
final K key = keySelector.call(t);
180+
final Object key = groupedKey(keySelector.call(t));
173181
GroupState<K, T> group = groups.get(key);
174182
if (group == null) {
175183
// this group doesn't exist
@@ -185,10 +193,10 @@ public void onNext(T t) {
185193
}
186194
}
187195

188-
private GroupState<K, T> createNewGroup(final K key) {
196+
private GroupState<K, T> createNewGroup(final Object key) {
189197
final GroupState<K, T> groupState = new GroupState<K, T>();
190198

191-
GroupedObservable<K, R> go = GroupedObservable.create(key, new OnSubscribe<R>() {
199+
GroupedObservable<K, R> go = GroupedObservable.create(getKey(key), new OnSubscribe<R>() {
192200

193201
@Override
194202
public void call(final Subscriber<? super R> o) {
@@ -252,7 +260,7 @@ public void onNext(T t) {
252260
return groupState;
253261
}
254262

255-
private void cleanupGroup(K key) {
263+
private void cleanupGroup(Object key) {
256264
GroupState<K, T> removed;
257265
removed = groups.remove(key);
258266
if (removed != null) {
@@ -357,4 +365,5 @@ public Object call(Object t) {
357365
}
358366
};
359367

368+
private static final Object NULL_KEY = new Object();
360369
}

src/test/java/rx/internal/operators/OperatorGroupByTest.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.ArrayList;
2929
import java.util.Arrays;
3030
import java.util.Collection;
31+
import java.util.List;
3132
import java.util.Map;
3233
import java.util.concurrent.ConcurrentHashMap;
3334
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -1357,4 +1358,31 @@ public Observable<Integer> call(GroupedObservable<Integer, Integer> t) {
13571358

13581359
};
13591360

1360-
}
1361+
@Test
1362+
public void testGroupByWithNullKey() {
1363+
final String[] key = new String[]{"uninitialized"};
1364+
final List<String> values = new ArrayList<String>();
1365+
Observable.just("a", "b", "c").groupBy(new Func1<String, String>() {
1366+
1367+
@Override
1368+
public String call(String value) {
1369+
return null;
1370+
}
1371+
}).subscribe(new Action1<GroupedObservable<String, String>>() {
1372+
1373+
@Override
1374+
public void call(GroupedObservable<String, String> groupedObservable) {
1375+
key[0] = groupedObservable.getKey();
1376+
groupedObservable.subscribe(new Action1<String>() {
1377+
1378+
@Override
1379+
public void call(String s) {
1380+
values.add(s);
1381+
}
1382+
});
1383+
}
1384+
});
1385+
assertEquals(null, key[0]);
1386+
assertEquals(Arrays.asList("a", "b", "c"), values);
1387+
}
1388+
}

0 commit comments

Comments
 (0)