Skip to content

2.x: non-backpressure NbpObservable all relevant operators + tests. #3371

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 23, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,950 changes: 2,782 additions & 168 deletions src/main/java/io/reactivex/NbpObservable.java

Large diffs are not rendered by default.

41 changes: 41 additions & 0 deletions src/main/java/io/reactivex/NbpObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import io.reactivex.NbpObservable.NbpSubscriber;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.subscriptions.SubscriptionHelper;

public abstract class NbpObserver<T> implements NbpSubscriber<T> {
private Disposable s;
@Override
public final void onSubscribe(Disposable s) {
if (SubscriptionHelper.validateDisposable(this.s, s)) {
return;
}
this.s = s;
onStart();
}

protected final void cancel() {
s.dispose();
}
/**
* Called once the subscription has been set on this observer; override this
* to perform initialization.
*/
protected void onStart() {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.disposables;

import io.reactivex.NbpObservable.NbpSubscriber;
import io.reactivex.disposables.Disposable;

public enum EmptyDisposable implements Disposable {
Expand All @@ -23,4 +24,14 @@ public enum EmptyDisposable implements Disposable {
public void dispose() {
// no-op
}

public static void complete(NbpSubscriber<?> s) {
s.onSubscribe(INSTANCE);
s.onComplete();
}

public static void error(Throwable e, NbpSubscriber<?> s) {
s.onSubscribe(INSTANCE);
s.onError(e);
}
}
181 changes: 181 additions & 0 deletions src/main/java/io/reactivex/internal/disposables/NbpFullArbiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.disposables;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import io.reactivex.NbpObservable.NbpSubscriber;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.internal.util.NotificationLite;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Performs full arbitration of Subscriber events with strict drain (i.e., old emissions of another
* subscriber are dropped).
*
* @param <T> the value type
*/
public final class NbpFullArbiter<T> extends FullArbiterPad1 implements Disposable {
final NbpSubscriber<? super T> actual;
final SpscLinkedArrayQueue<Object> queue;

volatile Disposable s;
static final Disposable INITIAL = () -> { };


Disposable resource;

volatile boolean cancelled;

public NbpFullArbiter(NbpSubscriber<? super T> actual, Disposable resource, int capacity) {
this.actual = actual;
this.resource = resource;
this.queue = new SpscLinkedArrayQueue<>(capacity);
this.s = INITIAL;
}

@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
disposeResource();
}
}

void disposeResource() {
Disposable d = resource;
resource = null;
if (d != null) {
d.dispose();
}
}

public boolean setSubscription(Disposable s) {
if (cancelled) {
return false;
}

queue.offer(this.s, NotificationLite.disposable(s));
drain();
return true;
}

public boolean onNext(T value, Disposable s) {
if (cancelled) {
return false;
}

queue.offer(s, NotificationLite.next(value));
drain();
return true;
}

public void onError(Throwable value, Disposable s) {
if (cancelled) {
RxJavaPlugins.onError(value);
return;
}
queue.offer(s, NotificationLite.error(value));
drain();
}

public void onComplete(Disposable s) {
queue.offer(s, NotificationLite.complete());
drain();
}

void drain() {
if (WIP.getAndIncrement(this) != 0) {
return;
}

int missed = 1;

final SpscLinkedArrayQueue<Object> q = queue;
final NbpSubscriber<? super T> a = actual;

for (;;) {

for (;;) {
Object o = q.peek();

if (o == null) {
break;
}

q.poll();
Object v = q.poll();

if (o != s) {
continue;
} else
if (NotificationLite.isDisposable(v)) {
Disposable next = NotificationLite.getDisposable(v);
if (s != null) {
s.dispose();
}
s = next;
} else
if (NotificationLite.isError(v)) {
q.clear();
disposeResource();

Throwable ex = NotificationLite.getError(v);
if (!cancelled) {
cancelled = true;
a.onError(ex);
} else {
RxJavaPlugins.onError(ex);
}
} else
if (NotificationLite.isComplete(v)) {
q.clear();
disposeResource();

if (!cancelled) {
cancelled = true;
a.onComplete();
}
} else {
a.onNext(NotificationLite.getValue(v));
}
}

missed = WIP.addAndGet(this, -missed);
if (missed == 0) {
break;
}
}
}
}

/** Pads the object header away. */
class FullArbiterPad0 {
volatile long p1a, p2a, p3a, p4a, p5a, p6a, p7a;
volatile long p8a, p9a, p10a, p11a, p12a, p13a, p14a, p15a;
}

/** The work-in-progress counter. */
class FullArbiterWip extends FullArbiterPad0 {
volatile int wip;
static final AtomicIntegerFieldUpdater<FullArbiterWip> WIP =
AtomicIntegerFieldUpdater.newUpdater(FullArbiterWip.class, "wip");
}

/** Pads the wip counter away. */
class FullArbiterPad1 extends FullArbiterWip {
volatile long p1b, p2b, p3b, p4b, p5b, p6b, p7b;
volatile long p8b, p9b, p10b, p11b, p12b, p13b, p14b, p15b;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.nbp;

import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import io.reactivex.*;
import io.reactivex.internal.subscribers.nbp.NbpDisposableSubscriber;
import io.reactivex.internal.util.Exceptions;

/**
* Wait for and iterate over the latest values of the source observable. If the source works faster than the
* iterator, values may be skipped, but not the {@code onError} or {@code onCompleted} events.
*/
public enum NbpBlockingOperatorLatest {
;

/**
* Returns an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not
* been returned by the {@code Iterable}, then returns that item
*
* @param source
* the source {@code Observable}
* @return an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not
* been returned by the {@code Iterable}, then returns that item
*/
public static <T> Iterable<T> latest(final NbpObservable<? extends T> source) {
return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
NbpLatestObserverIterator<T> lio = new NbpLatestObserverIterator<>();

@SuppressWarnings("unchecked")
NbpObservable<Try<Optional<T>>> materialized = ((NbpObservable<T>)source).materialize();

materialized.subscribe(lio);
return lio;
}
};
}

/** Observer of source, iterator for output. */
static final class NbpLatestObserverIterator<T> extends NbpDisposableSubscriber<Try<Optional<T>>> implements Iterator<T> {
final Semaphore notify = new Semaphore(0);
// observer's notification
volatile Try<Optional<T>> value;
/** Updater for the value field. */
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<NbpLatestObserverIterator, Try> REFERENCE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(NbpLatestObserverIterator.class, Try.class, "value");

@Override
public void onNext(Try<Optional<T>> args) {
boolean wasntAvailable = REFERENCE_UPDATER.getAndSet(this, args) == null;
if (wasntAvailable) {
notify.release();
}
}

@Override
public void onError(Throwable e) {
// not expected
}

@Override
public void onComplete() {
// not expected
}

// iterator's notification
Try<Optional<T>> iNotif;

@Override
public boolean hasNext() {
if (iNotif != null && iNotif.hasError()) {
throw Exceptions.propagate(iNotif.error());
}
if (iNotif == null || iNotif.value().isPresent()) {
if (iNotif == null) {
try {
notify.acquire();
} catch (InterruptedException ex) {
dispose();
Thread.currentThread().interrupt();
iNotif = Notification.error(ex);
throw Exceptions.propagate(ex);
}

@SuppressWarnings("unchecked")
Try<Optional<T>> n = REFERENCE_UPDATER.getAndSet(this, null);
iNotif = n;
if (iNotif.hasError()) {
throw Exceptions.propagate(iNotif.error());
}
}
}
return iNotif.value().isPresent();
}

@Override
public T next() {
if (hasNext()) {
if (iNotif.value().isPresent()) {
T v = iNotif.value().get();
iNotif = null;
return v;
}
}
throw new NoSuchElementException();
}

@Override
public void remove() {
throw new UnsupportedOperationException("Read-only iterator.");
}

}
}
Loading