Skip to content

Commit 07eab08

Browse files
committed
use ProducerArbiter from @akarnokd, delete OnSubscribeRetry
1 parent 8c3dd49 commit 07eab08

File tree

2 files changed

+185
-314
lines changed

2 files changed

+185
-314
lines changed

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 185 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535

3636
import java.util.concurrent.atomic.AtomicBoolean;
3737
import java.util.concurrent.atomic.AtomicLong;
38-
import java.util.concurrent.atomic.AtomicReference;
3938

4039
import rx.Notification;
4140
import rx.Observable;
@@ -53,7 +52,7 @@
5352

5453
public final class OnSubscribeRedo<T> implements OnSubscribe<T> {
5554

56-
static final Func1<Observable<? extends Notification<?>>, Observable<?>> REDO_INIFINITE = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
55+
static final Func1<Observable<? extends Notification<?>>, Observable<?>> REDO_INFINITE = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
5756
@Override
5857
public Observable<?> call(Observable<? extends Notification<?>> ts) {
5958
return ts.map(new Func1<Notification<?>, Notification<?>>() {
@@ -120,17 +119,15 @@ public Notification<Integer> call(Notification<Integer> n, Notification<?> term)
120119
}
121120

122121
public static <T> Observable<T> retry(Observable<T> source) {
123-
// return retry(source, REDO_INIFINITE);
124-
return retry(source, Long.MAX_VALUE);
122+
return retry(source, REDO_INFINITE);
125123
}
126124

127125
public static <T> Observable<T> retry(Observable<T> source, final long count) {
128126
if (count < 0)
129127
throw new IllegalArgumentException("count >= 0 expected");
130128
if (count == 0)
131129
return source;
132-
// return retry(source, new RedoFinite(count));
133-
return create(new OnSubscribeRetry<T>(source, count));
130+
return retry(source, new RedoFinite(count));
134131
}
135132

136133
public static <T> Observable<T> retry(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
@@ -146,7 +143,7 @@ public static <T> Observable<T> repeat(Observable<T> source) {
146143
}
147144

148145
public static <T> Observable<T> repeat(Observable<T> source, Scheduler scheduler) {
149-
return repeat(source, REDO_INIFINITE, scheduler);
146+
return repeat(source, REDO_INFINITE, scheduler);
150147
}
151148

152149
public static <T> Observable<T> repeat(Observable<T> source, final long count) {
@@ -195,7 +192,6 @@ public void call(final Subscriber<? super T> child) {
195192
final AtomicBoolean resumeBoundary = new AtomicBoolean(true);
196193
// incremented when requests are made, decremented when requests are fulfilled
197194
final AtomicLong consumerCapacity = new AtomicLong(0l);
198-
final AtomicReference<Producer> currentProducer = new AtomicReference<Producer>();
199195

200196
final Scheduler.Worker worker = scheduler.createWorker();
201197
child.add(worker);
@@ -205,6 +201,8 @@ public void call(final Subscriber<? super T> child) {
205201

206202
final PublishSubject<Notification<?>> terminals = PublishSubject.create();
207203

204+
final ProducerArbiter arbiter = new ProducerArbiter();
205+
208206
final Action0 subscribeToSource = new Action0() {
209207
@Override
210208
public void call() {
@@ -214,43 +212,37 @@ public void call() {
214212

215213
Subscriber<T> terminalDelegatingSubscriber = new Subscriber<T>() {
216214
boolean done;
215+
217216
@Override
218217
public void onCompleted() {
219218
if (!done) {
220219
done = true;
221-
synchronized (consumerCapacity) {
222-
currentProducer.set(null);
223-
unsubscribe();
224-
terminals.onNext(Notification.createOnCompleted());
225-
}
220+
unsubscribe();
221+
terminals.onNext(Notification.createOnCompleted());
226222
}
227223
}
228224

229225
@Override
230226
public void onError(Throwable e) {
231227
if (!done) {
232228
done = true;
233-
synchronized (consumerCapacity) {
234-
currentProducer.set(null);
235-
unsubscribe();
236-
terminals.onNext(Notification.createOnError(e));
237-
}
229+
unsubscribe();
230+
terminals.onNext(Notification.createOnError(e));
238231
}
239232
}
240233

241234
@Override
242235
public void onNext(T v) {
243236
if (!done) {
244-
synchronized (consumerCapacity) {
245-
child.onNext(v);
246-
decrementConsumerCapacity();
247-
}
237+
child.onNext(v);
238+
decrementConsumerCapacity();
239+
arbiter.produced(1);
248240
}
249241
}
250-
242+
251243
private void decrementConsumerCapacity() {
252-
// use a CAS loop because we don't want to decrement the value
253-
// if it is Long.MAX_VALUE
244+
// use a CAS loop because we don't want to decrement the
245+
// value if it is Long.MAX_VALUE
254246
while (true) {
255247
long cc = consumerCapacity.get();
256248
if (cc != Long.MAX_VALUE) {
@@ -265,13 +257,7 @@ private void decrementConsumerCapacity() {
265257

266258
@Override
267259
public void setProducer(Producer producer) {
268-
synchronized (consumerCapacity) {
269-
currentProducer.set(producer);
270-
long c = consumerCapacity.get();
271-
if (c > 0) {
272-
producer.request(c);
273-
}
274-
}
260+
arbiter.setProducer(producer);
275261
}
276262
};
277263
// new subscription each time so if it unsubscribes itself it does not prevent retries
@@ -337,17 +323,11 @@ public void onError(Throwable e) {
337323
@Override
338324
public void onNext(Object t) {
339325
if (!isLocked.get() && !child.isUnsubscribed()) {
340-
final boolean scheduleNow;
341-
synchronized (consumerCapacity) {
342-
if (consumerCapacity.get() > 0) {
343-
scheduleNow = true;
344-
} else {
345-
scheduleNow = false;
346-
resumeBoundary.compareAndSet(false, true);
347-
}
348-
}
349-
if (scheduleNow)
326+
if (consumerCapacity.get() > 0) {
350327
worker.schedule(subscribeToSource);
328+
} else {
329+
resumeBoundary.compareAndSet(false, true);
330+
}
351331
}
352332
}
353333

@@ -363,26 +343,173 @@ public void setProducer(Producer producer) {
363343

364344
@Override
365345
public void request(final long n) {
366-
final Producer producer;
367-
final boolean requestNow;
368-
final boolean scheduleNow;
369-
synchronized (consumerCapacity) {
346+
if (n > 0) {
370347
BackpressureUtils.getAndAddRequest(consumerCapacity, n);
371-
producer = currentProducer.get();
372-
if (producer != null) {
373-
requestNow = true;
374-
scheduleNow = false;
375-
} else {
376-
requestNow = false;
377-
scheduleNow = resumeBoundary.compareAndSet(true, false);
378-
}
348+
arbiter.request(n);
349+
if (resumeBoundary.compareAndSet(true, false))
350+
worker.schedule(subscribeToSource);
379351
}
380-
if (requestNow)
381-
producer.request(n);
382-
else if (scheduleNow)
383-
worker.schedule(subscribeToSource);
384352
}
385353
});
386354

387355
}
356+
357+
static final class ProducerArbiter implements Producer {
358+
/** Guarded by this. */
359+
boolean emitting;
360+
/** The current producer. Accessed while emitting. */
361+
Producer currentProducer;
362+
/** The current requested count. */
363+
long requested;
364+
365+
long missedRequested;
366+
Producer missedProducer;
367+
long missedProd;
368+
369+
@Override
370+
public void request(long n) {
371+
if (n <= 0) {
372+
return;
373+
}
374+
Producer mp;
375+
long mprod;
376+
synchronized (this) {
377+
if (emitting) {
378+
missedRequested += n;
379+
return;
380+
}
381+
emitting = true;
382+
mp = missedProducer;
383+
mprod = missedProd;
384+
385+
missedProducer = null;
386+
missedProd = 0L;
387+
}
388+
389+
boolean skipFinal = false;
390+
try {
391+
emit(n, mp, mprod);
392+
drainLoop();
393+
skipFinal = true;
394+
} finally {
395+
if (!skipFinal) {
396+
synchronized (this) {
397+
emitting = false;
398+
}
399+
}
400+
}
401+
}
402+
public void setProducer(Producer p) {
403+
if (p == null) {
404+
throw new NullPointerException();
405+
}
406+
407+
long mreq;
408+
long mprod;
409+
synchronized (this) {
410+
if (emitting) {
411+
missedProducer = p;
412+
return;
413+
}
414+
emitting = true;
415+
mreq = missedRequested;
416+
mprod = missedProd;
417+
418+
missedRequested = 0L;
419+
missedProd = 0L;
420+
}
421+
422+
boolean skipFinal = false;
423+
try {
424+
emit(mreq, p, mprod);
425+
drainLoop();
426+
skipFinal = true;
427+
} finally {
428+
if (!skipFinal) {
429+
synchronized (this) {
430+
emitting = false;
431+
}
432+
}
433+
}
434+
}
435+
public void produced(long n) {
436+
if (n <= 0) {
437+
throw new IllegalArgumentException(n + " produced?!");
438+
}
439+
440+
long mreq;
441+
Producer mp;
442+
synchronized (this) {
443+
if (emitting) {
444+
missedProd += n;
445+
return;
446+
}
447+
emitting = true;
448+
mreq = missedRequested;
449+
mp = missedProducer;
450+
451+
missedRequested = 0L;
452+
missedProducer = null;
453+
}
454+
455+
boolean skipFinal = false;
456+
try {
457+
emit(mreq, mp, n);
458+
drainLoop();
459+
skipFinal = true;
460+
} finally {
461+
if (!skipFinal) {
462+
synchronized (this) {
463+
emitting = false;
464+
}
465+
}
466+
}
467+
}
468+
void drainLoop() {
469+
for (;;) {
470+
long mreq;
471+
long mprod;
472+
Producer mp;
473+
synchronized (this) {
474+
mreq = missedRequested;
475+
mprod = missedProd;
476+
mp = missedProducer;
477+
if (mreq == 0L && mp == null && mprod == 0L) {
478+
emitting = false;
479+
return;
480+
}
481+
missedRequested = 0L;
482+
missedProd = 0L;
483+
missedProducer = null;
484+
}
485+
emit(mreq, mp, mprod);
486+
}
487+
}
488+
void emit(long mreq, Producer mp, long mprod) {
489+
boolean newMp = false;
490+
if (mp != null) {
491+
newMp = true;
492+
currentProducer = mp;
493+
} else {
494+
mp = currentProducer;
495+
}
496+
497+
long u = requested + mreq;
498+
if (u < 0) {
499+
u = Long.MAX_VALUE;
500+
}
501+
u -= mprod;
502+
if (u < 0) {
503+
throw new IllegalStateException("More produced than requested");
504+
}
505+
requested = u;
506+
507+
if (mreq > 0 && mp != null) {
508+
mp.request(mreq);
509+
} else
510+
if (newMp && u > 0) {
511+
mp.request(u);
512+
}
513+
}
514+
}
388515
}

0 commit comments

Comments
 (0)