Skip to content

Commit b441481

Browse files
committed
2.x: Subject/Processor improvements & small cleanup
1 parent f97c50d commit b441481

29 files changed

+2358
-2181
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3757,25 +3757,29 @@ public static <T, D> Flowable<T> using(Callable<? extends D> resourceSupplier,
37573757
* the parameter name.
37583758
* @param value the value to validate
37593759
* @param paramName the parameter name of the value
3760+
* @return the value
37603761
* @throws IllegalArgumentException if bufferSize &lt;= 0
37613762
*/
3762-
private static void verifyPositive(int value, String paramName) {
3763+
protected static int verifyPositive(int value, String paramName) {
37633764
if (value <= 0) {
37643765
throw new IllegalArgumentException(paramName + " > 0 required but it was " + value);
37653766
}
3767+
return value;
37663768
}
37673769

37683770
/**
37693771
* Validate that the given value is positive or report an IllegalArgumentException with
37703772
* the parameter name.
37713773
* @param value the value to validate
37723774
* @param paramName the parameter name of the value
3775+
* @return the value
37733776
* @throws IllegalArgumentException if bufferSize &lt;= 0
37743777
*/
3775-
private static void verifyPositive(long value, String paramName) {
3778+
protected static long verifyPositive(long value, String paramName) {
37763779
if (value <= 0L) {
37773780
throw new IllegalArgumentException(paramName + " > 0 required but it was " + value);
37783781
}
3782+
return value;
37793783
}
37803784

37813785
/**

src/main/java/io/reactivex/Observable.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3307,25 +3307,29 @@ public static <T, D> Observable<T> using(Callable<? extends D> resourceSupplier,
33073307
* the parameter name.
33083308
* @param value the value to validate
33093309
* @param paramName the parameter name of the value
3310+
* @return value
33103311
* @throws IllegalArgumentException if bufferSize &lt;= 0
33113312
*/
3312-
private static void verifyPositive(int value, String paramName) {
3313+
protected static int verifyPositive(int value, String paramName) {
33133314
if (value <= 0) {
33143315
throw new IllegalArgumentException(paramName + " > 0 required but it was " + value);
33153316
}
3317+
return value;
33163318
}
33173319

33183320
/**
33193321
* Validate that the given value is positive or report an IllegalArgumentException with
33203322
* the parameter name.
33213323
* @param value the value to validate
33223324
* @param paramName the parameter name of the value
3325+
* @return value
33233326
* @throws IllegalArgumentException if bufferSize &lt;= 0
33243327
*/
3325-
private static void verifyPositive(long value, String paramName) {
3328+
protected static long verifyPositive(long value, String paramName) {
33263329
if (value <= 0L) {
33273330
throw new IllegalArgumentException(paramName + " > 0 required but it was " + value);
33283331
}
3332+
return value;
33293333
}
33303334

33313335
/**

src/main/java/io/reactivex/internal/operators/observable/ObservableFromArray.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import io.reactivex.*;
1717
import io.reactivex.internal.functions.ObjectHelper;
18-
import io.reactivex.internal.subscribers.observable.BaseQueueDisposable;
18+
import io.reactivex.internal.subscribers.observable.BasicQueueDisposable;
1919

2020
public final class ObservableFromArray<T> extends Observable<T> {
2121
final T[] array;
@@ -38,7 +38,7 @@ public void subscribeActual(Observer<? super T> s) {
3838
d.run();
3939
}
4040

41-
static final class FromArrayDisposable<T> extends BaseQueueDisposable<T> {
41+
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
4242

4343
final Observer<? super T> actual;
4444

src/main/java/io/reactivex/internal/operators/observable/ObservableFromIterable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.reactivex.exceptions.Exceptions;
2020
import io.reactivex.internal.disposables.EmptyDisposable;
2121
import io.reactivex.internal.functions.ObjectHelper;
22-
import io.reactivex.internal.subscribers.observable.BaseQueueDisposable;
22+
import io.reactivex.internal.subscribers.observable.BasicQueueDisposable;
2323

2424
public final class ObservableFromIterable<T> extends Observable<T> {
2525
final Iterable<? extends T> source;
@@ -58,7 +58,7 @@ public void subscribeActual(Observer<? super T> s) {
5858
}
5959
}
6060

61-
static final class FromIterableDisposable<T> extends BaseQueueDisposable<T> {
61+
static final class FromIterableDisposable<T> extends BasicQueueDisposable<T> {
6262

6363
final Observer<? super T> actual;
6464

src/main/java/io/reactivex/internal/subscribers/observable/BaseIntQueueDisposable.java renamed to src/main/java/io/reactivex/internal/subscribers/observable/BasicIntQueueDisposable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
* that defaults all unnecessary Queue methods to throw UnsupportedOperationException.
2323
* @param <T> the output value type
2424
*/
25-
public abstract class BaseIntQueueDisposable<T>
25+
public abstract class BasicIntQueueDisposable<T>
2626
extends AtomicInteger
2727
implements QueueDisposable<T> {
2828

src/main/java/io/reactivex/internal/subscribers/observable/BaseQueueDisposable.java renamed to src/main/java/io/reactivex/internal/subscribers/observable/BasicQueueDisposable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
* unnecessary Queue methods to throw UnsupportedOperationException.
2121
* @param <T> the output value type
2222
*/
23-
public abstract class BaseQueueDisposable<T> implements QueueDisposable<T> {
23+
public abstract class BasicQueueDisposable<T> implements QueueDisposable<T> {
2424

2525
@Override
2626
public final boolean offer(T e) {
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.subscribers.observable;
15+
16+
import io.reactivex.Observer;
17+
import io.reactivex.plugins.RxJavaPlugins;
18+
19+
/**
20+
* Represents a fuseable container for a single value.
21+
*
22+
* @param <T> the value type received and emitted
23+
*/
24+
public class DeferredScalarDisposable<T> extends BasicIntQueueDisposable<T> {
25+
/** */
26+
private static final long serialVersionUID = -5502432239815349361L;
27+
28+
/** The target of the events. */
29+
protected final Observer<? super T> actual;
30+
31+
/** The value stored temporarily when in fusion mode. */
32+
protected T value;
33+
34+
/** Indicates there was a call to complete(T). */
35+
static final int TERMINATED = 2;
36+
37+
/** Indicates the Disposable has been disposed. */
38+
static final int DISPOSED = 4;
39+
40+
/** Indicates this Disposable is in fusion mode and is currently empty. */
41+
static final int FUSED_EMPTY = 8;
42+
/** Indicates this Disposable is in fusion mode and has a value. */
43+
static final int FUSED_READY = 16;
44+
/** Indicates this Disposable is in fusion mode and its value has been consumed. */
45+
static final int FUSED_CONSUMED = 32;
46+
47+
/**
48+
* Constructs a DeferredScalarDisposable by wrapping the Observer.
49+
* @param actual the Observer to wrap, not null (not verified)
50+
*/
51+
public DeferredScalarDisposable(Observer<? super T> actual) {
52+
this.actual = actual;
53+
}
54+
55+
@Override
56+
public final int requestFusion(int mode) {
57+
if ((mode & ASYNC) != 0) {
58+
lazySet(FUSED_EMPTY);
59+
return ASYNC;
60+
}
61+
return NONE;
62+
}
63+
64+
/**
65+
* Complete the target with a single value or indicate there is a value available in
66+
* fusion mode.
67+
* @param value the value to signal, not null (not verified)
68+
*/
69+
public final void complete(T value) {
70+
int state = get();
71+
if ((state & (FUSED_READY | FUSED_CONSUMED | TERMINATED | DISPOSED)) != 0) {
72+
return;
73+
}
74+
if (state == FUSED_EMPTY) {
75+
this.value = value;
76+
lazySet(FUSED_READY);
77+
} else {
78+
lazySet(TERMINATED);
79+
}
80+
Observer<? super T> a = actual;
81+
a.onNext(value);
82+
if (get() != DISPOSED) {
83+
a.onComplete();
84+
}
85+
}
86+
87+
/**
88+
* Complete the target with an error signal
89+
* @param t the Throwable to signal, not null (not verified)
90+
*/
91+
public final void error(Throwable t) {
92+
int state = get();
93+
if ((state & (FUSED_READY | FUSED_CONSUMED | TERMINATED | DISPOSED)) != 0) {
94+
RxJavaPlugins.onError(t);
95+
return;
96+
}
97+
lazySet(TERMINATED);
98+
actual.onError(t);
99+
}
100+
101+
/**
102+
* Complete the target without any value.
103+
*/
104+
public final void complete() {
105+
int state = get();
106+
if ((state & (FUSED_READY | FUSED_CONSUMED | TERMINATED | DISPOSED)) != 0) {
107+
return;
108+
}
109+
lazySet(TERMINATED);
110+
actual.onComplete();
111+
}
112+
113+
@Override
114+
public final T poll() throws Exception {
115+
if (get() == FUSED_READY) {
116+
T v = value;
117+
value = null;
118+
lazySet(FUSED_CONSUMED);
119+
return v;
120+
}
121+
return null;
122+
}
123+
124+
@Override
125+
public final boolean isEmpty() {
126+
return get() != FUSED_READY;
127+
}
128+
129+
@Override
130+
public final void clear() {
131+
lazySet(FUSED_CONSUMED);
132+
value = null;
133+
}
134+
135+
@Override
136+
public void dispose() {
137+
set(DISPOSED);
138+
value = null;
139+
}
140+
141+
/**
142+
* Try disposing this Disposable and return true if the current thread succeeded.
143+
* @return true if the current thread succeeded
144+
*/
145+
public final boolean tryDispose() {
146+
return getAndSet(DISPOSED) != DISPOSED;
147+
}
148+
149+
@Override
150+
public final boolean isDisposed() {
151+
return get() == DISPOSED;
152+
}
153+
154+
}

0 commit comments

Comments
 (0)