Skip to content

Commit 66f952a

Browse files
Merge pull request #3371 from akarnokd/NbpObservableOps2x
2.x: non-backpressure NbpObservable all relevant operators + tests.
2 parents f7a5804 + 216082c commit 66f952a

File tree

249 files changed

+58220
-446
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

249 files changed

+58220
-446
lines changed

src/main/java/io/reactivex/NbpObservable.java

Lines changed: 2782 additions & 168 deletions
Large diffs are not rendered by default.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import io.reactivex.NbpObservable.NbpSubscriber;
17+
import io.reactivex.disposables.Disposable;
18+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
19+
20+
public abstract class NbpObserver<T> implements NbpSubscriber<T> {
21+
private Disposable s;
22+
@Override
23+
public final void onSubscribe(Disposable s) {
24+
if (SubscriptionHelper.validateDisposable(this.s, s)) {
25+
return;
26+
}
27+
this.s = s;
28+
onStart();
29+
}
30+
31+
protected final void cancel() {
32+
s.dispose();
33+
}
34+
/**
35+
* Called once the subscription has been set on this observer; override this
36+
* to perform initialization.
37+
*/
38+
protected void onStart() {
39+
}
40+
41+
}

src/main/java/io/reactivex/internal/disposables/EmptyDisposable.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.internal.disposables;
1515

16+
import io.reactivex.NbpObservable.NbpSubscriber;
1617
import io.reactivex.disposables.Disposable;
1718

