Skip to content

Lost Subscriptions #2576

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public void call(final Subscriber<? super T> s) {
final Worker worker = scheduler.createWorker();
s.add(worker);

// FIXME should subscription returned be added to the s composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L47: the s.add ensures that pending tasks on the worker are cancelled.

worker.schedule(new Action0() {
@Override
public void call() {
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeRedo.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ public void setProducer(Producer producer) {
}));

// subscribe to the restarts observable to know when to schedule the next redo.
// FIXME should subscription returned be added to the child composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L199: the worker is added to the child.

worker.schedule(new Action0() {
@Override
public void call() {
Expand All @@ -304,6 +305,7 @@ public void onError(Throwable e) {
public void onNext(Object t) {
if (!isLocked.get() && !child.isUnsubscribed()) {
if (consumerCapacity.get() > 0) {
// FIXME should subscription returned be added to the child composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L199: the worker is added to the child.

worker.schedule(subscribeToSource);
} else {
resumeBoundary.compareAndSet(false, true);
Expand All @@ -329,6 +331,7 @@ public void request(final long n) {
producer.request(n);
} else
if (c == 0 && resumeBoundary.compareAndSet(true, false)) {
// FIXME should subscription returned be added to the child composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L199: the worker is added to the child.

worker.schedule(subscribeToSource);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public OnSubscribeTimerOnce(long time, TimeUnit unit, Scheduler scheduler) {
public void call(final Subscriber<? super Long> child) {
Worker worker = scheduler.createWorker();
child.add(worker);
// FIXME should subscription returned be added to the child composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L43: the worker is added to the child.

worker.schedule(new Action0() {
@Override
public void call() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public OnSubscribeTimerPeriodically(long initialDelay, long period, TimeUnit uni
public void call(final Subscriber<? super Long> child) {
final Worker worker = scheduler.createWorker();
child.add(worker);
// FIXME should subscription returned be added to the child composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L45: the worker is added to the child.

worker.schedulePeriodically(new Action0() {
long counter;
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public void onCompleted() {
unsubscribe();
}
void scheduleChunk() {
// FIXME should subscription returned be added to the child composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L79: worker is added to subscriber.

inner.schedulePeriodically(new Action0() {
@Override
public void call() {
Expand All @@ -181,6 +182,7 @@ void startNewChunk() {
}
chunks.add(chunk);
}
// FIXME should subscription returned be added to the child composite
inner.schedule(new Action0() {
@Override
public void call() {
Expand Down Expand Up @@ -280,6 +282,7 @@ public void onCompleted() {
unsubscribe();
}
void scheduleExact() {
// FIXME should subscription returned be added to the child composite
inner.schedulePeriodically(new Action0() {
@Override
public void call() {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/rx/internal/operators/OperatorDelay.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {

@Override
public void onCompleted() {
// FIXME should subscription returned be added to the child composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L50: worker is added to the child.

worker.schedule(new Action0() {

@Override
Expand All @@ -69,6 +70,7 @@ public void onError(Throwable e) {

@Override
public void onNext(final T t) {
// FIXME should subscription returned be added to the child composite
worker.schedule(new Action0() {

@Override
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/rx/internal/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public void onError(final Throwable e) {

protected void schedule() {
if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
// FIXME should subscription returned be added to the child composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L101: recursiveScheduler is added to the child

recursiveScheduler.schedule(new Action0() {

@Override
Expand Down Expand Up @@ -229,6 +230,7 @@ public boolean isUnsubscribed() {
@Override
public void unsubscribe() {
if (ONCE_UPDATER.getAndSet(this, 1) == 0) {
// FIXME should subscription returned be added to something
worker.schedule(new Action0() {
@Override
public void call() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public void onError(Throwable e) {

@Override
public void onNext(final Observable<T> o) {
// FIXME should subscription returned be added to the child composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See L36.

inner.schedule(new Action0() {

@Override
Expand All @@ -94,6 +95,7 @@ public void onCompleted() {
public void onError(Throwable e) {
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
// retry again
// FIXME should subscription returned be added to the child composite
inner.schedule(_self);
} else {
// give up and pass the failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {

SamplerSubscriber<T> sampler = new SamplerSubscriber<T>(s);
child.add(sampler);
// FIXME should subscription returned be added to the child composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See L50.

worker.schedulePeriodically(sampler, time, time, unit);

return sampler;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/rx/internal/operators/OperatorSkipTimed.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
final Worker worker = scheduler.createWorker();
child.add(worker);
final AtomicBoolean gate = new AtomicBoolean();
// FIXME should subscription returned be added to the child composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See L44.

worker.schedule(new Action0() {
@Override
public void call() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void request(final long n) {
// see unit test 'testSetProducerSynchronousRequest' for more context on this
producer.request(n);
} else {
// FIXME should subscription returned be added to the subscriber composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I think about it, L57 is not necessary because the entire worker is added to the child on L42.

inner.schedule(new Action0() {

@Override
Expand Down
1 change: 1 addition & 0 deletions src/main/java/rx/internal/operators/OperatorTakeTimed.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
child.add(worker);

TakeSubscriber<T> ts = new TakeSubscriber<T>(new SerializedSubscriber<T>(child));
// FIXME should subscription returned be added to the child composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See L46.

worker.schedule(ts, time, unit);
return ts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public void onNext(T t) {
@Override
public void call() {
final Scheduler.Worker inner = scheduler.createWorker();
// FIXME should subscription returned be added to the subscriber composite
inner.schedule(new Action0() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This inner scheduler or the task can't be added because the unsubscription of child triggers the call and would prevent it from executing.


@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ public void onCompleted() {
}

void scheduleExact() {
// FIXME should subscription returned be added to the child composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L65: the worker is added to child.

worker.schedulePeriodically(new Action0() {

@Override
Expand Down Expand Up @@ -420,6 +421,7 @@ public void onCompleted() {
child.onCompleted();
}
void scheduleChunk() {
// FIXME should subscription returned be added to the child composite
worker.schedulePeriodically(new Action0() {

@Override
Expand All @@ -444,6 +446,7 @@ void startNewChunk() {
return;
}

// FIXME should subscription returned be added to the child composite
worker.schedule(new Action0() {

@Override
Expand Down
1 change: 1 addition & 0 deletions src/main/java/rx/internal/util/ObjectPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private ObjectPool(final int min, final int max, final long validationInterval)
initialize(min);

schedulerWorker = Schedulers.computation().createWorker();
// FIXME should subscription returned be added to some composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no place to unsubscribe this worker: it will exist until the JVM quits.

schedulerWorker.schedulePeriodically(new Action0() {

@Override
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/rx/subjects/TestSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ private void _onCompleted() {
* the number of milliseconds in the future relative to "now()" at which to call {@code onCompleted}
*/
public void onCompleted(long timeInMilliseconds) {
// FIXME should subscription returned be added to some composite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wraps a TestScheduler which only lives during a test run so no loss here.

innerScheduler.schedule(new Action0() {

@Override
Expand Down Expand Up @@ -125,6 +126,7 @@ private void _onError(final Throwable e) {
* the number of milliseconds in the future relative to "now()" at which to call {@code onError}
*/
public void onError(final Throwable e, long timeInMilliseconds) {
// FIXME should subscription returned be added to some composite
innerScheduler.schedule(new Action0() {

@Override
Expand Down Expand Up @@ -158,6 +160,7 @@ private void _onNext(T v) {
* the number of milliseconds in the future relative to "now()" at which to call {@code onNext}
*/
public void onNext(final T v, long timeInMilliseconds) {
// FIXME should subscription returned be added to some composite
innerScheduler.schedule(new Action0() {

@Override
Expand Down