PublishSubject Add Before onSubscribe #3345

benjchristensen
Member

Proposed change to register with state before onSubscribe so that "doOnSubscribe" side-effects are safe.

If this is approved as a change, we'd want to do similar for the other Subjects as well I imagine.

I came across this after debugging race conditions in some code and tracking it down to publishSubject.subscribe not actually subscribing until AFTER onSubscribe is completed. This breaks things if doOnSubscribe side-effects cause data to flow through that subject.

Is there any reason not to make this change?

@benjchristensen
Member Author

Another issue I found while testing this:

    @Test
    public void testDoOnSubscribe() throws InterruptedException {
        PublishSubject<Object> as = PublishSubject.create();
        TestSubscriber<Object> ts = new TestSubscriber<>();

        as.doOnSubscribe(s -> {
            // when subscribed, trigger side-effect that causes data to flow
            as.onNext(1);
            as.onComplete();
        }).subscribe(ts);

        if(!ts.await(500, TimeUnit.MILLISECONDS)) {
            fail("Did not receive events");
        } 

        ts.assertValue(1);
        ts.assertComplete();
    }

It fails with a MissingBackpressureException because, when doOnSubscribe runs, request(n) has apparently not been invoked yet.

java.lang.AssertionError: Expected: 1 (class: Integer), Actual: []
    at io.reactivex.subscribers.TestSubscriber.fail(TestSubscriber.java:356)
    at io.reactivex.subscribers.TestSubscriber.assertValue(TestSubscriber.java:485)
    at io.reactivex.subjects.PublishSubjectTest.testDoOnSubscribe(PublishSubjectTest.java:493)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
    Suppressed: io.reactivex.exceptions.MissingBackpressureException: Could not emit value due to lack of requests
        at io.reactivex.subjects.PublishSubject$PublishSubscriber.onNext(PublishSubject.java:308)
        at io.reactivex.subjects.PublishSubject.onNext(PublishSubject.java:86)
        at io.reactivex.subjects.PublishSubjectTest.lambda$1(PublishSubjectTest.java:485)
        at io.reactivex.subjects.PublishSubjectTest$$Lambda$7/1471868639.accept(Unknown Source)
        at io.reactivex.internal.subscribers.SubscriptionLambdaSubscriber.onSubscribe(SubscriptionLambdaSubscriber.java:45)
        at io.reactivex.subjects.PublishSubject$State.subscribe(PublishSubject.java:205)
        at io.reactivex.internal.operators.PublisherLift.subscribe(PublisherLift.java:75)
        at io.reactivex.Observable.subscribeActual(Observable.java:2522)
        at io.reactivex.Observable.subscribe(Observable.java:2511)
        at io.reactivex.subjects.PublishSubjectTest.testDoOnSubscribe(PublishSubjectTest.java:487)
        ... 23 more

It's the same even if changed to doOnRequest:

    @Test
    public void testDoOnRequest() throws InterruptedException {
        PublishSubject<Object> as = PublishSubject.create();
        TestSubscriber<Object> ts = new TestSubscriber<>();

        as.doOnRequest(s -> {
            // when subscribed, trigger side-effect that causes data to flow
            as.onNext(1);
            as.onComplete();
        }).subscribe(ts);

        if(!ts.await(500, TimeUnit.MILLISECONDS)) {
            fail("Did not receive events");
        } 

        ts.assertValue(1);
        ts.assertComplete();
    }

So, do we need a doAfterSubscribe or doAfterRequest for this type of use case?

(And yes ... I know these unit tests represent very odd use cases ... bad, side-effecting, recursive, etc. But ... they happen. This is a very simplified example of issues I've run into while simulating IO with subjects).

@benjchristensen
Member Author

cc @akarnokd for your review of the PR.

@benjchristensen
Member Author

So this breaks BlockingOperatorNext for some reason that I haven't yet figured out.

The root of what I'm trying to solve is how to know when a Subscriber is actually subscribed to a Subject. Right now I can't find a reliable hook point to know once it is subscribed and will receive data, at which point I can trigger work that will emit data.

Proposed change to register with state before onSubscribe so that "doOnSubscribe" side-effects are safe.
benjchristensen force-pushed the PublishSubject-onSubscribe branch from 08af295 to b04c893 on September 15, 2015 03:34
@benjchristensen
Member Author

I just force pushed a revision that I think achieves the goal I have, while also working with the BlockingOperatorNext use case.

Here is a summary of what I need:

    subject.subscribe(new Subscriber<T>() {

        public void onSubscribe(Subscription s) {
            s.request(someN);
            // now that we are subscribed ... trigger work that will cause data to flow to the subject
        }

        public void onNext(T t) {
            // expect to get data here after trigger from within
        }
    });

Right now there is not a deterministic time to start sending data, as onSubscribe must be completed, and then there is another method invocation to register the Subscriber with the Subject, which means there is no mechanism to know when it is done subscribing.

Thus, I suggest this change so that add happens BEFORE onSubscribe is invoked.
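
To make the ordering problem concrete, here is a minimal sketch in plain Java. `MiniSubject` and its `Listener` interface are hypothetical stand-ins invented for illustration, not the RxJava classes; the only assumption carried over from the description above is that `onSubscribe` runs first and registration happens afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified stand-in for a subject; not the RxJava class.
final class MiniSubject<T> {
    // Hypothetical listener type standing in for a Reactive Streams Subscriber.
    interface Listener<T> {
        void onSubscribe(Runnable cancel);
        void onNext(T value);
    }

    private final List<Listener<T>> listeners = new CopyOnWriteArrayList<>();

    void subscribe(Listener<T> l) {
        // Step 1: hand over the "subscription" first.
        l.onSubscribe(() -> listeners.remove(l));
        // Step 2: only now does the listener become visible to onNext.
        listeners.add(l);
    }

    void onNext(T value) {
        for (Listener<T> l : listeners) {
            l.onNext(value);
        }
    }
}

final class SubscribeOrderDemo {
    static List<Integer> run() {
        MiniSubject<Integer> subject = new MiniSubject<>();
        List<Integer> received = new ArrayList<>();
        subject.subscribe(new MiniSubject.Listener<Integer>() {
            @Override public void onSubscribe(Runnable cancel) {
                // Emitted before registration: nobody is listening yet.
                subject.onNext(1);
            }
            @Override public void onNext(Integer value) {
                received.add(value);
            }
        });
        // Emitted after subscribe() returned: delivered.
        subject.onNext(2);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2]
    }
}
```

In this sketch the value emitted from inside `onSubscribe` is lost, and nothing inside the callback can tell the caller when step 2 has finished.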

@benjchristensen
Member Author

All unit tests are now passing. @akarnokd please let me know if there's a reason to not proceed with this, or if you have a better way of deterministically signaling when a Subscriber is done subscribing to a Subject and can receive data.

@akarnokd
Member

Your proposed change violates rule §1.3. You make the Subscriber visible to the Publisher before this Subscriber has received its Subscription, so a concurrent onNext can be delivered to it and may cause NPEs downstream.

I suggest you find another way of doing what you tried, for example, with takeUntil, startWith or something like it.

> It fails with a MissingBackpressureException because when doOnSubscribe runs, apparently request(n) has not been invoked.

Exactly, there is an order to the onXXX methods.

> So, do we need a doAfterSubscribe or doAfterRequest for this type of use case?

Even a doAfterSubscribe would run after the child has handled the Subscription but still before the Subscriber becomes visible to the Subject.

> (And yes ... I know these unit tests represent very odd use cases ... bad, side-effecting, recursive, etc. But ... they happen. This is a very simplified example of issues I've run into while simulating IO with subjects).

I think this example is simply incorrect usage of the API and a programmer error. 1.x was very forgiving but if we want to be RS compatible and not surprise other RS implementations, such usage patterns need to be reconsidered.

> Right now I can't find a reliable hook point to know once it is subscribed and will receive data, at which point I can trigger work that will emit data.

If you explained the intended use case (not how you implemented it), maybe we can find another dataflow pattern that gets you what you want.

}
}
}
public void subscribe(Subscriber<? super T> t) {
Member

I think we still want to avoid tabs in the source, right?

Member Author

Really? We care about that?

Member

https://github.com/ReactiveX/RxJava/blob/1.x/CONTRIBUTING.md

When submitting code, please make every effort to follow existing conventions and style in order to keep the code as readable as possible.

@benjchristensen
Member Author

> Your proposed change violates rule §1.3. You make the Subscriber visible to the Publisher before this Subscriber has received its Subscription, so a concurrent onNext can be delivered to it and may cause NPEs downstream.

Here is rule §1.3:

> onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled sequentially (no concurrent notifications).

How would my proposed change make concurrent notifications occur? That was also illegal in RxJava 1.x so I'm not proposing anything non-contractual.

It is still up to the source emitting to the Subject to ensure it is sequential. Having a hook to define when the "add" is done does not mean that the source will somehow start emitting to the Subject concurrently. Additionally, being registered with the Subject does not mean it will receive any emissions since nothing has been requested yet. Once the request(n) is triggered from inside the onSubscribe, then data can flow. For example, in my use case, I am using a SerializedSubject to represent an IO event-loop that can receive writes from anywhere.

So, what is the unit test that shows my changes break the contract?

> the intended use case

To know when the subscription to the Subject is actually connected. Right now I effectively have to put in a non-deterministic sleep, as I have no way of knowing from the outside when the subscribe/onSubscribe/add dance is done.

> 1.x was very forgiving but if we want to be RS compatible and not surprise other RS implementations, such usage patterns need to be reconsidered.

I question whether Subject really suits the Reactive Streams contract at all. A Subject is hot, and doesn't compose backpressure through. In this implementation it just means it blows up if a subscriber doesn't have a requested count > 0. That means this isn't Reactive Streams compliant because it means the source (the Subject) didn't respect the request(n). In fact, I have to believe that once we connect to the TCK, these Subject implementations can not pass the tests of representing themselves as Publishers. Thus, I don't think your statement of it being "programmer error" is valid since a Subject is intended to be used for "hot" and "push".

@akarnokd
Member

> How would my proposed change make concurrent notifications occur? That was also illegal in RxJava 1.x so I'm not proposing anything non-contractual.

  • Thread 1: is about to emit an onNext through the PublishSubject
  • Thread 2: finishes adding the new Subscriber, which has not yet received its Subscription, to the array of subscribers
  • Thread 1: emits the onNext to the array of subscribers, which now contains the Subscriber added by Thread 2. This violates §1.3 because Thread 2 is now in onSubscribe while Thread 1 is in onNext of the same Subscriber.
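
The interleaving in the list above can be replayed deterministically on a single thread. The following is only a sketch: `RecordingSubscriber` and `EarlyRegistrationDemo` are hypothetical names, the subject is reduced to a plain list, and the two "threads" are simulated by the order of statements. It shows the variant where the onNext lands before onSubscribe; with real threads the two calls could also overlap.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical subscriber that records what it saw; not an RxJava type.
final class RecordingSubscriber {
    Object subscription;                        // set by onSubscribe
    final List<String> log = new ArrayList<>();

    void onSubscribe(Object s) {
        subscription = s;
        log.add("onSubscribe");
    }

    void onNext(int value) {
        // A real subscriber might call request(1) on the subscription here and hit an NPE.
        log.add("onNext(" + value + ") subscription="
                + (subscription == null ? "null" : "set"));
    }
}

final class EarlyRegistrationDemo {
    static List<String> replay() {
        List<RecordingSubscriber> registered = new ArrayList<>();
        RecordingSubscriber child = new RecordingSubscriber();

        // "Thread 2" (subscribing): registers the child first, as in the proposed change.
        registered.add(child);

        // "Thread 1" (emitting): runs its onNext loop before Thread 2 gets any further.
        for (RecordingSubscriber s : registered) {
            s.onNext(1);
        }

        // "Thread 2" (subscribing): only now delivers the Subscription.
        child.onSubscribe(new Object());

        return child.log;
    }

    public static void main(String[] args) {
        replay().forEach(System.out::println);
    }
}
```

The recorded log starts with an onNext that observed a null subscription, which is the situation the NPE warning refers to.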

> Additionally, being registered with the Subject does not mean it will receive any emissions since nothing has been requested yet.

So you say PublishSubject should drop values until the child subscriber requests anything? If the request runs out, should PublishSubject drop values again?

> So, what is the unit test that shows my changes break the contract?

gist

> To know when the subscription to the Subject is actually connected.

It knows when the first onNext arrives; otherwise, I don't think what you want can be achieved through RS. You need some kind of two-phase subscription: one phase to get the request from the child and a second phase to notify the child it is "online". But then this "online" notification may race with a regular onNext. You may need something like a BehaviorSubject - PublishSubject hybrid which emits an initial "online" value and only then allows any regular onNext values to happen.
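
One possible shape for such a hybrid, as a rough sketch only: `OnlineFirstSubject`, its `Child` interface and the `ONLINE` marker are all hypothetical names, and the sketch leaves out onSubscribe, request tracking and terminal events entirely. It assumes a single lock guarding both registration and emission is acceptable, which also means callbacks run while the lock is held.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hybrid subject; a sketch of the idea only, not an RxJava class.
final class OnlineFirstSubject {
    // Marker value delivered as the first onNext of every child.
    static final Object ONLINE = new Object() {
        @Override public String toString() { return "ONLINE"; }
    };

    interface Child {
        void onNext(Object value);
    }

    private final List<Child> children = new ArrayList<>();

    // Registration and the ONLINE marker happen under the same lock as onNext,
    // so no regular value from another thread can slip in between the two.
    synchronized void subscribe(Child child) {
        children.add(child);
        child.onNext(ONLINE);
    }

    synchronized void onNext(Object value) {
        // Iterate over a copy so a callback may register further children.
        for (Child child : new ArrayList<>(children)) {
            child.onNext(value);
        }
    }
}

final class OnlineFirstDemo {
    static List<String> run() {
        OnlineFirstSubject subject = new OnlineFirstSubject();
        List<String> seen = new ArrayList<>();
        subject.subscribe(value -> {
            seen.add(String.valueOf(value));
            if (value == OnlineFirstSubject.ONLINE) {
                // The child is already registered, so work triggered here reaches it.
                subject.onNext(1);
            }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [ONLINE, 1]
    }
}
```

Holding a lock while invoking callbacks can deadlock with other locks, so a production version would more likely need a queue-drain or similar serialization scheme.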

> That means this isn't Reactive Streams compliant because it means the source (the Subject) didn't respect the request(n).

By extension, this would mean Observable.error() doesn't respect that either. Sources can call onError instead of onNext if they choose to; a request(n) doesn't mean a source has to produce N values no matter what.

> In fact, I have to believe that once we connect to the TCK, these Subject implementations can not pass the tests of representing themselves as Publishers.

As far as I know, not all tests are applicable to all operators, for example empty(). The current 2.x Subject implementations conform to the spec: they call onSubscribe first and then the rest of the onXXX methods, and they keep track of the client's request amount. If the client can't keep up, they error out said client, which should trigger the consideration of using the onBackpressureXXX methods. So by the book, they don't overflow the client and bail out cleanly.
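
The request-tracking behaviour described here can be sketched roughly as follows. `TrackedSubscriber` is a hypothetical, single-threaded simplification and not RxJava's actual PublishSubscriber; the error text only paraphrases the "lack of requests" message from the stack trace earlier in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical per-subscriber state; a simplified sketch, not RxJava's implementation.
final class TrackedSubscriber {
    private final AtomicLong requested = new AtomicLong();
    private boolean done; // single-threaded sketch, so no volatile or CAS loop here
    final List<String> events = new ArrayList<>();

    // Called by the child to signal demand (overflow capping is omitted).
    void request(long n) {
        if (n > 0) {
            requested.addAndGet(n);
        }
    }

    // Called by the subject for each value.
    void onNext(int value) {
        if (done) {
            return;
        }
        if (requested.get() == 0) {
            // No outstanding demand: terminate this child instead of overflowing it.
            done = true;
            events.add("onError(lack of requests)");
            return;
        }
        requested.decrementAndGet();
        events.add("onNext(" + value + ")");
    }
}

final class RequestTrackingDemo {
    static List<String> run() {
        TrackedSubscriber child = new TrackedSubscriber();
        child.request(2);
        child.onNext(1);
        child.onNext(2);
        child.onNext(3); // no demand left, so the child is errored out
        child.onNext(4); // ignored after the terminal signal
        return child.events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [onNext(1), onNext(2), onError(lack of requests)]
    }
}
```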

> Thus, I don't think your statement of it being "programmer error" is valid since a Subject is intended to be used for "hot" and "push".

When I ported the tests over to 2.x, I noticed that a few of the 1.x tests failed even though the operators they were testing were clearly okay. It turned out the tests had hidden races in them, which were exposed by the smaller overhead of the new operators. The RS API has more potential for data races, which is mostly captured by the RS spec itself as must and mustn't clauses.

@benjchristensen
Member Author

I don't have time to continue this discussion right now ... I'll come back to it after creating a custom Subject for my use case that solves my immediate need.

@benjchristensen
Member Author

Instead of continuing this discussion here, I'm closing this and will instead dig into the more important design decisions of v2 that impact Observable, Flowable, Subject, Publisher, Processor, etc.

Since the PublishSubject of v2 is currently in the state it is, I ended up solving my use case mostly without Rx.

benjchristensen added a commit that referenced this pull request Sep 16, 2015
Clarification of `Subject` that affects implementation. 

Related to discussion in #3345.
benjchristensen deleted the PublishSubject-onSubscribe branch on January 9, 2017 20:46