Skip to content

2.x: factor out Completable ops, unify disposed markers #4043

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,190 changes: 70 additions & 1,120 deletions src/main/java/io/reactivex/Completable.java

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions src/main/java/io/reactivex/disposables/BooleanDisposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ public final class BooleanDisposable implements Disposable {
@Override
public void run() { }
};

static final Runnable EMPTY = new Runnable() {
@Override
public void run() { }
};

public BooleanDisposable() {
this(new Runnable() {
@Override
public void run() { }
});
this(EMPTY);
}

public BooleanDisposable(Runnable run) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
* and should be used by internal means only.
*
* @param <T> the resource tpye
* @deprecated Use more type-specific and inlined resource management
*/
@Deprecated
public final class ArrayCompositeResource<T> extends AtomicReferenceArray<Object> implements Disposable {
/** */
private static final long serialVersionUID = 2746389416410565408L;
Expand Down
114 changes: 114 additions & 0 deletions src/main/java/io/reactivex/internal/disposables/DisposableHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package io.reactivex.internal.disposables;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.disposables.Disposable;
import io.reactivex.internal.functions.Objects;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Utility methods for working with Disposables atomically.
*/
public enum DisposableHelper {
;

public static final Disposable DISPOSED = Disposed.INSTANCE;

public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}

public static boolean set(AtomicReference<Disposable> field, Disposable d) {
for (;;) {
Disposable current = field.get();
if (current == DISPOSED) {
if (d != null) {
d.dispose();
}
return false;
}
if (field.compareAndSet(current, d)) {
if (current != null) {
current.dispose();
}
return true;
}
}
}

public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
Objects.requireNonNull(d, "d is null");
if (!field.compareAndSet(null, d)) {
d.dispose();
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}

public static boolean replace(AtomicReference<Disposable> field, Disposable d) {
for (;;) {
Disposable current = field.get();
if (current == DISPOSED) {
if (d != null) {
d.dispose();
}
return false;
}
if (field.compareAndSet(current, d)) {
return true;
}
}
}

public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
if (current != DISPOSED) {
current = field.getAndSet(DISPOSED);
if (current != null && current != DISPOSED) {
current.dispose();
return true;
}
}
return false;
}

/**
* Verifies that current is null, next is not null, otherwise signals errors
* to the RxJavaPlugins and returns false
* @param current the current Disposable, expected to be null
* @param next the next Disposable, expected to be non-null
* @return true if the validation succeeded
*/
public static boolean validate(Disposable current, Disposable next) {
if (next == null) {
RxJavaPlugins.onError(new NullPointerException("next is null"));
return false;
}
if (current != null) {
next.dispose();
reportDisposableSet();
return false;
}
return true;
}

/**
* Reports that the disposable is already set to the RxJavaPlugins error handler.
*/
public static void reportDisposableSet() {
RxJavaPlugins.onError(new IllegalStateException("Disposable already set!"));
}

static enum Disposed implements Disposable {
INSTANCE;

@Override
public void dispose() {
// deliberately no-op
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
* A linked-list-based composite resource with custom disposer callback.
*
* @param <T> the resource type
* @deprecated Use more type-specific and inlined resource management
*/
@Deprecated
public final class ListCompositeResource<T> implements CompositeResource<T>, Disposable {
final Consumer<? super T> disposer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
* and should be used by internal means only.
*
* @param <T> the resource type
* @deprecated Use more type-specific and inlined resource management
*/
@Deprecated
public final class MultipleAssignmentResource<T> extends AtomicReference<Object> implements Disposable {
/** */
private static final long serialVersionUID = 5247635821051810205L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
* and should be used by internal means only.
*
* @param <T> the resource type
* @deprecated Use more type-specific and inlined resource management
*/
@Deprecated
public final class SerialResource<T> extends AtomicReference<Object> implements Disposable {
/** */
private static final long serialVersionUID = 5247635821051810205L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
* A set-based composite resource with custom disposer callback.
*
* @param <T> the resource type
* @deprecated Use more type-specific and inlined resource management
*/
@Deprecated
public final class SetCompositeResource<T> implements CompositeResource<T>, Disposable {
final Consumer<? super T> disposer;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.completable;

import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.plugins.RxJavaPlugins;

public final class CompletableAmbArray extends Completable {

final CompletableConsumable[] sources;

public CompletableAmbArray(CompletableConsumable[] sources) {
this.sources = sources;
}

@Override
public void subscribeActual(final CompletableSubscriber s) {
final CompositeDisposable set = new CompositeDisposable();
s.onSubscribe(set);

final AtomicBoolean once = new AtomicBoolean();

CompletableSubscriber inner = new CompletableSubscriber() {
@Override
public void onComplete() {
if (once.compareAndSet(false, true)) {
set.dispose();
s.onComplete();
}
}

@Override
public void onError(Throwable e) {
if (once.compareAndSet(false, true)) {
set.dispose();
s.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}

@Override
public void onSubscribe(Disposable d) {
set.add(d);
}

};

for (CompletableConsumable c : sources) {
if (set.isDisposed()) {
return;
}
if (c == null) {
NullPointerException npe = new NullPointerException("One of the sources is null");
if (once.compareAndSet(false, true)) {
set.dispose();
s.onError(npe);
} else {
RxJavaPlugins.onError(npe);
}
return;
}
if (once.get() || set.isDisposed()) {
return;
}

// no need to have separate subscribers because inner is stateless
c.subscribe(inner);
}
}
}
Loading