Skip to content

Commit 41cfbf6

Browse files
mostroverkhovakarnokd
authored andcommitted
2.x UnicastProcessor fail-fast support (#5226)
* [2.x] UnicastProcessor fail-fast support * follow-up: remove constructor, add onTerminate non-null checks to factory methods, fixed typo
1 parent 0a07ac1 commit 41cfbf6

File tree

2 files changed

+130
-20
lines changed

2 files changed

+130
-20
lines changed

src/main/java/io/reactivex/processors/UnicastProcessor.java

Lines changed: 82 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.reactivex.annotations.CheckReturnValue;
1717
import java.util.concurrent.atomic.*;
1818

19+
import io.reactivex.annotations.Experimental;
1920
import io.reactivex.annotations.Nullable;
2021
import org.reactivestreams.*;
2122

@@ -49,7 +50,10 @@ public final class UnicastProcessor<T> extends FlowableProcessor<T> {
4950

5051
final AtomicReference<Runnable> onTerminate;
5152

53+
final boolean delayError;
54+
5255
volatile boolean done;
56+
5357
Throwable error;
5458

5559
final AtomicReference<Subscriber<? super T>> actual;
@@ -85,6 +89,19 @@ public static <T> UnicastProcessor<T> create(int capacityHint) {
8589
return new UnicastProcessor<T>(capacityHint);
8690
}
8791

92+
/**
93+
* Creates an UnicastProcessor with default internal buffer capacity hint and delay error flag.
94+
* @param <T> the value type
95+
* @param delayError deliver pending onNext events before onError
96+
* @return an UnicastProcessor instance
97+
* @since 2.0.8 - experimental
98+
*/
99+
@CheckReturnValue
100+
@Experimental
101+
public static <T> UnicastProcessor<T> create(boolean delayError) {
102+
return new UnicastProcessor<T>(bufferSize(), null, delayError);
103+
}
104+
88105
/**
89106
* Creates an UnicastProcessor with the given internal buffer capacity hint and a callback for
90107
* the case when the single Subscriber cancels its subscription.
@@ -99,21 +116,38 @@ public static <T> UnicastProcessor<T> create(int capacityHint) {
99116
*/
100117
@CheckReturnValue
101118
public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancelled) {
119+
ObjectHelper.requireNonNull(onCancelled, "onTerminate");
102120
return new UnicastProcessor<T>(capacityHint, onCancelled);
103121
}
104122

123+
/**
124+
* Creates an UnicastProcessor with the given internal buffer capacity hint, delay error flag and a callback for
125+
* the case when the single Subscriber cancels its subscription.
126+
*
127+
* <p>The callback, if not null, is called exactly once and
128+
* non-overlapped with any active replay.
129+
*
130+
* @param <T> the value type
131+
* @param capacityHint the hint to size the internal unbounded buffer
132+
* @param onCancelled the non null callback
133+
* @param delayError deliver pending onNext events before onError
134+
* @return an UnicastProcessor instance
135+
* @since 2.0.8 - experimental
136+
*/
137+
@CheckReturnValue
138+
@Experimental
139+
public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancelled, boolean delayError) {
140+
ObjectHelper.requireNonNull(onCancelled, "onTerminate");
141+
return new UnicastProcessor<T>(capacityHint, onCancelled, delayError);
142+
}
143+
105144
/**
106145
* Creates an UnicastProcessor with the given capacity hint.
107146
* @param capacityHint the capacity hint for the internal, unbounded queue
108147
* @since 2.0
109148
*/
110149
UnicastProcessor(int capacityHint) {
111-
this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint"));
112-
this.onTerminate = new AtomicReference<Runnable>();
113-
this.actual = new AtomicReference<Subscriber<? super T>>();
114-
this.once = new AtomicBoolean();
115-
this.wip = new UnicastQueueSubscription();
116-
this.requested = new AtomicLong();
150+
this(capacityHint,null, true);
117151
}
118152

