Skip to content

Commit 4603dcf

Browse files
committed
Merge pull request #3688 from akarnokd/ZipStartBarrierFix
1.x: Fix zip() - observer array becoming visible too early and causing NPE
2 parents 63434c6 + 6291fb9 commit 4603dcf

File tree

1 file changed

+18
-13
lines changed

1 file changed

+18
-13
lines changed

src/main/java/rx/internal/operators/OperatorZip.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,10 @@ public void request(long n) {
177177

178178
}
179179

180-
private static final class Zip<R> extends AtomicLong {
180+
static final class Zip<R> extends AtomicLong {
181+
/** */
182+
private static final long serialVersionUID = 5995274816189928317L;
183+
181184
final Observer<? super R> child;
182185
private final FuncN<? extends R> zipFunction;
183186
private final CompositeSubscription childSubscription = new CompositeSubscription();
@@ -186,7 +189,7 @@ private static final class Zip<R> extends AtomicLong {
186189
int emitted = 0; // not volatile/synchronized as accessed inside COUNTER_UPDATER block
187190

188191
/* initialized when started in `start` */
189-
private Object[] observers;
192+
private volatile Object[] subscribers;
190193
private AtomicLong requested;
191194

192195
public Zip(final Subscriber<? super R> child, FuncN<? extends R> zipFunction) {
@@ -197,16 +200,18 @@ public Zip(final Subscriber<? super R> child, FuncN<? extends R> zipFunction) {
197200

198201
@SuppressWarnings("unchecked")
199202
public void start(@SuppressWarnings("rawtypes") Observable[] os, AtomicLong requested) {
200-
observers = new Object[os.length];
201-
this.requested = requested;
203+
final Object[] subscribers = new Object[os.length];
202204
for (int i = 0; i < os.length; i++) {
203205
InnerSubscriber io = new InnerSubscriber();
204-
observers[i] = io;
206+
subscribers[i] = io;
205207
childSubscription.add(io);
206208
}
207-
209+
210+
this.requested = requested;
211+
this.subscribers = subscribers; // full memory barrier: release all above
212+
208213
for (int i = 0; i < os.length; i++) {
209-
os[i].unsafeSubscribe((InnerSubscriber) observers[i]);
214+
os[i].unsafeSubscribe((InnerSubscriber) subscribers[i]);
210215
}
211216
}
212217

@@ -219,13 +224,13 @@ public void start(@SuppressWarnings("rawtypes") Observable[] os, AtomicLong requ
219224
*/
220225
@SuppressWarnings("unchecked")
221226
void tick() {
222-
final Object[] observers = this.observers;
223-
if (observers == null) {
227+
final Object[] subscribers = this.subscribers;
228+
if (subscribers == null) {
224229
// nothing yet to do (initial request from Producer)
225230
return;
226231
}
227232
if (getAndIncrement() == 0) {
228-
final int length = observers.length;
233+
final int length = subscribers.length;
229234
final Observer<? super R> child = this.child;
230235
final AtomicLong requested = this.requested;
231236
do {
@@ -234,7 +239,7 @@ void tick() {
234239
final Object[] vs = new Object[length];
235240
boolean allHaveValues = true;
236241
for (int i = 0; i < length; i++) {
237-
RxRingBuffer buffer = ((InnerSubscriber) observers[i]).items;
242+
RxRingBuffer buffer = ((InnerSubscriber) subscribers[i]).items;
238243
Object n = buffer.peek();
239244

240245
if (n == null) {
@@ -265,7 +270,7 @@ void tick() {
265270
return;
266271
}
267272
// now remove them
268-
for (Object obj : observers) {
273+
for (Object obj : subscribers) {
269274
RxRingBuffer buffer = ((InnerSubscriber) obj).items;
270275
buffer.poll();
271276
// eagerly check if the next item on this queue is an onComplete
@@ -278,7 +283,7 @@ void tick() {
278283
}
279284
}
280285
if (emitted > THRESHOLD) {
281-
for (Object obj : observers) {
286+
for (Object obj : subscribers) {
282287
((InnerSubscriber) obj).requestMore(emitted);
283288
}
284289
emitted = 0;

0 commit comments

Comments
 (0)