File tree Expand file tree Collapse file tree 1 file changed +9
-2
lines changed
src/main/java/rx/internal/operators Expand file tree Collapse file tree 1 file changed +9
-2
lines changed Original file line number Diff line number Diff line change @@ -238,8 +238,11 @@ public void onNext(T v) {
238
238
child .onNext (v );
239
239
while (true ) {
240
240
long cc = consumerCapacity .get ();
241
- if (cc != Long .MAX_VALUE ) {
242
- if (consumerCapacity .compareAndSet (cc , cc -1 )) {
241
+ if (cc == 0 ) {
242
+ // wasn't requested but has arrived anyway
243
+ break ;
244
+ } else if (cc != Long .MAX_VALUE ) {
245
+ if (consumerCapacity .compareAndSet (cc , cc - 1 )) {
243
246
break ;
244
247
}
245
248
} else {
@@ -320,10 +323,14 @@ public void onError(Throwable e) {
320
323
321
324
@ Override
322
325
public void onNext (Object t ) {
326
+ // a restart instruction has arrived
323
327
if (!isLocked .get () && !child .isUnsubscribed ()) {
328
+ // if there are outstanding requests
324
329
if (consumerCapacity .get () > 0 ) {
330
+ // schedule resubscription
325
331
worker .schedule (subscribeToSource );
326
332
} else {
333
+ // otherwise we indicate that on the next request we should resubscribe
327
334
resumeBoundary .compareAndSet (false , true );
328
335
}
329
336
}
You can’t perform that action at this time.
0 commit comments