Skip to content

Commit 8b7a8a6

Browse files
committed
Merge pull request #3332 from akarnokd/SubjectTests
2.x: subject tests and bugfixes
2 parents 3f973b0 + d6a683a commit 8b7a8a6

14 files changed

+3749
-17
lines changed

src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,9 @@ public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, Time
122122
Future<?> f;
123123
try {
124124
if (delayTime <= 0) {
125-
f = executor.submit(run);
125+
f = executor.submit(sr);
126126
} else {
127-
f = executor.schedule(run, delayTime, unit);
127+
f = executor.schedule(sr, delayTime, unit);
128128
}
129129
sr.setFuture(f);
130130
} catch (RejectedExecutionException ex) {

src/main/java/io/reactivex/subjects/AsyncSubject.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ public boolean hasValue() {
120120

121121
@Override
122122
public boolean hasComplete() {
123-
return state.subscribers() == State.TERMINATED;
123+
Object o = state.get();
124+
return state.subscribers() == State.TERMINATED && !NotificationLite.isError(o);
124125
}
125126

126127
@Override
@@ -132,7 +133,7 @@ public boolean hasThrowable() {
132133
public Throwable getThrowable() {
133134
Object o = state.get();
134135
if (NotificationLite.isError(o)) {
135-
return (Throwable)o;
136+
return NotificationLite.getError(o);
136137
}
137138
return null;
138139
}
@@ -151,7 +152,7 @@ public T getValue() {
151152
@SuppressWarnings("unchecked")
152153
public T[] getValues(T[] array) {
153154
Object o = state.get();
154-
if (o != null && !NotificationLite.isError(o)) {
155+
if (o != null && !NotificationLite.isError(o) && !NotificationLite.isComplete(o)) {
155156
int n = array.length;
156157
if (n == 0) {
157158
array = Arrays.copyOf(array, 1);
@@ -160,6 +161,10 @@ public T[] getValues(T[] array) {
160161
if (array.length > 1) {
161162
array[1] = null;
162163
}
164+
} else {
165+
if (array.length != 0) {
166+
array[0] = null;
167+
}
163168
}
164169
return array;
165170
}
@@ -292,7 +297,9 @@ public void setValue(T value) {
292297
} else
293298
if (s == HAS_REQUEST_NO_VALUE) {
294299
lazySet(HAS_REQUEST_HAS_VALUE); // setValue is called once, no need for CAS
295-
actual.onNext(value);
300+
if (value != null) {
301+
actual.onNext(value);
302+
}
296303
actual.onComplete();
297304
}
298305
}
@@ -332,7 +339,9 @@ public void request(long n) {
332339
if (compareAndSet(NO_REQUEST_HAS_VALUE, HAS_REQUEST_HAS_VALUE)) {
333340
@SuppressWarnings("unchecked")
334341
T v = (T)state.get();
335-
actual.onNext(v);
342+
if (v != null) {
343+
actual.onNext(v);
344+
}
336345
actual.onComplete();
337346
return;
338347
}

src/main/java/io/reactivex/subjects/BehaviorSubject.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.subjects;
1515

16+
import java.lang.reflect.Array;
1617
import java.util.Objects;
1718
import java.util.concurrent.atomic.*;
1819
import java.util.concurrent.locks.StampedLock;
@@ -75,11 +76,16 @@ public void onComplete() {
7576
public boolean hasSubscribers() {
7677
return state.subscribers.length != 0;
7778
}
79+
80+
81+
/* test support*/ int subscriberCount() {
82+
return state.subscribers.length;
83+
}
7884

7985
@Override
8086
public Throwable getThrowable() {
8187
Object o = state.get();
82-
if (NotificationLite.isComplete(o)) {
88+
if (NotificationLite.isError(o)) {
8389
return NotificationLite.getError(o);
8490
}
8591
return null;
@@ -95,9 +101,10 @@ public T getValue() {
95101
}
96102

97103
@Override
104+
@SuppressWarnings("unchecked")
98105
public T[] getValues(T[] array) {
99106
Object o = state.get();
100-
if (NotificationLite.isComplete(o) || NotificationLite.isError(o)) {
107+
if (o == null || NotificationLite.isComplete(o) || NotificationLite.isError(o)) {
101108
if (array.length != 0) {
102109
array[0] = null;
103110
}
@@ -109,6 +116,9 @@ public T[] getValues(T[] array) {
109116
if (array.length != 1) {
110117
array[1] = null;
111118
}
119+
} else {
120+
array = (T[])Array.newInstance(array.getClass().getComponentType(), 1);
121+
array[0] = v;
112122
}
113123
return array;
114124
}

src/main/java/io/reactivex/subjects/ReplaySubject.java

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ public static <T> ReplaySubject<T> createWithSize(int size) {
5555
return createWithBuffer(buffer);
5656
}
5757

58+
/* test */ static <T> ReplaySubject<T> createUnbounded() {
59+
SizeBoundReplayBuffer<T> buffer = new SizeBoundReplayBuffer<>(Integer.MAX_VALUE);
60+
return createWithBuffer(buffer);
61+
}
62+
5863
public static <T> ReplaySubject<T> createWithTime(long maxAge, TimeUnit unit) {
5964
return createWithTime(maxAge, unit, Schedulers.trampoline());
6065
}
@@ -118,6 +123,10 @@ public boolean hasSubscribers() {
118123
return state.subscribers.length != 0;
119124
}
120125

126+
/* test */ int subscriberCount() {
127+
return state.subscribers.length;
128+
}
129+
121130
@Override
122131
public Throwable getThrowable() {
123132
Object o = state.get();
@@ -154,6 +163,10 @@ public boolean hasValue() {
154163
return state.buffer.size() != 0;
155164
}
156165

166+
/* test*/ int size() {
167+
return state.buffer.size();
168+
}
169+
157170
static final class State<T> extends AtomicReference<Object> implements Publisher<T>, Subscriber<T> {
158171
/** */
159172
private static final long serialVersionUID = -4673197222000219014L;
@@ -187,8 +200,10 @@ public void subscribe(Subscriber<? super T> s) {
187200
if (add(rs)) {
188201
if (rs.cancelled) {
189202
remove(rs);
203+
return;
190204
}
191205
}
206+
buffer.replay(rs);
192207
}
193208
}
194209

@@ -585,6 +600,7 @@ public SizeBoundReplayBuffer(int maxSize) {
585600

586601
void trim() {
587602
if (size > maxSize) {
603+
size--;
588604
Node<Object> h = head;
589605
head = h.get();
590606
}
@@ -598,7 +614,12 @@ public void add(T value) {
598614

599615
tail = n;
600616
size++;
601-
t.lazySet(n); // releases both the tail and size
617+
/*
618+
* FIXME not sure why lazySet doesn't work here
619+
* (testReplaySubjectEmissionSubscriptionRace hangs)
620+
* must be the lack of StoreLoad barrier?
621+
*/
622+
t.set(n); // releases both the tail and size
602623

603624
trim();
604625
}
@@ -664,6 +685,9 @@ public T[] getValues(T[] array) {
664685
i++;
665686
h = next;
666687
}
688+
if (array.length > s) {
689+
array[s] = null;
690+
}
667691
}
668692

669693
return array;
@@ -737,9 +761,12 @@ public void replay(ReplaySubscription<T> rs) {
737761
index = n;
738762
}
739763

740-
if (!unbounded) {
741-
r = ReplaySubscription.REQUESTED.addAndGet(rs, e);
764+
if (e != 0L) {
765+
if (!unbounded) {
766+
r = ReplaySubscription.REQUESTED.addAndGet(rs, e);
767+
}
742768
}
769+
743770
if (index.get() != null && r != 0L) {
744771
continue;
745772
}
@@ -760,6 +787,10 @@ public int size() {
760787
while (s != Integer.MAX_VALUE) {
761788
Node<Object> next = h.get();
762789
if (next == null) {
790+
Object o = h.value;
791+
if (NotificationLite.isComplete(o) || NotificationLite.isError(o)) {
792+
s--;
793+
}
763794
break;
764795
}
765796
s++;
@@ -796,6 +827,7 @@ public SizeAndTimeBoundReplayBuffer(int maxSize, long maxAge, TimeUnit unit, Sch
796827

797828
void trim() {
798829
if (size > maxSize) {
830+
size--;
799831
TimedNode<Object> h = head;
800832
head = h.get();
801833
}
@@ -849,7 +881,12 @@ public void add(T value) {
849881

850882
tail = n;
851883
size++;
852-
t.lazySet(n); // releases both the tail and size
884+
/*
885+
* FIXME not sure why lazySet doesn't work here
886+
* (testReplaySubjectEmissionSubscriptionRace hangs)
887+
* must be the lack of StoreLoad barrier?
888+
*/
889+
t.set(n); // releases both the tail and size
853890

854891
trim();
855892
}
@@ -916,6 +953,9 @@ public T[] getValues(T[] array) {
916953
i++;
917954
h = next;
918955
}
956+
if (array.length > s) {
957+
array[s] = null;
958+
}
919959
}
920960

921961
return array;
@@ -934,6 +974,19 @@ public void replay(ReplaySubscription<T> rs) {
934974
TimedNode<Object> index = (TimedNode<Object>)rs.index;
935975
if (index == null) {
936976
index = head;
977+
if (!done) {
978+
// skip old entries
979+
long limit = scheduler.now(unit) - maxAge;
980+
TimedNode<Object> next = index.get();
981+
while (next != null) {
982+
long ts = next.time;
983+
if (ts > limit) {
984+
break;
985+
}
986+
index = next;
987+
next = index.get();
988+
}
989+
}
937990
}
938991

939992
for (;;) {
@@ -989,9 +1042,12 @@ public void replay(ReplaySubscription<T> rs) {
9891042
index = n;
9901043
}
9911044

992-
if (!unbounded) {
993-
r = ReplaySubscription.REQUESTED.addAndGet(rs, e);
1045+
if (e != 0L) {
1046+
if (!unbounded) {
1047+
r = ReplaySubscription.REQUESTED.addAndGet(rs, e);
1048+
}
9941049
}
1050+
9951051
if (index.get() != null && r != 0L) {
9961052
continue;
9971053
}
@@ -1012,6 +1068,10 @@ public int size() {
10121068
while (s != Integer.MAX_VALUE) {
10131069
TimedNode<Object> next = h.get();
10141070
if (next == null) {
1071+
Object o = h.value;
1072+
if (NotificationLite.isComplete(o) || NotificationLite.isError(o)) {
1073+
s--;
1074+
}
10151075
break;
10161076
}
10171077
s++;

src/test/java/io/reactivex/internal/operators/OperatorConcatTest.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,10 @@ public void testSimpleAsyncConcat() {
131131

132132
@Test
133133
public void testNestedAsyncConcatLoop() throws Throwable {
134-
for (int i = 0; i < 100; i++) {
135-
System.out.println("testNestedAsyncConcat >> " + i);
134+
for (int i = 0; i < 500; i++) {
135+
if (i % 10 == 0) {
136+
System.out.println("testNestedAsyncConcat >> " + i);
137+
}
136138
testNestedAsyncConcat();
137139
}
138140
}
@@ -151,6 +153,9 @@ public void testNestedAsyncConcat() throws Throwable {
151153

152154
final AtomicReference<Thread> parent = new AtomicReference<>();
153155
final CountDownLatch parentHasStarted = new CountDownLatch(1);
156+
final CountDownLatch parentHasFinished = new CountDownLatch(1);
157+
158+
154159
Observable<Observable<String>> observableOfObservables = Observable.create(new Publisher<Observable<String>>() {
155160

156161
@Override
@@ -198,6 +203,7 @@ public void run() {
198203
} finally {
199204
System.out.println("Done parent Observable");
200205
observer.onComplete();
206+
parentHasFinished.countDown();
201207
}
202208
}
203209
}));
@@ -246,6 +252,15 @@ public void run() {
246252
throw new RuntimeException("failed waiting on threads", e);
247253
}
248254

255+
try {
256+
// wait for the parent to complete
257+
if (!parentHasFinished.await(5, TimeUnit.SECONDS)) {
258+
fail("Parent didn't finish within the time limit");
259+
}
260+
} catch (Throwable e) {
261+
throw new RuntimeException("failed waiting on threads", e);
262+
}
263+
249264
inOrder.verify(observer, times(1)).onNext("seven");
250265
inOrder.verify(observer, times(1)).onNext("eight");
251266
inOrder.verify(observer, times(1)).onNext("nine");

src/test/java/io/reactivex/internal/operators/OperatorZipTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
114
package io.reactivex.internal.operators;
215

316
import static org.junit.Assert.*;

0 commit comments

Comments
 (0)