Skip to content

Commit b04c893

Browse files
PublishSubject Add Before onSubscribe
Proposed change to register with state before onSubscribe so that "doOnSubscribe" side-effects are safe.
1 parent eecb41d commit b04c893

File tree

2 files changed

+71
-20
lines changed

2 files changed

+71
-20
lines changed

src/main/java/io/reactivex/subjects/PublishSubject.java

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -183,26 +183,29 @@ static final class State<T> extends AtomicReference<Object> implements Publisher
183183
AtomicReferenceFieldUpdater.newUpdater(State.class, PublishSubscriber[].class, "subscribers");
184184

185185
@Override
186-
public void subscribe(Subscriber<? super T> t) {
187-
PublishSubscriber<T> ps = new PublishSubscriber<>(t, this);
188-
t.onSubscribe(ps);
189-
if (ps.cancelled == 0) {
190-
if (add(ps)) {
191-
// if cancellation happened while a successful add, the remove() didn't work
192-
// so we need to do it again
193-
if (ps.cancelled != 0) {
194-
remove(ps);
195-
}
196-
} else {
197-
Object o = get();
198-
if (o == COMPLETE) {
199-
ps.onComplete();
200-
} else {
201-
ps.onError((Throwable)o);
202-
}
203-
}
204-
}
205-
}
186+
public void subscribe(Subscriber<? super T> t) {
187+
PublishSubscriber<T> ps = new PublishSubscriber<>(t, this);
188+
if (ps.cancelled == 0) {
189+
boolean added = add(ps);
190+
if (added) {
191+
// if cancellation happened while a successful add, the remove() didn't work
192+
// so we need to do it again
193+
if (ps.cancelled != 0) {
194+
remove(ps);
195+
}
196+
}
197+
t.onSubscribe(ps);
198+
if (!added) {
199+
Object o = get();
200+
if (o == COMPLETE) {
201+
ps.onComplete();
202+
} else {
203+
ps.onError((Throwable) o);
204+
}
205+
return;
206+
}
207+
}
208+
}
206209

207210
/**
208211
* @return the array of currently subscribed subscribers

src/test/java/io/reactivex/subjects/PublishSubjectTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
import static org.mockito.Mockito.*;
1919

2020
import java.util.ArrayList;
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
2123
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.concurrent.atomic.AtomicReference;
2225
import java.util.function.*;
2326

2427
import org.junit.Test;
@@ -426,4 +429,49 @@ public void testCurrentStateMethodsError() {
426429
assertFalse(as.hasComplete());
427430
assertTrue(as.getThrowable() instanceof TestException);
428431
}
432+
433+
@Test
434+
public void testRegisteredBeforeOnSubscribe() throws InterruptedException {
435+
PublishSubject<Object> as = PublishSubject.create();
436+
437+
Runnable sideEffect = () -> {
438+
as.onNext(1);
439+
as.onComplete();
440+
};
441+
442+
final AtomicReference<Object> next = new AtomicReference<Object>();
443+
final CountDownLatch latch = new CountDownLatch(1);
444+
445+
as.subscribe(new Subscriber<Object>() {
446+
447+
@Override
448+
public void onSubscribe(Subscription s) {
449+
s.request(Long.MAX_VALUE);
450+
// we are subscribed, so expect that we can now trigger side-effects that cause data to flow
451+
sideEffect.run();
452+
}
453+
454+
@Override
455+
public void onNext(Object t) {
456+
next.set(t);
457+
}
458+
459+
@Override
460+
public void onError(Throwable t) {
461+
latch.countDown();
462+
}
463+
464+
@Override
465+
public void onComplete() {
466+
latch.countDown();
467+
}
468+
469+
});
470+
471+
if(!latch.await(500, TimeUnit.MILLISECONDS)) {
472+
fail("Did not receive events");
473+
}
474+
475+
assertEquals(1, next.get());
476+
}
429477
}

0 commit comments

Comments
 (0)