Skip to content

2.x: coverage and fixes 9/03-2 #4469

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@ void drain() {
} else
if (NotificationLite.isDisposable(v)) {
Disposable next = NotificationLite.getDisposable(v);
if (s != null) {
s.dispose();
s.dispose();
if (!cancelled) {
s = next;
} else {
next.dispose();
}
s = next;
} else
if (NotificationLite.isError(v)) {
q.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
package io.reactivex.internal.operators.observable;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscribers.observable.BasicFuseableObserver;

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
Expand All @@ -31,75 +29,49 @@ public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U

@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapperSubscriber<T, U>(t, function));
source.subscribe(new MapObserver<T, U>(t, function));
}

static final class MapperSubscriber<T, U> implements Observer<T>, Disposable {
final Observer<? super U> actual;
final Function<? super T, ? extends U> function;

Disposable subscription;

boolean done;

public MapperSubscriber(Observer<? super U> actual, Function<? super T, ? extends U> function) {
this.actual = actual;
this.function = function;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.subscription, s)) {
subscription = s;
actual.onSubscribe(this);
}

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;

public MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}

@Override
public void onNext(T t) {
if (done) {
return;
}
U u;
try {
u = function.apply(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
subscription.dispose();
onError(e);
return;
}
if (u == null) {
subscription.dispose();
onError(new NullPointerException("Value returned by the function is null"));
return;
}
actual.onNext(u);
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);

if (sourceMode != NONE) {
actual.onNext(null);
return;
}
done = true;
actual.onError(t);
}
@Override
public void onComplete() {
if (done) {

U v;

try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
done = true;
actual.onComplete();
actual.onNext(v);
}

@Override
public boolean isDisposed() {
return subscription.isDisposed();
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Override
public void dispose() {
subscription.dispose();
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,18 @@ static final class RangeDisposable

long index;

boolean fused;

public RangeDisposable(Observer<? super Integer> actual, long start, long end) {
this.actual = actual;
this.index = start;
this.end = end;
}

void run() {
if (fused) {
return;
}
Observer<? super Integer> actual = this.actual;
long e = end;
for (long i = index; i != e && get() == 0; i++) {
Expand Down Expand Up @@ -107,7 +112,11 @@ public boolean isDisposed() {

@Override
public int requestFusion(int mode) {
return mode & SYNC;
if ((mode & SYNC) != 0) {
fused = true;
return SYNC;
}
return NONE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

import org.reactivestreams.*;

import io.reactivex.disposables.Disposable;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.ExceptionHelper;

public abstract class BlockingSingleSubscriber<T> extends CountDownLatch
implements Subscriber<T>, Disposable {
implements Subscriber<T> {

T value;
Throwable error;
Expand All @@ -35,11 +35,14 @@ public BlockingSingleSubscriber() {

@Override
public final void onSubscribe(Subscription s) {
this.s = s;
if (!cancelled) {
s.request(Long.MAX_VALUE);
if (cancelled) {
s.cancel();
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;
if (!cancelled) {
s.request(Long.MAX_VALUE);
if (cancelled) {
this.s = SubscriptionHelper.CANCELLED;
s.cancel();
}
}
}
}
Expand All @@ -49,20 +52,6 @@ public final void onComplete() {
countDown();
}

@Override
public final void dispose() {
cancelled = true;
Subscription s = this.s;
if (s != null) {
s.cancel();
}
}

@Override
public final boolean isDisposed() {
return cancelled;
}

/**
* Block until the first value arrives and return it, otherwise
* return null for an empty source and rethrow any exception.
Expand All @@ -73,7 +62,11 @@ public final T blockingGet() {
try {
await();
} catch (InterruptedException ex) {
dispose();
Subscription s = this.s;
this.s = SubscriptionHelper.CANCELLED;
if (s != null) {
s.cancel();
}
throw ExceptionHelper.wrapOrThrow(ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,9 @@ public BlockingSubscriber(Queue<Object> queue) {

@Override
public void onSubscribe(Subscription s) {
if (!compareAndSet(null, s)) {
s.cancel();
if (get() != SubscriptionHelper.CANCELLED) {
onError(new IllegalStateException("Subscription already set"));
}
return;
if (SubscriptionHelper.setOnce(this, s)) {
queue.offer(NotificationLite.subscription(this));
}
queue.offer(NotificationLite.subscription(this));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,22 +123,17 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if (error == null) {
for (;;) {
Subscription a = s.get();
if (a == this || a == SubscriptionHelper.CANCELLED) {
RxJavaPlugins.onError(t);
return;
}
error = t;

for (;;) {
Subscription a = s.get();
if (a == this || a == SubscriptionHelper.CANCELLED) {
RxJavaPlugins.onError(t);
return;
}
if (s.compareAndSet(a, this)) {
countDown();
return;
}
if (s.compareAndSet(a, this)) {
countDown();
return;
}
} else {
RxJavaPlugins.onError(t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ public boolean setResource(int index, Subscription resource) {
for (;;) {
Subscription o = get(index);
if (o == SubscriptionHelper.CANCELLED) {
resource.cancel();
if (resource != null) {
resource.cancel();
}
return false;
}
if (compareAndSet(index, o, resource)) {
Expand All @@ -66,7 +68,9 @@ public Subscription replaceResource(int index, Subscription resource) {
for (;;) {
Subscription o = get(index);
if (o == SubscriptionHelper.CANCELLED) {
resource.cancel();
if (resource != null) {
resource.cancel();
}
return null;
}
if (compareAndSet(index, o, resource)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.util.BackpressureHelper;

/**
* A subscription implementation that arbitrates exactly one other Subscription and can
Expand Down Expand Up @@ -47,19 +46,7 @@ public AsyncSubscription(Disposable resource) {

@Override
public void request(long n) {
Subscription s = actual.get();
if (s != null) {
s.request(n);
} else if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);
s = actual.get();
if (s != null) {
long mr = getAndSet(0L);
if (mr != 0L) {
s.request(mr);
}
}
}
SubscriptionHelper.deferredRequest(actual, this, n);
}

@Override
Expand Down Expand Up @@ -100,27 +87,8 @@ public boolean replaceResource(Disposable r) {
/**
* Sets the given subscription if there isn't any subscription held.
* @param s the first and only subscription to set
* @return false if this AsyncSubscription has been cancelled/disposed
*/
public boolean setSubscription(Subscription s) {
for (;;) {
Subscription a = actual.get();
if (a == SubscriptionHelper.CANCELLED) {
s.cancel();
return false;
}
if (a != null) {
s.cancel();
SubscriptionHelper.reportSubscriptionSet();
return true;
}
if (actual.compareAndSet(null, s)) {
long mr = getAndSet(0L);
if (mr != 0L) {
s.request(mr);
}
return true;
}
}
public void setSubscription(Subscription s) {
SubscriptionHelper.deferredSetOnce(actual, this, s);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,13 @@ public final void request(long n) {
if (state == NO_REQUEST_HAS_VALUE) {
if (compareAndSet(NO_REQUEST_HAS_VALUE, HAS_REQUEST_HAS_VALUE)) {
T v = value;
value = null;
Subscriber<? super T> a = actual;
a.onNext(v);
if (get() != CANCELLED) {
a.onComplete();
if (v != null) {
value = null;
Subscriber<? super T> a = actual;
a.onNext(v);
if (get() != CANCELLED) {
a.onComplete();
}
}
}
return;
Expand Down
Loading