1819
public enum EmptyDisposable implements Disposable {
@@ -23,4 +24,14 @@ public enum EmptyDisposable implements Disposable {
2324
public void dispose() {
2425
// no-op
2526
}
27+
28+
public static void complete(NbpSubscriber<?> s) {
29+
s.onSubscribe(INSTANCE);
30+
s.onComplete();
31+
}
32+
33+
public static void error(Throwable e, NbpSubscriber<?> s) {
34+
s.onSubscribe(INSTANCE);
35+
s.onError(e);
36+
}
2637
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.disposables;
15+
16+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
17+
18+
import io.reactivex.NbpObservable.NbpSubscriber;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
21+
import io.reactivex.internal.util.NotificationLite;
22+
import io.reactivex.plugins.RxJavaPlugins;
23+
24+
/**
25+
* Performs full arbitration of Subscriber events with strict drain (i.e., old emissions of another
26+
* subscriber are dropped).
27+
*
28+
* @param <T> the value type
29+
*/
30+
public final class NbpFullArbiter<T> extends FullArbiterPad1 implements Disposable {
31+
final NbpSubscriber<? super T> actual;
32+
final SpscLinkedArrayQueue<Object> queue;
33+
34+
volatile Disposable s;
35+
static final Disposable INITIAL = () -> { };
36+
37+
38+
Disposable resource;
39+
40+
volatile boolean cancelled;
41+
42+
public NbpFullArbiter(NbpSubscriber<? super T> actual, Disposable resource, int capacity) {
43+
this.actual = actual;
44+
this.resource = resource;
45+
this.queue = new SpscLinkedArrayQueue<>(capacity);
46+
this.s = INITIAL;
47+
}
48+
49+
@Override
50+
public void dispose() {
51+
if (!cancelled) {
52+
cancelled = true;
53+
disposeResource();
54+
}
55+
}
56+
57+
void disposeResource() {
58+
Disposable d = resource;
59+
resource = null;
60+
if (d != null) {
61+
d.dispose();
62+
}
63+
}
64+
65+
public boolean setSubscription(Disposable s) {
66+
if (cancelled) {
67+
return false;
68+
}
69+
70+
queue.offer(this.s, NotificationLite.disposable(s));
71+
drain();
72+
return true;
73+
}
74+
75+
public boolean onNext(T value, Disposable s) {
76+
if (cancelled) {
77+
return false;
78+
}
79+
80+
queue.offer(s, NotificationLite.next(value));
81+
drain();
82+
return true;
83+
}
84+
85+
public void onError(Throwable value, Disposable s) {
86+
if (cancelled) {
87+
RxJavaPlugins.onError(value);
88+
return;
89+
}
90+
queue.offer(s, NotificationLite.error(value));
91+
drain();
92+
}
93+
94+
public void onComplete(Disposable s) {
95+
queue.offer(s, NotificationLite.complete());
96+
drain();
97+
}
98+
99+
void drain() {
100+
if (WIP.getAndIncrement(this) != 0) {
101+
return;
102+
}
103+
104+
int missed = 1;
105+
106+
final SpscLinkedArrayQueue<Object> q = queue;
107+
final NbpSubscriber<? super T> a = actual;
108+
109+
for (;;) {
110+
111+
for (;;) {
112+
Object o = q.peek();
113+
114+
if (o == null) {
115+
break;
116+
}
117+
118+
q.poll();
119+
Object v = q.poll();
120+
121+
if (o != s) {
122+
continue;
123+
} else
124+
if (NotificationLite.isDisposable(v)) {
125+
Disposable next = NotificationLite.getDisposable(v);
126+
if (s != null) {
127+
s.dispose();
128+
}
129+
s = next;
130+
} else
131+
if (NotificationLite.isError(v)) {
132+
q.clear();
133+
disposeResource();
134+
135+
Throwable ex = NotificationLite.getError(v);
136+
if (!cancelled) {
137+
cancelled = true;
138+
a.onError(ex);
139+
} else {
140+
RxJavaPlugins.onError(ex);
141+
}
142+
} else
143+
if (NotificationLite.isComplete(v)) {
144+
q.clear();
145+
disposeResource();
146+
147+
if (!cancelled) {
148+
cancelled = true;
149+
a.onComplete();
150+
}
151+
} else {
152+
a.onNext(NotificationLite.getValue(v));
153+
}
154+
}
155+
156+
missed = WIP.addAndGet(this, -missed);
157+
if (missed == 0) {
158+
break;
159+
}
160+
}
161+
}
162+
}
163+
164+
/** Pads the object header away. */
165+
class FullArbiterPad0 {
166+
volatile long p1a, p2a, p3a, p4a, p5a, p6a, p7a;
167+
volatile long p8a, p9a, p10a, p11a, p12a, p13a, p14a, p15a;
168+
}
169+
170+
/** The work-in-progress counter. */
171+
class FullArbiterWip extends FullArbiterPad0 {
172+
volatile int wip;
173+
static final AtomicIntegerFieldUpdater<FullArbiterWip> WIP =
174+
AtomicIntegerFieldUpdater.newUpdater(FullArbiterWip.class, "wip");
175+
}
176+
177+
/** Pads the wip counter away. */
178+
class FullArbiterPad1 extends FullArbiterWip {
179+
volatile long p1b, p2b, p3b, p4b, p5b, p6b, p7b;
180+
volatile long p8b, p9b, p10b, p11b, p12b, p13b, p14b, p15b;
181+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.nbp;
15+
16+
import java.util.*;
17+
import java.util.concurrent.Semaphore;
18+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
19+
20+
import io.reactivex.*;
21+
import io.reactivex.internal.subscribers.nbp.NbpDisposableSubscriber;
22+
import io.reactivex.internal.util.Exceptions;
23+
24+
/**
25+
* Wait for and iterate over the latest values of the source observable. If the source works faster than the
26+
* iterator, values may be skipped, but not the {@code onError} or {@code onCompleted} events.
27+
*/
28+
public enum NbpBlockingOperatorLatest {
29+
;
30+
31+
/**
32+
* Returns an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not
33+
* been returned by the {@code Iterable}, then returns that item
34+
*
35+
* @param source
36+
* the source {@code Observable}
37+
* @return an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not
38+
* been returned by the {@code Iterable}, then returns that item
39+
*/
40+
public static <T> Iterable<T> latest(final NbpObservable<? extends T> source) {
41+
return new Iterable<T>() {
42+
@Override
43+
public Iterator<T> iterator() {
44+
NbpLatestObserverIterator<T> lio = new NbpLatestObserverIterator<>();
45+
46+
@SuppressWarnings("unchecked")
47+
NbpObservable<Try<Optional<T>>> materialized = ((NbpObservable<T>)source).materialize();
48+
49+
materialized.subscribe(lio);
50+
return lio;
51+
}
52+
};
53+
}
54+
55+
/** Observer of source, iterator for output. */
56+
static final class NbpLatestObserverIterator<T> extends NbpDisposableSubscriber<Try<Optional<T>>> implements Iterator<T> {
57+
final Semaphore notify = new Semaphore(0);
58+
// observer's notification
59+
volatile Try<Optional<T>> value;
60+
/** Updater for the value field. */
61+
@SuppressWarnings("rawtypes")
62+
static final AtomicReferenceFieldUpdater<NbpLatestObserverIterator, Try> REFERENCE_UPDATER
63+
= AtomicReferenceFieldUpdater.newUpdater(NbpLatestObserverIterator.class, Try.class, "value");
64+
65+
@Override
66+
public void onNext(Try<Optional<T>> args) {
67+
boolean wasntAvailable = REFERENCE_UPDATER.getAndSet(this, args) == null;
68+
if (wasntAvailable) {
69+
notify.release();
70+
}
71+
}
72+
73+
@Override
74+
public void onError(Throwable e) {
75+
// not expected
76+
}
77+
78+
@Override
79+
public void onComplete() {
80+
// not expected
81+
}
82+
83+
// iterator's notification
84+
Try<Optional<T>> iNotif;
85+
86+
@Override
87+
public boolean hasNext() {
88+
if (iNotif != null && iNotif.hasError()) {
89+
throw Exceptions.propagate(iNotif.error());
90+
}
91+
if (iNotif == null || iNotif.value().isPresent()) {
92+
if (iNotif == null) {
93+
try {
94+
notify.acquire();
95+
} catch (InterruptedException ex) {
96+
dispose();
97+
Thread.currentThread().interrupt();
98+
iNotif = Notification.error(ex);
99+
throw Exceptions.propagate(ex);
100+
}
101+
102+
@SuppressWarnings("unchecked")
103+
Try<Optional<T>> n = REFERENCE_UPDATER.getAndSet(this, null);
104+
iNotif = n;
105+
if (iNotif.hasError()) {
106+
throw Exceptions.propagate(iNotif.error());
107+
}
108+
}
109+
}
110+
return iNotif.value().isPresent();
111+
}
112+
113+
@Override
114+
public T next() {
115+
if (hasNext()) {
116+
if (iNotif.value().isPresent()) {
117+
T v = iNotif.value().get();
118+
iNotif = null;
119+
return v;
120+
}
121+
}
122+
throw new NoSuchElementException();
123+
}
124+
125+
@Override
126+
public void remove() {
127+
throw new UnsupportedOperationException("Read-only iterator.");
128+
}
129+
130+
}
131+
}

0 commit comments

Comments
 (0)