Skip to content

save allocation in OperatorSkipTimed #4239

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -9203,7 +9203,7 @@ public final Observable<T> skip(long time, TimeUnit unit) {
* @see <a href="http://reactivex.io/documentation/operators/skip.html">ReactiveX operators documentation: Skip</a>
*/
public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler) {
return lift(new OperatorSkipTimed<T>(time, unit, scheduler));
return create(new OnSubscribeSkipTimed<T>(this, time, unit, scheduler));
}

/**
Expand Down
94 changes: 94 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeSkipTimed.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.operators;

import java.util.concurrent.TimeUnit;

import rx.Observable.OnSubscribe;
import rx.Observable;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.Subscriber;
import rx.functions.Action0;

/**
* Skips elements until a specified time elapses.
* @param <T> the value type
*/
public final class OnSubscribeSkipTimed<T> implements OnSubscribe<T> {
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final Observable<T> source;

public OnSubscribeSkipTimed(Observable<T> source, long time, TimeUnit unit, Scheduler scheduler) {
this.source = source;
this.time = time;
this.unit = unit;
this.scheduler = scheduler;
}

@Override
public void call(final Subscriber<? super T> child) {
final Worker worker = scheduler.createWorker();
SkipTimedSubscriber<T> subscriber = new SkipTimedSubscriber<T>(child);
subscriber.add(worker);
child.add(subscriber);
worker.schedule(subscriber, time, unit);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

child.add(subscriber) missing.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah oops, thank you @akarnokd . I just need to add super(child) in the SkipTimedSubscriber constructor. You mentioning that makes me realize that there is probably a backpressure related bug in the existing operator in as much as when we don't emit to the child we should request another (like filter). I'll write a unit test for unsubscription as penance and another to demonstrate the backpressure problem (if it exists).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's wrong because it calls unsubscribe in the onError/onCompleted path and unsubscribes the child. The worker should be added to subscriber instead and subscriber added to the child. No constructor forwarding.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't support backpressure:

/**
 * Returns an Observable that skips values emitted by the source Observable before a specified time window
 * elapses.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/skip.t.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator doesn't support backpressure as it uses time to skip arbitrary number of elements and
 *  thus has to consume the source {@code Observable} in an unbounded manner (i.e., no backpressure applied to it).</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This version of {@code skip} operates by default on the {@code computation} {@link Scheduler}.</dd>
 * </dl>
 * 
 * @param time
 *            the length of the time window to skip
 * @param unit
 *            the time unit of {@code time}
 * @return an Observable that skips values emitted by the source Observable before the time window defined
 *         by {@code time} elapses and the emits the remainder
 * @see <a href="http://reactivex.io/documentation/operators/skip.html">ReactiveX operators documentation: Skip</a>
 */

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constructor forwarding handles requests for us too though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would lead to hot spinning for the duration of the time skip.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I'm with you, thanks

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually maybe I'm not with you yet. The alternative is to request Long.MAX_VALUE and it might still spin like crazy in an upstream operator. It just doesn't seem different from filter...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Requesting 1 each time incurs an atomic increment costing 22-45 cycle overhead. For filter, the element count is the main deciding factor. For skip its time which can translate to arbitrary element count. The problem is you can't know what the source is.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For filter, the element count is the main deciding factor

That's assuming that the predicate doesn't use time.

For skip its time which can translate to arbitrary element count. The problem is you can't know what the source is.

I'm not sure why we wouldn't accept the atomic increment cost as the price to have this operator support backpressure.

source.unsafeSubscribe(subscriber);
}

final static class SkipTimedSubscriber<T> extends Subscriber<T> implements Action0 {

final Subscriber<? super T> child;
volatile boolean gate;

SkipTimedSubscriber(Subscriber<? super T> child) {
this.child = child;
}

@Override
public void call() {
gate = true;
}

@Override
public void onNext(T t) {
if (gate) {
child.onNext(t);
}
}

@Override
public void onError(Throwable e) {
try {
child.onError(e);
} finally {
unsubscribe();
}
}

@Override
public void onCompleted() {
try {
child.onCompleted();
} finally {
unsubscribe();
}
}

}
}
80 changes: 0 additions & 80 deletions src/main/java/rx/internal/operators/OperatorSkipTimed.java

This file was deleted.

27 changes: 27 additions & 0 deletions src/test/java/rx/internal/operators/OperatorSkipTimedTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,24 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.Observer;
import rx.exceptions.TestException;
import rx.functions.Action0;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;
import rx.subjects.PublishSubject;

Expand Down Expand Up @@ -166,4 +170,27 @@ public void testSkipTimedErrorAfterTime() {
verify(o, never()).onCompleted();

}

@Test
public void testSkipTimedUnsubscribePropagatesToUpstream() {
TestScheduler scheduler = new TestScheduler();

PublishSubject<Integer> source = PublishSubject.create();

final AtomicBoolean unsub = new AtomicBoolean();
Observable<Integer> result = source.doOnUnsubscribe(new Action0() {

@Override
public void call() {
unsub.set(true);
}
}).skip(1, TimeUnit.SECONDS, scheduler);

TestSubscriber<Object> ts = TestSubscriber.create();

result.subscribe(ts);
source.onNext(1);
ts.unsubscribe();
assertTrue(unsub.get());
}
}