Skip to content

Commit 89f2d6f

Browse files
davidmotenakarnokd
authored andcommitted
reduce allocations in OnSubscribeJoin (#4236)
1 parent 7d42628 commit 89f2d6f

File tree

1 file changed

+28
-22
lines changed

1 file changed

+28
-22
lines changed

src/main/java/rx/internal/operators/OnSubscribeJoin.java

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -61,30 +61,36 @@ public void call(Subscriber<? super R> t1) {
6161
}
6262

6363
/** Manage the left and right sources. */
64-
final class ResultSink {
64+
final class ResultSink extends HashMap<Integer,TLeft> {
65+
//HashMap aspect of `this` refers to the `leftMap`
66+
67+
private static final long serialVersionUID = 3491669543549085380L;
68+
6569
final CompositeSubscription group;
6670
final Subscriber<? super R> subscriber;
67-
final Object guard = new Object();
68-
/** Guarded by guard. */
71+
/** Guarded by this. */
6972
boolean leftDone;
70-
/** Guarded by guard. */
73+
/** Guarded by this. */
7174
int leftId;
72-
/** Guarded by guard. */
73-
final Map<Integer, TLeft> leftMap;
74-
/** Guarded by guard. */
75+
/** Guarded by this. */
7576
boolean rightDone;
76-
/** Guarded by guard. */
77+
/** Guarded by this. */
7778
int rightId;
78-
/** Guarded by guard. */
79+
/** Guarded by this. */
7980
final Map<Integer, TRight> rightMap;
8081

8182
public ResultSink(Subscriber<? super R> subscriber) {
83+
super();
8284
this.subscriber = subscriber;
8385
this.group = new CompositeSubscription();
84-
this.leftMap = new HashMap<Integer, TLeft>();
86+
//`leftMap` is `this`
8587
this.rightMap = new HashMap<Integer, TRight>();
8688
}
8789

90+
HashMap<Integer, TLeft> leftMap() {
91+
return this;
92+
}
93+
8894
public void run() {
8995
subscriber.add(group);
9096

@@ -103,8 +109,8 @@ final class LeftSubscriber extends Subscriber<TLeft> {
103109

104110
protected void expire(int id, Subscription resource) {
105111
boolean complete = false;
106-
synchronized (guard) {
107-
if (leftMap.remove(id) != null && leftMap.isEmpty() && leftDone) {
112+
synchronized (ResultSink.this) {
113+
if (leftMap().remove(id) != null && leftMap().isEmpty() && leftDone) {
108114
complete = true;
109115
}
110116
}
@@ -121,9 +127,9 @@ public void onNext(TLeft args) {
121127
int id;
122128
int highRightId;
123129

124-
synchronized (guard) {
130+
synchronized (ResultSink.this) {
125131
id = leftId++;
126-
leftMap.put(id, args);
132+
leftMap().put(id, args);
127133
highRightId = rightId;
128134
}
129135

@@ -137,7 +143,7 @@ public void onNext(TLeft args) {
137143
duration.unsafeSubscribe(d1);
138144

139145
List<TRight> rightValues = new ArrayList<TRight>();
140-
synchronized (guard) {
146+
synchronized (ResultSink.this) {
141147
for (Map.Entry<Integer, TRight> entry : rightMap.entrySet()) {
142148
if (entry.getKey() < highRightId) {
143149
rightValues.add(entry.getValue());
@@ -162,9 +168,9 @@ public void onError(Throwable e) {
162168
@Override
163169
public void onCompleted() {
164170
boolean complete = false;
165-
synchronized (guard) {
171+
synchronized (ResultSink.this) {
166172
leftDone = true;
167-
if (rightDone || leftMap.isEmpty()) {
173+
if (rightDone || leftMap().isEmpty()) {
168174
complete = true;
169175
}
170176
}
@@ -211,7 +217,7 @@ final class RightSubscriber extends Subscriber<TRight> {
211217

212218
void expire(int id, Subscription resource) {
213219
boolean complete = false;
214-
synchronized (guard) {
220+
synchronized (ResultSink.this) {
215221
if (rightMap.remove(id) != null && rightMap.isEmpty() && rightDone) {
216222
complete = true;
217223
}
@@ -228,7 +234,7 @@ void expire(int id, Subscription resource) {
228234
public void onNext(TRight args) {
229235
int id;
230236
int highLeftId;
231-
synchronized (guard) {
237+
synchronized (ResultSink.this) {
232238
id = rightId++;
233239
rightMap.put(id, args);
234240
highLeftId = leftId;
@@ -247,8 +253,8 @@ public void onNext(TRight args) {
247253

248254

249255
List<TLeft> leftValues = new ArrayList<TLeft>();
250-
synchronized (guard) {
251-
for (Map.Entry<Integer, TLeft> entry : leftMap.entrySet()) {
256+
synchronized (ResultSink.this) {
257+
for (Map.Entry<Integer, TLeft> entry : leftMap().entrySet()) {
252258
if (entry.getKey() < highLeftId) {
253259
leftValues.add(entry.getValue());
254260
}
@@ -274,7 +280,7 @@ public void onError(Throwable e) {
274280
@Override
275281
public void onCompleted() {
276282
boolean complete = false;
277-
synchronized (guard) {
283+
synchronized (ResultSink.this) {
278284
rightDone = true;
279285
if (leftDone || rightMap.isEmpty()) {
280286
complete = true;

0 commit comments

Comments
 (0)