-
Notifications
You must be signed in to change notification settings - Fork 7.6k
PublishSubject Add Before onSubscribe #3345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PublishSubject Add Before onSubscribe #3345
Conversation
Another issue I found while testing this: @Test
public void testDoOnSubscribe() throws InterruptedException {
PublishSubject<Object> as = PublishSubject.create();
TestSubscriber<Object> ts = new TestSubscriber<>();
as.doOnSubscribe(s -> {
// when subscribed, trigger side-effect that causes data to flow
as.onNext(1);
as.onComplete();
}).subscribe(ts);
if(!ts.await(500, TimeUnit.MILLISECONDS)) {
fail("Did not receive events");
}
ts.assertValue(1);
ts.assertComplete();
} If fails with a
It's the same even if changed to @Test
public void testDoOnSubscribe() throws InterruptedException {
PublishSubject<Object> as = PublishSubject.create();
TestSubscriber<Object> ts = new TestSubscriber<>();
as.doOnRequest(s -> {
// when subscribed, trigger side-effect that causes data to flow
as.onNext(1);
as.onComplete();
}).subscribe(ts);
if(!ts.await(500, TimeUnit.MILLISECONDS)) {
fail("Did not receive events");
}
ts.assertValue(1);
ts.assertComplete();
} So, do we need a (And yes ... I know these unit tests represent very odd use cases ... bad, side-effecting, recursive, etc. But ... they happen. This is a very simplified example of issues I've run into while simulating IO with subjects). |
cc @akarnokd for your review of the PR. |
So this breaks The root of what I'm trying to solve is how to know when a |
Proposed change to register with state before onSubscribe so that "doOnSubscribe" side-effects are safe.
08af295
to
b04c893
Compare
I just force pushed a revision that I think achieves the goal I have, while also working with the Here is a summary of what I need: subject.subscribe(new Subscriber<T>() {
public void onSubscribe(Subscription s) {
s.request(someN);
// now that we are subscribed ... trigger work that will cause data to flow to the subject
}
public void onNext(T t) {
// expect to get data here after trigger from within
}
}); Right now there is not a deterministic time to start sending data, as Thus, I suggest this change so that |
All unit tests are now passing. @akarnokd please let me know if there's a reason to not proceed with this, or if you have a better way of deterministically signaling when a |
Your proposed change violates rule §1.3. You make the I suggest you find another way of doing what you tried, for example, with
Exactly, there is an order to the
This will happen after the child has handled the
I think this example is simply the incorrect usage of the API and a programmer error. 1.x was very forgiving but if we want to be RS compatible and not surprise other RS implementations, such usage patterns need to be reconsidered.
If you explained the intended use case (not how you implemented it), maybe we can find another dataflow pattern that gets you what you want. |
} | ||
} | ||
} | ||
public void subscribe(Subscriber<? super T> t) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we still want to avoid tabs in the source, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really? We care about that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/ReactiveX/RxJava/blob/1.x/CONTRIBUTING.md
When submitting code, please make every effort to follow existing conventions and style in order to keep the code as readable as possible.
Here is rule §1.3:
How would my proposed change make concurrent notifications occur? That was also illegal in RxJava 1.x so I'm not proposing anything non-contractual. It is still up to the source emitting to the So, what is the unit test that shows my changes break the contract?
To know when the subscription to the Subject is actually connected. Right now I effectively have to put a non-deterministic sleep as I have no way from the outside of knowing when the subscribe/onSubscribe/add dance is done.
I question whether |
So you say
It knows when the first
By extension, this would mean
As far as I know, not all tests are applicable to all operators, let's say, to
When I ported the tests over to 2.x I've noticed that few of the 1.x tests failed even though the operators they were testing were clearly okay. It turned out the tests had hidden races in them which were exposed by the smaller overhead of the new operators. The RS API has more potential for data-races which is mostly captured by the RS spec itself as must and mustn't clauses. |
I don't have time to continue this discussion right now ... I'll come back to it after creating a custom Subject for my use case that solves my immediate need. |
Instead of continuing this discussion here, I'm closing this and will instead dig into the more important design decisions of v2 that impact Since the |
Clarification of `Subject` that affects implementation. Related to discussion in #3345.
Proposed change to register with state before onSubscribe so that "doOnSubscribe" side-effects are safe.
If this is approved as a change, we'd want to do similar for the other Subjects as well I imagine.
I came across this after debugging race conditions in some code and tracking it down to publishSubject.subscribe not actually subscribing until AFTER onSubscribe is completed. This breaks things if doOnSubscribe side-effects cause data to flow through that subject.
Is there any reason not to make this change?