-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Lost Subscriptions #2576
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Lost Subscriptions #2576
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -286,6 +286,7 @@ public void setProducer(Producer producer) { | |
})); | ||
|
||
// subscribe to the restarts observable to know when to schedule the next redo. | ||
// FIXME should subscription returned be added to the child composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. L199: the worker is added to the child. |
||
worker.schedule(new Action0() { | ||
@Override | ||
public void call() { | ||
|
@@ -304,6 +305,7 @@ public void onError(Throwable e) { | |
public void onNext(Object t) { | ||
if (!isLocked.get() && !child.isUnsubscribed()) { | ||
if (consumerCapacity.get() > 0) { | ||
// FIXME should subscription returned be added to the child composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. L199: the worker is added to the child. |
||
worker.schedule(subscribeToSource); | ||
} else { | ||
resumeBoundary.compareAndSet(false, true); | ||
|
@@ -329,6 +331,7 @@ public void request(final long n) { | |
producer.request(n); | ||
} else | ||
if (c == 0 && resumeBoundary.compareAndSet(true, false)) { | ||
// FIXME should subscription returned be added to the child composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. L199: the worker is added to the child. |
||
worker.schedule(subscribeToSource); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ public OnSubscribeTimerOnce(long time, TimeUnit unit, Scheduler scheduler) { | |
public void call(final Subscriber<? super Long> child) { | ||
Worker worker = scheduler.createWorker(); | ||
child.add(worker); | ||
// FIXME should subscription returned be added to the child composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. L43: the worker is added to the child. |
||
worker.schedule(new Action0() { | ||
@Override | ||
public void call() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,6 +43,7 @@ public OnSubscribeTimerPeriodically(long initialDelay, long period, TimeUnit uni | |
public void call(final Subscriber<? super Long> child) { | ||
final Worker worker = scheduler.createWorker(); | ||
child.add(worker); | ||
// FIXME should subscription returned be added to the child composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. L45: the worker is added to the child. |
||
worker.schedulePeriodically(new Action0() { | ||
long counter; | ||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -166,6 +166,7 @@ public void onCompleted() { | |
unsubscribe(); | ||
} | ||
void scheduleChunk() { | ||
// FIXME should subscription returned be added to the child composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. L79: worker is added to subscriber. |
||
inner.schedulePeriodically(new Action0() { | ||
@Override | ||
public void call() { | ||
|
@@ -181,6 +182,7 @@ void startNewChunk() { | |
} | ||
chunks.add(chunk); | ||
} | ||
// FIXME should subscription returned be added to the child composite | ||
inner.schedule(new Action0() { | ||
@Override | ||
public void call() { | ||
|
@@ -280,6 +282,7 @@ public void onCompleted() { | |
unsubscribe(); | ||
} | ||
void scheduleExact() { | ||
// FIXME should subscription returned be added to the child composite | ||
inner.schedulePeriodically(new Action0() { | ||
@Override | ||
public void call() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,6 +52,7 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) { | |
|
||
@Override | ||
public void onCompleted() { | ||
// FIXME should subscription returned be added to the child composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. L50: worker is added to the child. |
||
worker.schedule(new Action0() { | ||
|
||
@Override | ||
|
@@ -69,6 +70,7 @@ public void onError(Throwable e) { | |
|
||
@Override | ||
public void onNext(final T t) { | ||
// FIXME should subscription returned be added to the child composite | ||
worker.schedule(new Action0() { | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -148,6 +148,7 @@ public void onError(final Throwable e) { | |
|
||
protected void schedule() { | ||
if (COUNTER_UPDATER.getAndIncrement(this) == 0) { | ||
// FIXME should subscription returned be added to the child composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. L101: recursiveScheduler is added to the child |
||
recursiveScheduler.schedule(new Action0() { | ||
|
||
@Override | ||
|
@@ -229,6 +230,7 @@ public boolean isUnsubscribed() { | |
@Override | ||
public void unsubscribe() { | ||
if (ONCE_UPDATER.getAndSet(this, 1) == 0) { | ||
// FIXME should subscription returned be added to something | ||
worker.schedule(new Action0() { | ||
@Override | ||
public void call() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,6 +74,7 @@ public void onError(Throwable e) { | |
|
||
@Override | ||
public void onNext(final Observable<T> o) { | ||
// FIXME should subscription returned be added to the child composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See L36. |
||
inner.schedule(new Action0() { | ||
|
||
@Override | ||
|
@@ -94,6 +95,7 @@ public void onCompleted() { | |
public void onError(Throwable e) { | ||
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) { | ||
// retry again | ||
// FIXME should subscription returned be added to the child composite | ||
inner.schedule(_self); | ||
} else { | ||
// give up and pass the failure | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,6 +51,7 @@ public Subscriber<? super T> call(Subscriber<? super T> child) { | |
|
||
SamplerSubscriber<T> sampler = new SamplerSubscriber<T>(s); | ||
child.add(sampler); | ||
// FIXME should subscription returned be added to the child composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See L50. |
||
worker.schedulePeriodically(sampler, time, time, unit); | ||
|
||
return sampler; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,6 +43,7 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) { | |
final Worker worker = scheduler.createWorker(); | ||
child.add(worker); | ||
final AtomicBoolean gate = new AtomicBoolean(); | ||
// FIXME should subscription returned be added to the child composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See L44. |
||
worker.schedule(new Action0() { | ||
@Override | ||
public void call() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -87,6 +87,7 @@ public void request(final long n) { | |
// see unit test 'testSetProducerSynchronousRequest' for more context on this | ||
producer.request(n); | ||
} else { | ||
// FIXME should subscription returned be added to the subscriber composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now that I think about it, L57 is not necessary because the entire worker is added to the child on L42. |
||
inner.schedule(new Action0() { | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,7 @@ public Subscriber<? super T> call(Subscriber<? super T> child) { | |
child.add(worker); | ||
|
||
TakeSubscriber<T> ts = new TakeSubscriber<T>(new SerializedSubscriber<T>(child)); | ||
// FIXME should subscription returned be added to the child composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See L46. |
||
worker.schedule(ts, time, unit); | ||
return ts; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,6 +59,7 @@ public void onNext(T t) { | |
@Override | ||
public void call() { | ||
final Scheduler.Worker inner = scheduler.createWorker(); | ||
// FIXME should subscription returned be added to the subscriber composite | ||
inner.schedule(new Action0() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This inner scheduler or the task can't be added because the unsubscription of child triggers the call and would prevent it from executing. |
||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -275,6 +275,7 @@ public void onCompleted() { | |
} | ||
|
||
void scheduleExact() { | ||
// FIXME should subscription returned be added to the child composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. L65: the worker is added to child. |
||
worker.schedulePeriodically(new Action0() { | ||
|
||
@Override | ||
|
@@ -420,6 +421,7 @@ public void onCompleted() { | |
child.onCompleted(); | ||
} | ||
void scheduleChunk() { | ||
// FIXME should subscription returned be added to the child composite | ||
worker.schedulePeriodically(new Action0() { | ||
|
||
@Override | ||
|
@@ -444,6 +446,7 @@ void startNewChunk() { | |
return; | ||
} | ||
|
||
// FIXME should subscription returned be added to the child composite | ||
worker.schedule(new Action0() { | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,6 +55,7 @@ private ObjectPool(final int min, final int max, final long validationInterval) | |
initialize(min); | ||
|
||
schedulerWorker = Schedulers.computation().createWorker(); | ||
// FIXME should subscription returned be added to some composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no place to unsubscribe this worker: it will exist until the JVM quits. |
||
schedulerWorker.schedulePeriodically(new Action0() { | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -90,6 +90,7 @@ private void _onCompleted() { | |
* the number of milliseconds in the future relative to "now()" at which to call {@code onCompleted} | ||
*/ | ||
public void onCompleted(long timeInMilliseconds) { | ||
// FIXME should subscription returned be added to some composite | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wraps a TestScheduler which only lives during a test run so no loss here. |
||
innerScheduler.schedule(new Action0() { | ||
|
||
@Override | ||
|
@@ -125,6 +126,7 @@ private void _onError(final Throwable e) { | |
* the number of milliseconds in the future relative to "now()" at which to call {@code onError} | ||
*/ | ||
public void onError(final Throwable e, long timeInMilliseconds) { | ||
// FIXME should subscription returned be added to some composite | ||
innerScheduler.schedule(new Action0() { | ||
|
||
@Override | ||
|
@@ -158,6 +160,7 @@ private void _onNext(T v) { | |
* the number of milliseconds in the future relative to "now()" at which to call {@code onNext} | ||
*/ | ||
public void onNext(final T v, long timeInMilliseconds) { | ||
// FIXME should subscription returned be added to some composite | ||
innerScheduler.schedule(new Action0() { | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
L47: the
s.add
ensures that pending tasks on the worker are cancelled.