Skip to content

Commit 461a7bb

Browse files
authored
2.x: factor out Completable ops, unify disposed markers (#4043)
* Some operators extracted * 2.x: factor out Completable ops, make them direct, fix disposed markers
1 parent 7a4d633 commit 461a7bb

File tree

128 files changed

+2595
-2289
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

128 files changed

+2595
-2289
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 70 additions & 1120 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/disposables/BooleanDisposable.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ public final class BooleanDisposable implements Disposable {
2222
@Override
2323
public void run() { }
2424
};
25+
26+
static final Runnable EMPTY = new Runnable() {
27+
@Override
28+
public void run() { }
29+
};
2530

2631
public BooleanDisposable() {
27-
this(new Runnable() {
28-
@Override
29-
public void run() { }
30-
});
32+
this(EMPTY);
3133
}
3234

3335
public BooleanDisposable(Runnable run) {

src/main/java/io/reactivex/internal/disposables/ArrayCompositeResource.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
* and should be used by internal means only.
2727
*
2828
* @param <T> the resource tpye
29+
* @deprecated Use more type-specific and inlined resource management
2930
*/
31+
@Deprecated
3032
public final class ArrayCompositeResource<T> extends AtomicReferenceArray<Object> implements Disposable {
3133
/** */
3234
private static final long serialVersionUID = 2746389416410565408L;
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package io.reactivex.internal.disposables;
2+
3+
import java.util.concurrent.atomic.AtomicReference;
4+
5+
import io.reactivex.disposables.Disposable;
6+
import io.reactivex.internal.functions.Objects;
7+
import io.reactivex.plugins.RxJavaPlugins;
8+
9+
/**
10+
* Utility methods for working with Disposables atomically.
11+
*/
12+
public enum DisposableHelper {
13+
;
14+
15+
public static final Disposable DISPOSED = Disposed.INSTANCE;
16+
17+
public static boolean isDisposed(Disposable d) {
18+
return d == DISPOSED;
19+
}
20+
21+
public static boolean set(AtomicReference<Disposable> field, Disposable d) {
22+
for (;;) {
23+
Disposable current = field.get();
24+
if (current == DISPOSED) {
25+
if (d != null) {
26+
d.dispose();
27+
}
28+
return false;
29+
}
30+
if (field.compareAndSet(current, d)) {
31+
if (current != null) {
32+
current.dispose();
33+
}
34+
return true;
35+
}
36+
}
37+
}
38+
39+
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
40+
Objects.requireNonNull(d, "d is null");
41+
if (!field.compareAndSet(null, d)) {
42+
d.dispose();
43+
if (field.get() != DISPOSED) {
44+
reportDisposableSet();
45+
}
46+
return false;
47+
}
48+
return true;
49+
}
50+
51+
public static boolean replace(AtomicReference<Disposable> field, Disposable d) {
52+
for (;;) {
53+
Disposable current = field.get();
54+
if (current == DISPOSED) {
55+
if (d != null) {
56+
d.dispose();
57+
}
58+
return false;
59+
}
60+
if (field.compareAndSet(current, d)) {
61+
return true;
62+
}
63+
}
64+
}
65+
66+
public static boolean dispose(AtomicReference<Disposable> field) {
67+
Disposable current = field.get();
68+
if (current != DISPOSED) {
69+
current = field.getAndSet(DISPOSED);
70+
if (current != null && current != DISPOSED) {
71+
current.dispose();
72+
return true;
73+
}
74+
}
75+
return false;
76+
}
77+
78+
/**
79+
* Verifies that current is null, next is not null, otherwise signals errors
80+
* to the RxJavaPlugins and returns false
81+
* @param current the current Disposable, expected to be null
82+
* @param next the next Disposable, expected to be non-null
83+
* @return true if the validation succeeded
84+
*/
85+
public static boolean validate(Disposable current, Disposable next) {
86+
if (next == null) {
87+
RxJavaPlugins.onError(new NullPointerException("next is null"));
88+
return false;
89+
}
90+
if (current != null) {
91+
next.dispose();
92+
reportDisposableSet();
93+
return false;
94+
}
95+
return true;
96+
}
97+
98+
/**
99+
* Reports that the disposable is already set to the RxJavaPlugins error handler.
100+
*/
101+
public static void reportDisposableSet() {
102+
RxJavaPlugins.onError(new IllegalStateException("Disposable already set!"));
103+
}
104+
105+
static enum Disposed implements Disposable {
106+
INSTANCE;
107+
108+
@Override
109+
public void dispose() {
110+
// deliberately no-op
111+
}
112+
}
113+
114+
}

src/main/java/io/reactivex/internal/disposables/ListCompositeResource.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
* A linked-list-based composite resource with custom disposer callback.
2323
*
2424
* @param <T> the resource type
25+
* @deprecated Use more type-specific and inlined resource management
2526
*/
27+
@Deprecated
2628
public final class ListCompositeResource<T> implements CompositeResource<T>, Disposable {
2729
final Consumer<? super T> disposer;
2830

src/main/java/io/reactivex/internal/disposables/MultipleAssignmentResource.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
* and should be used by internal means only.
3232
*
3333
* @param <T> the resource type
34+
* @deprecated Use more type-specific and inlined resource management
3435
*/
36+
@Deprecated
3537
public final class MultipleAssignmentResource<T> extends AtomicReference<Object> implements Disposable {
3638
/** */
3739
private static final long serialVersionUID = 5247635821051810205L;

src/main/java/io/reactivex/internal/disposables/SerialResource.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
* and should be used by internal means only.
3232
*
3333
* @param <T> the resource type
34+
* @deprecated Use more type-specific and inlined resource management
3435
*/
36+
@Deprecated
3537
public final class SerialResource<T> extends AtomicReference<Object> implements Disposable {
3638
/** */
3739
private static final long serialVersionUID = 5247635821051810205L;

src/main/java/io/reactivex/internal/disposables/SetCompositeResource.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
* A set-based composite resource with custom disposer callback.
2222
*
2323
* @param <T> the resource type
24+
* @deprecated Use more type-specific and inlined resource management
2425
*/
26+
@Deprecated
2527
public final class SetCompositeResource<T> implements CompositeResource<T>, Disposable {
2628
final Consumer<? super T> disposer;
2729

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import java.util.concurrent.atomic.AtomicBoolean;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.disposables.*;
20+
import io.reactivex.plugins.RxJavaPlugins;
21+
22+
public final class CompletableAmbArray extends Completable {
23+
24+
final CompletableConsumable[] sources;
25+
26+
public CompletableAmbArray(CompletableConsumable[] sources) {
27+
this.sources = sources;
28+
}
29+
30+
@Override
31+
public void subscribeActual(final CompletableSubscriber s) {
32+
final CompositeDisposable set = new CompositeDisposable();
33+
s.onSubscribe(set);
34+
35+
final AtomicBoolean once = new AtomicBoolean();
36+
37+
CompletableSubscriber inner = new CompletableSubscriber() {
38+
@Override
39+
public void onComplete() {
40+
if (once.compareAndSet(false, true)) {
41+
set.dispose();
42+
s.onComplete();
43+
}
44+
}
45+
46+
@Override
47+
public void onError(Throwable e) {
48+
if (once.compareAndSet(false, true)) {
49+
set.dispose();
50+
s.onError(e);
51+
} else {
52+
RxJavaPlugins.onError(e);
53+
}
54+
}
55+
56+
@Override
57+
public void onSubscribe(Disposable d) {
58+
set.add(d);
59+
}
60+
61+
};
62+
63+
for (CompletableConsumable c : sources) {
64+
if (set.isDisposed()) {
65+
return;
66+
}
67+
if (c == null) {
68+
NullPointerException npe = new NullPointerException("One of the sources is null");
69+
if (once.compareAndSet(false, true)) {
70+
set.dispose();
71+
s.onError(npe);
72+
} else {
73+
RxJavaPlugins.onError(npe);
74+
}
75+
return;
76+
}
77+
if (once.get() || set.isDisposed()) {
78+
return;
79+
}
80+
81+
// no need to have separate subscribers because inner is stateless
82+
c.subscribe(inner);
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)