Skip to content

Commit 60bf4fc

Browse files
authored
2.x: added missing ops, cleanup 8/19-1 (#4375)
1 parent f0d1d34 commit 60bf4fc

32 files changed

+1653
-418
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 61 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.reactivex.internal.subscribers.completable.*;
2929
import io.reactivex.plugins.RxJavaPlugins;
3030
import io.reactivex.schedulers.Schedulers;
31+
import io.reactivex.subscribers.TestSubscriber;
3132

3233
/**
3334
* Represents a deferred computation without any value but only indication for completion or exception.
@@ -165,33 +166,44 @@ public static Completable concat(Publisher<? extends CompletableSource> sources,
165166
}
166167

167168
/**
168-
* Constructs a Completable instance by wrapping the given source callback.
169+
* Provides an API (via a cold Completable) that bridges the reactive world with the callback-style world.
170+
* <p>
171+
* Example:
172+
* <pre><code>
173+
* Completable.create(emitter -&gt; {
174+
* Callback listener = new Callback() {
175+
* &#64;Override
176+
* public void onEvent(Event e) {
177+
* emitter.onComplete();
178+
* }
179+
*
180+
* &#64;Override
181+
* public void onFailure(Exception e) {
182+
* emitter.onError(e);
183+
* }
184+
* };
185+
*
186+
* AutoCloseable c = api.someMethod(listener);
187+
*
188+
* emitter.setCancellable(c::close);
189+
*
190+
* });
191+
* </code></pre>
192+
* <p>
169193
* <dl>
170194
* <dt><b>Scheduler:</b></dt>
171195
* <dd>{@code create} does not operate by default on a particular {@link Scheduler}.</dd>
172196
* </dl>
173-
* @param source the callback which will receive the CompletableObserver instances
174-
* when the Completable is subscribed to.
175-
* @return the created Completable instance
176-
* @throws NullPointerException if source is null
197+
* @param source the emitter that is called when a Subscriber subscribes to the returned {@code Flowable}
198+
* @return the new Completable instance
199+
* @see FlowableOnSubscribe
200+
* @see FlowableEmitter.BackpressureMode
201+
* @see Cancellable
177202
*/
178203
@SchedulerSupport(SchedulerSupport.NONE)
179-
public static Completable create(CompletableSource source) {
204+
public static Completable create(CompletableOnSubscribe source) {
180205
Objects.requireNonNull(source, "source is null");
181-
if (source instanceof Completable) {
182-
throw new IllegalArgumentException("Use of create(Completable)!");
183-
}
184-
try {
185-
// TODO plugin wrapping source
186-
187-
return RxJavaPlugins.onAssembly(new CompletableFromSource(source));
188-
} catch (NullPointerException ex) { // NOPMD
189-
throw ex;
190-
} catch (Throwable ex) {
191-
Exceptions.throwIfFatal(ex);
192-
RxJavaPlugins.onError(ex);
193-
throw toNpe(ex);
194-
}
206+
return RxJavaPlugins.onAssembly(new CompletableCreate(source));
195207
}
196208

197209
/**
@@ -214,8 +226,6 @@ public static Completable unsafeCreate(CompletableSource source) {
214226
throw new IllegalArgumentException("Use of unsafeCreate(Completable)!");
215227
}
216228
try {
217-
// TODO plugin wrapping source
218-
219229
return RxJavaPlugins.onAssembly(new CompletableFromUnsafeSource(source));
220230
} catch (NullPointerException ex) { // NOPMD
221231
throw ex;
@@ -1696,4 +1706,33 @@ public final Completable unsubscribeOn(final Scheduler scheduler) {
16961706
Objects.requireNonNull(scheduler, "scheduler is null");
16971707
return new CompletableUnsubscribeOn(this, scheduler);
16981708
}
1709+
// -------------------------------------------------------------------------
1710+
// Fluent test support, super handy and reduces test preparation boilerplate
1711+
// -------------------------------------------------------------------------
1712+
1713+
/**
1714+
* Creates a TestSubscriber and subscribes
1715+
* it to this Completable.
1716+
* @return the new TestSubscriber instance
1717+
* @since 2.0
1718+
*/
1719+
public final TestSubscriber<Void> test() {
1720+
TestSubscriber<Void> ts = new TestSubscriber<Void>();
1721+
subscribe(ts);
1722+
return ts;
1723+
}
1724+
1725+
/**
1726+
* Creates a TestSubscriber optionally in cancelled state, then subscribes it to this Completable.
1727+
* @param cancelled if true, the TestSubscriber will be cancelled before subscribing to this
1728+
* Completable.
1729+
* @return the new TestSubscriber instance
1730+
* @since 2.0
1731+
*/
1732+
public final TestSubscriber<Void> test(boolean cancelled) {
1733+
TestSubscriber<Void> ts = new TestSubscriber<Void>();
1734+
ts.dispose();
1735+
subscribe(ts);
1736+
return ts;
1737+
}
16991738
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import io.reactivex.disposables.Disposable;
17+
import io.reactivex.functions.Cancellable;
18+
19+
/**
20+
* Abstraction over a RxJava CompletableObserver that allows associating
21+
* a resource with it.
22+
* <p>
23+
* All methods are safe to call from multiple threads.
24+
* <p>
25+
* Calling onComplete or onError multiple times has no effect.
26+
*/
27+
public interface CompletableEmitter {
28+
29+
/**
30+
* Signal the completion.
31+
*/
32+
void onComplete();
33+
34+
/**
35+
* Signal an exception.
36+
* @param t the exception, not null
37+
*/
38+
void onError(Throwable t);
39+
40+
/**
41+
* Sets a Disposable on this emitter; any previous Disposable
42+
* or Cancellation will be unsubscribed/cancelled.
43+
* @param d the disposable, null is allowed
44+
*/
45+
void setDisposable(Disposable d);
46+
47+
/**
48+
* Sets a Cancellable on this emitter; any previous Disposable
49+
* or Cancellation will be unsubscribed/cancelled.
50+
* @param c the cancellable resource, null is allowed
51+
*/
52+
void setCancellable(Cancellable c);
53+
54+
/**
55+
* Returns true if the downstream cancelled the sequence.
56+
* @return true if the downstream cancelled the sequence
57+
*/
58+
boolean isCancelled();
59+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex;
14+
15+
/**
16+
* A functional interface that has a {@code subscribe()} method that receives
17+
* an instance of a {@link CompletableEmitter} instance that allows pushing
18+
* an event in a cancellation-safe manner.
19+
*/
20+
public interface CompletableOnSubscribe {
21+
22+
/**
23+
* Called for each CompletableObserver that subscribes.
24+
* @param e the safe emitter instance, never null
25+
* @throws Exception on error
26+
*/
27+
void subscribe(CompletableEmitter e) throws Exception;
28+
}
29+

src/main/java/io/reactivex/CompletableSource.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@
1515
/**
1616
* Represents a basic {@link Completable} source base interface,
1717
* consumable via an {@link CompletableObserver}.
18-
* <p>
19-
* This class also serves the base type for custom operators wrapped into
20-
* Completable via {@link Completable#create(CompletableSource)}.
2118
*
2219
* @since 2.0
2320
*/

src/main/java/io/reactivex/Flowable.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import io.reactivex.internal.fuseable.*;
2828
import io.reactivex.internal.operators.completable.CompletableFromPublisher;
2929
import io.reactivex.internal.operators.flowable.*;
30-
import io.reactivex.internal.operators.flowable.FlowableConcatMap.ErrorMode;
3130
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
3231
import io.reactivex.internal.operators.single.SingleFromPublisher;
3332
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
@@ -1638,14 +1637,14 @@ public static <T> Flowable<T> concatEager(Iterable<? extends Publisher<? extends
16381637
* @param source the emitter that is called when a Subscriber subscribes to the returned {@code Flowable}
16391638
* @param mode the backpressure mode to apply if the downstream Subscriber doesn't request (fast) enough
16401639
* @return the new Flowable instance
1641-
* @see FlowableSource
1640+
* @see FlowableOnSubscribe
16421641
* @see FlowableEmitter.BackpressureMode
16431642
* @see Cancellable
16441643
*/
16451644
@BackpressureSupport(BackpressureKind.SPECIAL)
16461645
@SchedulerSupport(SchedulerSupport.NONE)
1647-
public static <T> Flowable<T> create(FlowableSource<T> source, FlowableEmitter.BackpressureMode mode) {
1648-
return new FlowableFromSource<T>(source, mode);
1646+
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, FlowableEmitter.BackpressureMode mode) {
1647+
return new FlowableCreate<T>(source, mode);
16491648
}
16501649

16511650
/**
@@ -15454,8 +15453,8 @@ public final TestSubscriber<T> test(long initialRequest) { // NoPMD
1545415453
* @param cancelled if true, the TestSubscriber will be cancelled before subscribing to this
1545515454
* Flowable.
1545615455
* @return the new TestSubscriber instance
15457-
* @since 2.0
15458-
*/
15456+
* @since 2.0
15457+
*/
1545915458
public final TestSubscriber<T> test(long initialRequest, int fusionMode, boolean cancelled) { // NoPMD
1546015459
TestSubscriber<T> ts = new TestSubscriber<T>(initialRequest);
1546115460
ts.setInitialFusionMode(fusionMode);

src/main/java/io/reactivex/FlowableEmitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public interface FlowableEmitter<T> {
5959
* or Cancellation will be unsubscribed/cancelled.
6060
* @param c the cancellable resource, null is allowed
6161
*/
62-
void setCancellation(Cancellable c);
62+
void setCancellable(Cancellable c);
6363
/**
6464
* The current outstanding request amount.
6565
* <p>This method it threadsafe.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex;
14+
15+
/**
16+
* A functional interface that has a {@code subscribe()} method that receives
17+
* an instance of a {@link FlowableEmitter} instance that allows pushing
18+
* events in a backpressure-safe and cancellation-safe manner.
19+
*
20+
* @param <T> the value type pushed
21+
*/
22+
public interface FlowableOnSubscribe<T> {
23+
24+
/**
25+
* Called for each Subscriber that subscribes.
26+
* @param e the safe emitter instance, never null
27+
* @throws Exception on error
28+
*/
29+
void subscribe(FlowableEmitter<T> e) throws Exception;
30+
}
31+

0 commit comments

Comments
 (0)