Skip to content

Remove tabs indent and redundant space. #4343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 11, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 89 additions & 89 deletions src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2014 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -45,8 +45,8 @@ public abstract class Scheduler {
* maintenance.
*/

/**
* The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
/**
* The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
* <p>
* The associated system parameter, {@code rx.scheduler.drift-tolerance}, expects its value in minutes.
*/
Expand All @@ -55,14 +55,14 @@ public abstract class Scheduler {
CLOCK_DRIFT_TOLERANCE_NANOS = TimeUnit.MINUTES.toNanos(
Long.getLong("rx.scheduler.drift-tolerance", 15));
}

/**
* Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
* <p>
* When work is completed it should be unsubscribed using {@link Scheduler.Worker#unsubscribe()}.
* <p>
* Work on a {@link Scheduler.Worker} is guaranteed to be sequential.
*
*
* @return a Worker representing a serial queue of actions to be executed
*/
public abstract Worker createWorker();
Expand All @@ -76,7 +76,7 @@ public abstract static class Worker implements Subscription {

/**
* Schedules an Action for execution.
*
*
* @param action
* Action to schedule
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
Expand Down Expand Up @@ -107,7 +107,7 @@ public abstract static class Worker implements Subscription {
* <p>
* Note to implementors: non-positive {@code initialTime} and {@code period} should be regarded as
* undelayed scheduling of the first and any subsequent executions.
*
*
* @param action
* the Action to execute periodically
* @param initialDelay
Expand All @@ -127,7 +127,7 @@ public Subscription schedulePeriodically(final Action0 action, long initialDelay

final SequentialSubscription first = new SequentialSubscription();
final SequentialSubscription mas = new SequentialSubscription(first);

final Action0 recursiveAction = new Action0() {
long count;
long lastNowNanos = firstNowNanos;
Expand All @@ -137,9 +137,9 @@ public void call() {
action.call();

if (!mas.isUnsubscribed()) {

long nextTick;

long nowNanos = TimeUnit.MILLISECONDS.toNanos(now());
// If the clock moved in a direction quite a bit, rebase the repetition period
if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos
Expand All @@ -154,7 +154,7 @@ public void call() {
nextTick = startInNanos + (++count * periodInNanos);
}
lastNowNanos = nowNanos;

long delay = nextTick - nowNanos;
mas.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
}
Expand Down Expand Up @@ -183,82 +183,82 @@ public long now() {
return System.currentTimeMillis();
}

/**
* Allows the use of operators for controlling the timing around when
* actions scheduled on workers are actually done. This makes it possible to
* layer additional behavior on this {@link Scheduler}. The only parameter
* is a function that flattens an {@link Observable} of {@link Observable}
* of {@link Completable}s into just one {@link Completable}. There must be
* a chain of operators connecting the returned value to the source
* {@link Observable} otherwise any work scheduled on the returned
* {@link Scheduler} will not be executed.
* <p>
* When {@link Scheduler#createWorker()} is invoked a {@link Observable} of
* {@link Completable}s is onNext'd to the combinator to be flattened. If
* the inner {@link Observable} is not immediately subscribed to an calls to
* {@link Worker#schedule} are buffered. Once the {@link Observable} is
* subscribed to actions are then onNext'd as {@link Completable}s.
* <p>
* Finally the actions scheduled on the parent {@link Scheduler} when the
* inner most {@link Completable}s are subscribed to.
* <p>
* When the {@link Worker} is unsubscribed the {@link Completable} emits an
* onComplete and triggers any behavior in the flattening operator. The
* {@link Observable} and all {@link Completable}s give to the flattening
* function never onError.
* <p>
* Limit the amount concurrency two at a time without creating a new fix
* size thread pool:
*
* <pre>
* Scheduler limitSched = Schedulers.computation().when(workers -> {
* // use merge max concurrent to limit the number of concurrent
* // callbacks two at a time
* return Completable.merge(Observable.merge(workers), 2);
* });
* </pre>
* <p>
* This is a slightly different way to limit the concurrency but it has some
* interesting benefits and drawbacks to the method above. It works by
* limited the number of concurrent {@link Worker}s rather than individual
* actions. Generally each {@link Observable} uses its own {@link Worker}.
* This means that this will essentially limit the number of concurrent
* subscribes. The danger comes from using operators like
* {@link Observable#zip(Observable, Observable, rx.functions.Func2)} where
* subscribing to the first {@link Observable} could deadlock the
* subscription to the second.
*
* <pre>
* Scheduler limitSched = Schedulers.computation().when(workers -> {
* // use merge max concurrent to limit the number of concurrent
* // Observables two at a time
* return Completable.merge(Observable.merge(workers, 2));
* });
* </pre>
*
* Slowing down the rate to no more than than 1 a second. This suffers from
* the same problem as the one above I could find an {@link Observable}
* operator that limits the rate without dropping the values (aka leaky
* bucket algorithm).
*
* <pre>
* Scheduler slowSched = Schedulers.computation().when(workers -> {
* // use concatenate to make each worker happen one at a time.
* return Completable.concat(workers.map(actions -> {
* // delay the starting of the next worker by 1 second.
* return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
* }));
* });
* </pre>
*
* @param <S> a Scheduler and a Subscription
* @param combine the function that takes a two-level nested Observable sequence of a Completable and returns
* the Completable that will be subscribed to and should trigger the execution of the scheduled Actions.
* @return the Scheduler with the customized execution behavior
*/
/**
* Allows the use of operators for controlling the timing around when
* actions scheduled on workers are actually done. This makes it possible to
* layer additional behavior on this {@link Scheduler}. The only parameter
* is a function that flattens an {@link Observable} of {@link Observable}
* of {@link Completable}s into just one {@link Completable}. There must be
* a chain of operators connecting the returned value to the source
* {@link Observable} otherwise any work scheduled on the returned
* {@link Scheduler} will not be executed.
* <p>
* When {@link Scheduler#createWorker()} is invoked a {@link Observable} of
* {@link Completable}s is onNext'd to the combinator to be flattened. If
* the inner {@link Observable} is not immediately subscribed to an calls to
* {@link Worker#schedule} are buffered. Once the {@link Observable} is
* subscribed to actions are then onNext'd as {@link Completable}s.
* <p>
* Finally the actions scheduled on the parent {@link Scheduler} when the
* inner most {@link Completable}s are subscribed to.
* <p>
* When the {@link Worker} is unsubscribed the {@link Completable} emits an
* onComplete and triggers any behavior in the flattening operator. The
* {@link Observable} and all {@link Completable}s give to the flattening
* function never onError.
* <p>
* Limit the amount concurrency two at a time without creating a new fix
* size thread pool:
*
* <pre>
* Scheduler limitSched = Schedulers.computation().when(workers -> {
* // use merge max concurrent to limit the number of concurrent
* // callbacks two at a time
* return Completable.merge(Observable.merge(workers), 2);
* });
* </pre>
* <p>
* This is a slightly different way to limit the concurrency but it has some
* interesting benefits and drawbacks to the method above. It works by
* limited the number of concurrent {@link Worker}s rather than individual
* actions. Generally each {@link Observable} uses its own {@link Worker}.
* This means that this will essentially limit the number of concurrent
* subscribes. The danger comes from using operators like
* {@link Observable#zip(Observable, Observable, rx.functions.Func2)} where
* subscribing to the first {@link Observable} could deadlock the
* subscription to the second.
*
* <pre>
* Scheduler limitSched = Schedulers.computation().when(workers -> {
* // use merge max concurrent to limit the number of concurrent
* // Observables two at a time
* return Completable.merge(Observable.merge(workers, 2));
* });
* </pre>
*
* Slowing down the rate to no more than than 1 a second. This suffers from
* the same problem as the one above I could find an {@link Observable}
* operator that limits the rate without dropping the values (aka leaky
* bucket algorithm).
*
* <pre>
* Scheduler slowSched = Schedulers.computation().when(workers -> {
* // use concatenate to make each worker happen one at a time.
* return Completable.concat(workers.map(actions -> {
* // delay the starting of the next worker by 1 second.
* return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
* }));
* });
* </pre>
*
* @param <S> a Scheduler and a Subscription
* @param combine the function that takes a two-level nested Observable sequence of a Completable and returns
* the Completable that will be subscribed to and should trigger the execution of the scheduled Actions.
* @return the Scheduler with the customized execution behavior
*/
@SuppressWarnings("unchecked")
@Experimental
public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>, Completable> combine) {
return (S) new SchedulerWhen(combine, this);
}
public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>, Completable> combine) {
return (S) new SchedulerWhen(combine, this);
}
}