@@ -56,37 +56,43 @@ public static <K, T> Func1<Observer<GroupedObservable<K, T>>, Subscription> grou
56
56
57
57
private static class GroupBy <K , V > implements Func1 <Observer <GroupedObservable <K , V >>, Subscription > {
58
58
private final Observable <KeyValue <K , V >> source ;
59
- private final ConcurrentHashMap <K , Boolean > keys = new ConcurrentHashMap <K , Boolean >();
60
59
61
60
private GroupBy (Observable <KeyValue <K , V >> source ) {
62
61
this .source = source ;
63
62
}
64
63
65
64
@ Override
66
65
public Subscription call (final Observer <GroupedObservable <K , V >> observer ) {
66
+ return source .subscribe (new GroupByObserver (observer ));
67
+ }
67
68
68
- return source .subscribe (new Observer <KeyValue <K , V >>() {
69
+ private class GroupByObserver implements Observer <KeyValue <K , V >> {
70
+ private final Observer <GroupedObservable <K , V >> underlying ;
69
71
70
- @ Override
71
- public void onCompleted () {
72
- observer .onCompleted ();
73
- }
72
+ private final ConcurrentHashMap <K , Boolean > keys = new ConcurrentHashMap <K , Boolean >();
74
73
75
- @ Override
76
- public void onError (Exception e ) {
77
- observer .onError (e );
78
- }
74
+ private GroupByObserver (Observer <GroupedObservable <K , V >> underlying ) {
75
+ this .underlying = underlying ;
76
+ }
79
77
80
- @ Override
81
- public void onNext (final KeyValue <K , V > args ) {
82
- K key = args .key ;
83
- boolean newGroup = keys .putIfAbsent (key , true ) == null ;
84
- if (newGroup ) {
85
- observer .onNext (buildObservableFor (source , key ));
86
- }
87
- }
78
+ @ Override
79
+ public void onCompleted () {
80
+ underlying .onCompleted ();
81
+ }
82
+
83
+ @ Override
84
+ public void onError (Exception e ) {
85
+ underlying .onError (e );
86
+ }
88
87
89
- });
88
+ @ Override
89
+ public void onNext (final KeyValue <K , V > args ) {
90
+ K key = args .key ;
91
+ boolean newGroup = keys .putIfAbsent (key , true ) == null ;
92
+ if (newGroup ) {
93
+ underlying .onNext (buildObservableFor (source , key ));
94
+ }
95
+ }
90
96
}
91
97
}
92
98
0 commit comments