119153
/**
@@ -124,8 +158,21 @@ public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancel
124158
* @since 2.0
125159
*/
126160
UnicastProcessor(int capacityHint, Runnable onTerminate) {
161+
this(capacityHint, onTerminate, true);
162+
}
163+
164+
/**
165+
* Creates an UnicastProcessor with the given capacity hint and callback
166+
* for when the Processor is terminated normally or its single Subscriber cancels.
167+
* @param capacityHint the capacity hint for the internal, unbounded queue
168+
* @param onTerminate the callback to run when the Processor is terminated or cancelled, null not allowed
169+
* @param delayError deliver pending onNext events before onError
170+
* @since 2.0.8 - experimental
171+
*/
172+
UnicastProcessor(int capacityHint, Runnable onTerminate, boolean delayError) {
127173
this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint"));
128-
this.onTerminate = new AtomicReference<Runnable>(ObjectHelper.requireNonNull(onTerminate, "onTerminate"));
174+
this.onTerminate = new AtomicReference<Runnable>(onTerminate);
175+
this.delayError = delayError;
129176
this.actual = new AtomicReference<Subscriber<? super T>>();
130177
this.once = new AtomicBoolean();
131178
this.wip = new UnicastQueueSubscription();
@@ -143,7 +190,7 @@ void drainRegular(Subscriber<? super T> a) {
143190
int missed = 1;
144191

145192
final SpscLinkedArrayQueue<T> q = queue;
146-
193+
final boolean failFast = !delayError;
147194
for (;;) {
148195

149196
long r = requested.get();
@@ -155,7 +202,7 @@ void drainRegular(Subscriber<? super T> a) {
155202
T t = q.poll();
156203
boolean empty = t == null;
157204

158-
if (checkTerminated(d, empty, a, q)) {
205+
if (checkTerminated(failFast, d, empty, a, q)) {
159206
return;
160207
}
161208

@@ -168,7 +215,7 @@ void drainRegular(Subscriber<? super T> a) {
168215
e++;
169216
}
170217

171-
if (r == e && checkTerminated(done, q.isEmpty(), a, q)) {
218+
if (r == e && checkTerminated(failFast, done, q.isEmpty(), a, q)) {
172219
return;
173220
}
174221

@@ -187,7 +234,7 @@ void drainFused(Subscriber<? super T> a) {
187234
int missed = 1;
188235

189236
final SpscLinkedArrayQueue<T> q = queue;
190-
237+
final boolean failFast = !delayError;
191238
for (;;) {
192239

193240
if (cancelled) {
@@ -198,6 +245,12 @@ void drainFused(Subscriber<? super T> a) {
198245

199246
boolean d = done;
200247

248+
if (failFast && d && error != null) {
249+
q.clear();
250+
actual.lazySet(null);
251+
a.onError(error);
252+
return;
253+
}
201254
a.onNext(null);
202255

203256
if (d) {
@@ -246,21 +299,30 @@ void drain() {
246299
}
247300
}
248301

249-
boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, SpscLinkedArrayQueue<T> q) {
302+
boolean checkTerminated(boolean failFast, boolean d, boolean empty, Subscriber<? super T> a, SpscLinkedArrayQueue<T> q) {
250303
if (cancelled) {
251304
q.clear();
252305
actual.lazySet(null);
253306
return true;
254307
}
255-
if (d && empty) {
256-
Throwable e = error;
257-
actual.lazySet(null);
258-
if (e != null) {
259-
a.onError(e);
260-
} else {
261-
a.onComplete();
308+
309+
if (d) {
310+
if (failFast && error != null) {
311+
q.clear();
312+
actual.lazySet(null);
313+
a.onError(error);
314+
return true;
315+
}
316+
if (empty) {
317+
Throwable e = error;
318+
actual.lazySet(null);
319+
if (e != null) {
320+
a.onError(e);
321+
} else {
322+
a.onComplete();
323+
}
324+
return true;
262325
}
263-
return true;
264326
}
265327

266328
return false;

src/test/java/io/reactivex/processors/UnicastProcessorTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,54 @@ public void fusionOfflie() {
7676
.assertResult(1);
7777
}
7878

79+
@Test
80+
public void failFast() {
81+
UnicastProcessor<Integer> ap = UnicastProcessor.create(false);
82+
ap.onNext(1);
83+
ap.onError(new RuntimeException());
84+
85+
TestSubscriber<Integer> ts = TestSubscriber.create();
86+
87+
ap.subscribe(ts);
88+
89+
ts
90+
.assertValueCount(0)
91+
.assertError(RuntimeException.class);
92+
}
93+
94+
@Test
95+
public void failFastFusionOffline() {
96+
UnicastProcessor<Integer> ap = UnicastProcessor.create(false);
97+
ap.onNext(1);
98+
ap.onError(new RuntimeException());
99+
100+
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueSubscription.ANY);
101+
102+
ap.subscribe(ts);
103+
ts
104+
.assertValueCount(0)
105+
.assertError(RuntimeException.class);
106+
}
107+
108+
@Test
109+
public void threeArgsFactory() {
110+
Runnable noop = new Runnable() {
111+
@Override
112+
public void run() {
113+
}
114+
};
115+
UnicastProcessor<Integer> ap = UnicastProcessor.create(16, noop,false);
116+
ap.onNext(1);
117+
ap.onError(new RuntimeException());
118+
119+
TestSubscriber<Integer> ts = TestSubscriber.create();
120+
121+
ap.subscribe(ts);
122+
ts
123+
.assertValueCount(0)
124+
.assertError(RuntimeException.class);
125+
}
126+
79127
@Test
80128
public void onTerminateCalledWhenOnError() {
81129
final AtomicBoolean didRunOnTerminate = new AtomicBoolean();

0 commit comments

Comments
 (0)