Skip to content

Commit d832fb1

Browse files
committed
3.x: Add fusion support to concatMap{Maybe|Single|Completable}
1 parent e657635 commit d832fb1

16 files changed

+856
-491
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.operators.mixed;
15+
16+
import java.util.concurrent.atomic.AtomicInteger;
17+
18+
import io.reactivex.rxjava3.core.Observer;
19+
import io.reactivex.rxjava3.disposables.Disposable;
20+
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
21+
import io.reactivex.rxjava3.internal.fuseable.*;
22+
import io.reactivex.rxjava3.internal.queue.SpscLinkedArrayQueue;
23+
import io.reactivex.rxjava3.internal.util.*;
24+
25+
/**
26+
* Base class for implementing concatMapX main observers.
27+
*
28+
* @param <T> the upstream value type
29+
* @since 3.0.10
30+
*/
31+
public abstract class ConcatMapXMainObserver<T> extends AtomicInteger
32+
implements Observer<T>, Disposable {
33+
34+
private static final long serialVersionUID = -3214213361171757852L;
35+
36+
final AtomicThrowable errors;
37+
38+
final int prefetch;
39+
40+
final ErrorMode errorMode;
41+
42+
SimpleQueue<T> queue;
43+
44+
Disposable upstream;
45+
46+
volatile boolean done;
47+
48+
volatile boolean disposed;
49+
50+
public ConcatMapXMainObserver(int prefetch, ErrorMode errorMode) {
51+
this.errorMode = errorMode;
52+
this.errors = new AtomicThrowable();
53+
this.prefetch = prefetch;
54+
}
55+
56+
@Override
57+
public final void onSubscribe(Disposable d) {
58+
if (DisposableHelper.validate(upstream, d)) {
59+
upstream = d;
60+
if (d instanceof QueueDisposable) {
61+
@SuppressWarnings("unchecked")
62+
QueueDisposable<T> qd = (QueueDisposable<T>)d;
63+
int mode = qd.requestFusion(QueueFuseable.ANY | QueueFuseable.BOUNDARY);
64+
if (mode == QueueFuseable.SYNC) {
65+
queue = qd;
66+
done = true;
67+
68+
onSubscribeDownstream();
69+
70+
drain();
71+
return;
72+
}
73+
else if (mode == QueueFuseable.ASYNC) {
74+
queue = qd;
75+
76+
onSubscribeDownstream();
77+
78+
return;
79+
}
80+
}
81+
82+
queue = new SpscLinkedArrayQueue<>(prefetch);
83+
onSubscribeDownstream();
84+
}
85+
}
86+
87+
@Override
88+
public final void onNext(T t) {
89+
// In async fusion mode, t is a drain indicator
90+
if (t != null) {
91+
queue.offer(t);
92+
}
93+
drain();
94+
}
95+
96+
@Override
97+
public final void onError(Throwable t) {
98+
if (errors.tryAddThrowableOrReport(t)) {
99+
if (errorMode == ErrorMode.IMMEDIATE) {
100+
disposeInner();
101+
}
102+
done = true;
103+
drain();
104+
}
105+
}
106+
107+
@Override
108+
public final void onComplete() {
109+
done = true;
110+
drain();
111+
}
112+
113+
@Override
114+
public final void dispose() {
115+
disposed = true;
116+
upstream.dispose();
117+
disposeInner();
118+
errors.tryTerminateAndReport();
119+
if (getAndIncrement() == 0) {
120+
queue.clear();
121+
clearValue();
122+
}
123+
}
124+
125+
@Override
126+
public final boolean isDisposed() {
127+
return disposed;
128+
}
129+
130+
/**
131+
* Override this to clear values when the downstream disposes.
132+
*/
133+
void clearValue() {
134+
}
135+
136+
/**
137+
* Typically, this should be {@code downstream.onSubscribe(this)}.
138+
*/
139+
abstract void onSubscribeDownstream();
140+
141+
/**
142+
* Typically, this should be {@code inner.dispose()}.
143+
*/
144+
abstract void disposeInner();
145+
146+
/**
147+
* Implement the serialized inner subscribing and value emission here.
148+
*/
149+
abstract void drain();
150+
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.operators.mixed;
15+
16+
import java.util.concurrent.atomic.AtomicInteger;
17+
18+
import org.reactivestreams.Subscription;
19+
20+
import io.reactivex.rxjava3.core.FlowableSubscriber;
21+
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
22+
import io.reactivex.rxjava3.internal.fuseable.*;
23+
import io.reactivex.rxjava3.internal.queue.SpscArrayQueue;
24+
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
25+
import io.reactivex.rxjava3.internal.util.*;
26+
27+
/**
28+
* Base class for implementing concatMapX main subscribers.
29+
*
30+
* @param <T> the upstream value type
31+
* @since 3.0.10
32+
*/
33+
public abstract class ConcatMapXMainSubscriber<T> extends AtomicInteger
34+
implements FlowableSubscriber<T> {
35+
36+
private static final long serialVersionUID = -3214213361171757852L;
37+
38+
final AtomicThrowable errors;
39+
40+
final int prefetch;
41+
42+
final ErrorMode errorMode;
43+
44+
SimpleQueue<T> queue;
45+
46+
Subscription upstream;
47+
48+
volatile boolean done;
49+
50+
volatile boolean cancelled;
51+
52+
boolean syncFused;
53+
54+
public ConcatMapXMainSubscriber(int prefetch, ErrorMode errorMode) {
55+
this.errorMode = errorMode;
56+
this.errors = new AtomicThrowable();
57+
this.prefetch = prefetch;
58+
}
59+
60+
@Override
61+
public final void onSubscribe(Subscription s) {
62+
if (SubscriptionHelper.validate(upstream, s)) {
63+
upstream = s;
64+
if (s instanceof QueueSubscription) {
65+
@SuppressWarnings("unchecked")
66+
QueueSubscription<T> qs = (QueueSubscription<T>)s;
67+
int mode = qs.requestFusion(QueueFuseable.ANY | QueueFuseable.BOUNDARY);
68+
if (mode == QueueFuseable.SYNC) {
69+
queue = qs;
70+
syncFused = true;
71+
done = true;
72+
73+
onSubscribeDownstream();
74+
75+
drain();
76+
return;
77+
}
78+
else if (mode == QueueFuseable.ASYNC) {
79+
queue = qs;
80+
81+
onSubscribeDownstream();
82+
83+
upstream.request(prefetch);
84+
return;
85+
}
86+
}
87+
88+
queue = new SpscArrayQueue<>(prefetch);
89+
onSubscribeDownstream();
90+
upstream.request(prefetch);
91+
}
92+
}
93+
94+
@Override
95+
public final void onNext(T t) {
96+
// In async fusion mode, t is a drain indicator
97+
if (t != null) {
98+
if (!queue.offer(t)) {
99+
upstream.cancel();
100+
onError(new MissingBackpressureException("queue full?!"));
101+
return;
102+
}
103+
}
104+
drain();
105+
}
106+
107+
@Override
108+
public final void onError(Throwable t) {
109+
if (errors.tryAddThrowableOrReport(t)) {
110+
if (errorMode == ErrorMode.IMMEDIATE) {
111+
disposeInner();
112+
}
113+
done = true;
114+
drain();
115+
}
116+
}
117+
118+
@Override
119+
public final void onComplete() {
120+
done = true;
121+
drain();
122+
}
123+
124+
final void stop() {
125+
cancelled = true;
126+
upstream.cancel();
127+
disposeInner();
128+
errors.tryTerminateAndReport();
129+
if (getAndIncrement() == 0) {
130+
queue.clear();
131+
clearValue();
132+
}
133+
}
134+
135+
/**
136+
* Override this to clear values when the downstream disposes.
137+
*/
138+
void clearValue() {
139+
}
140+
141+
/**
142+
* Typically, this should be {@code downstream.onSubscribe(this);}.
143+
*/
144+
abstract void onSubscribeDownstream();
145+
146+
/**
147+
* Typically, this should be {@code inner.dispose()}.
148+
*/
149+
abstract void disposeInner();
150+
151+
/**
152+
* Implement the serialized inner subscribing and value emission here.
153+
*/
154+
abstract void drain();
155+
}

0 commit comments

Comments
 (0)