Skip to content

Commit c3a3d1a

Browse files
davidmotenakarnokd
authored andcommitted
reduce allocations in OnSubscribeGroupJoin (#4237)
1 parent 89f2d6f commit c3a3d1a

File tree

1 file changed

+36
-29
lines changed

1 file changed

+36
-29
lines changed

src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -65,25 +65,27 @@ public void call(Subscriber<? super R> child) {
6565
}
6666

6767
/** Manages sub-observers and subscriptions. */
68-
final class ResultManager implements Subscription {
68+
final class ResultManager extends HashMap<Integer, Observer<T2>>implements Subscription {
69+
// HashMap aspect of `this` refers to `leftMap`
70+
71+
private static final long serialVersionUID = -3035156013812425335L;
72+
6973
final RefCountSubscription cancel;
7074
final Subscriber<? super R> subscriber;
7175
final CompositeSubscription group;
72-
final Object guard = new Object();
73-
/** Guarded by guard. */
76+
/** Guarded by this. */
7477
int leftIds;
75-
/** Guarded by guard. */
78+
/** Guarded by this. */
7679
int rightIds;
77-
/** Guarded by guard. */
78-
final Map<Integer, Observer<T2>> leftMap = new HashMap<Integer, Observer<T2>>(); // NOPMD
79-
/** Guarded by guard. */
80+
/** Guarded by this. */
8081
final Map<Integer, T2> rightMap = new HashMap<Integer, T2>(); // NOPMD
81-
/** Guarded by guard. */
82+
/** Guarded by this. */
8283
boolean leftDone;
83-
/** Guarded by guard. */
84+
/** Guarded by this. */
8485
boolean rightDone;
8586

8687
public ResultManager(Subscriber<? super R> subscriber) {
88+
super();
8789
this.subscriber = subscriber;
8890
this.group = new CompositeSubscription();
8991
this.cancel = new RefCountSubscription(group);
@@ -110,15 +112,20 @@ public void unsubscribe() {
110112
public boolean isUnsubscribed() {
111113
return cancel.isUnsubscribed();
112114
}
115+
116+
Map<Integer, Observer<T2>> leftMap() {
117+
return this;
118+
}
119+
113120
/**
114121
* Notify everyone and cleanup.
115122
* @param e the exception
116123
*/
117124
void errorAll(Throwable e) {
118125
List<Observer<T2>> list;
119-
synchronized (guard) {
120-
list = new ArrayList<Observer<T2>>(leftMap.values());
121-
leftMap.clear();
126+
synchronized (ResultManager.this) {
127+
list = new ArrayList<Observer<T2>>(leftMap().values());
128+
leftMap().clear();
122129
rightMap.clear();
123130
}
124131
for (Observer<T2> o : list) {
@@ -132,8 +139,8 @@ void errorAll(Throwable e) {
132139
* @param e the exception
133140
*/
134141
void errorMain(Throwable e) {
135-
synchronized (guard) {
136-
leftMap.clear();
142+
synchronized (ResultManager.this) {
143+
leftMap().clear();
137144
rightMap.clear();
138145
}
139146
subscriber.onError(e);
@@ -158,9 +165,9 @@ public void onNext(T1 args) {
158165
Subject<T2, T2> subj = PublishSubject.create();
159166
Observer<T2> subjSerial = new SerializedObserver<T2>(subj);
160167

161-
synchronized (guard) {
168+
synchronized (ResultManager.this) {
162169
id = leftIds++;
163-
leftMap.put(id, subjSerial);
170+
leftMap().put(id, subjSerial);
164171
}
165172

166173
Observable<T2> window = Observable.create(new WindowObservableFunc<T2>(subj, cancel));
@@ -174,7 +181,7 @@ public void onNext(T1 args) {
174181
R result = resultSelector.call(args, window);
175182

176183
List<T2> rightMapValues;
177-
synchronized (guard) {
184+
synchronized (ResultManager.this) {
178185
rightMapValues = new ArrayList<T2>(rightMap.values());
179186
}
180187

@@ -192,11 +199,11 @@ public void onNext(T1 args) {
192199
@Override
193200
public void onCompleted() {
194201
List<Observer<T2>> list = null;
195-
synchronized (guard) {
202+
synchronized (ResultManager.this) {
196203
leftDone = true;
197204
if (rightDone) {
198-
list = new ArrayList<Observer<T2>>(leftMap.values());
199-
leftMap.clear();
205+
list = new ArrayList<Observer<T2>>(leftMap().values());
206+
leftMap().clear();
200207
rightMap.clear();
201208
}
202209
}
@@ -216,7 +223,7 @@ final class RightObserver extends Subscriber<T2> {
216223
public void onNext(T2 args) {
217224
try {
218225
int id;
219-
synchronized (guard) {
226+
synchronized (ResultManager.this) {
220227
id = rightIds++;
221228
rightMap.put(id, args);
222229
}
@@ -228,8 +235,8 @@ public void onNext(T2 args) {
228235
duration.unsafeSubscribe(d2);
229236

230237
List<Observer<T2>> list;
231-
synchronized (guard) {
232-
list = new ArrayList<Observer<T2>>(leftMap.values());
238+
synchronized (ResultManager.this) {
239+
list = new ArrayList<Observer<T2>>(leftMap().values());
233240
}
234241
for (Observer<T2> o : list) {
235242
o.onNext(args);
@@ -242,11 +249,11 @@ public void onNext(T2 args) {
242249
@Override
243250
public void onCompleted() {
244251
List<Observer<T2>> list = null;
245-
synchronized (guard) {
252+
synchronized (ResultManager.this) {
246253
rightDone = true;
247254
if (leftDone) {
248-
list = new ArrayList<Observer<T2>>(leftMap.values());
249-
leftMap.clear();
255+
list = new ArrayList<Observer<T2>>(leftMap().values());
256+
leftMap().clear();
250257
rightMap.clear();
251258
}
252259
}
@@ -273,8 +280,8 @@ public void onCompleted() {
273280
if (once) {
274281
once = false;
275282
Observer<T2> gr;
276-
synchronized (guard) {
277-
gr = leftMap.remove(id);
283+
synchronized (ResultManager.this) {
284+
gr = leftMap().remove(id);
278285
}
279286
if (gr != null) {
280287
gr.onCompleted();
@@ -306,7 +313,7 @@ public RightDurationObserver(int id) {
306313
public void onCompleted() {
307314
if (once) {
308315
once = false;
309-
synchronized (guard) {
316+
synchronized (ResultManager.this) {
310317
rightMap.remove(id);
311318
}
312319
group.remove(this);

0 commit comments

Comments
 (0)