Skip to content

Commit 519c38b

Browse files
authored
1.x: fix ReplaySubject anomaly around caughtUp by disabling that opt. (#4051)
1 parent 03bc7a2 commit 519c38b

File tree

3 files changed

+99
-144
lines changed

3 files changed

+99
-144
lines changed

src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 44 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import rx.exceptions.Exceptions;
2727
import rx.internal.operators.BackpressureUtils;
2828
import rx.internal.util.RxJavaPluginUtils;
29+
import rx.schedulers.Schedulers;
2930

3031
/**
3132
* Subject that buffers all items it observes and replays them to any {@link Observer} that subscribes.
@@ -110,6 +111,24 @@ public static <T> ReplaySubject<T> create(int capacity) {
110111
ReplayState<T> state = new ReplayState<T>(buffer);
111112
return new ReplaySubject<T>(state);
112113
}
114+
/**
115+
* Creates an unbounded replay subject with the time-bounded-implementation for testing purposes.
116+
* <p>
117+
* This variant behaves like the regular unbounded {@code ReplaySubject} created via {@link #create()} but
118+
* uses the structures of the bounded-implementation. This is by no means intended for the replacement of
119+
* the original, array-backed and unbounded {@code ReplaySubject} due to the additional overhead of the
120+
* linked-list based internal buffer. The sole purpose is to allow testing and reasoning about the behavior
121+
* of the bounded implementations without the interference of the eviction policies.
122+
*
123+
* @param <T>
124+
* the type of items observed and emitted by the Subject
125+
* @return the created subject
126+
*/
127+
/* public */ static <T> ReplaySubject<T> createUnboundedTime() {
128+
ReplayBuffer<T> buffer = new ReplaySizeAndTimeBoundBuffer<T>(Integer.MAX_VALUE, Long.MAX_VALUE, Schedulers.immediate());
129+
ReplayState<T> state = new ReplayState<T>(buffer);
130+
return new ReplaySubject<T>(state);
131+
}
113132
/**
114133
* Creates a size-bounded replay subject.
115134
* <p>
@@ -431,14 +450,7 @@ public void onNext(T t) {
431450

432451
b.next(t);
433452
for (ReplayProducer<T> rp : get()) {
434-
if (rp.caughtUp) {
435-
rp.actual.onNext(t);
436-
} else {
437-
if (b.drain(rp)) {
438-
rp.caughtUp = true;
439-
rp.node = null;
440-
}
441-
}
453+
b.drain(rp);
442454
}
443455
}
444456

@@ -451,14 +463,7 @@ public void onError(Throwable e) {
451463
List<Throwable> errors = null;
452464
for (ReplayProducer<T> rp : getAndSet(TERMINATED)) {
453465
try {
454-
if (rp.caughtUp) {
455-
rp.actual.onError(e);
456-
} else {
457-
if (b.drain(rp)) {
458-
rp.caughtUp = true;
459-
rp.node = null;
460-
}
461-
}
466+
b.drain(rp);
462467
} catch (Throwable ex) {
463468
if (errors == null) {
464469
errors = new ArrayList<Throwable>();
@@ -477,14 +482,7 @@ public void onCompleted() {
477482

478483
b.complete();
479484
for (ReplayProducer<T> rp : getAndSet(TERMINATED)) {
480-
if (rp.caughtUp) {
481-
rp.actual.onCompleted();
482-
} else {
483-
if (b.drain(rp)) {
484-
rp.caughtUp = true;
485-
rp.node = null;
486-
}
487-
}
485+
b.drain(rp);
488486
}
489487
}
490488

@@ -508,7 +506,7 @@ interface ReplayBuffer<T> {
508506

509507
void complete();
510508

511-
boolean drain(ReplayProducer<T> rp);
509+
void drain(ReplayProducer<T> rp);
512510

513511
boolean isComplete();
514512

@@ -585,9 +583,9 @@ public void complete() {
585583
}
586584

587585
@Override
588-
public boolean drain(ReplayProducer<T> rp) {
586+
public void drain(ReplayProducer<T> rp) {
589587
if (rp.getAndIncrement() != 0) {
590-
return false;
588+
return;
591589
}
592590

593591
int missed = 1;
@@ -610,7 +608,7 @@ public boolean drain(ReplayProducer<T> rp) {
610608
while (e != r) {
611609
if (a.isUnsubscribed()) {
612610
rp.node = null;
613-
return false;
611+
return;
614612
}
615613

616614
boolean d = done;
@@ -624,7 +622,7 @@ public boolean drain(ReplayProducer<T> rp) {
624622
} else {
625623
a.onCompleted();
626624
}
627-
return false;
625+
return;
628626
}
629627

630628
if (empty) {
@@ -649,7 +647,7 @@ public boolean drain(ReplayProducer<T> rp) {
649647
if (e == r) {
650648
if (a.isUnsubscribed()) {
651649
rp.node = null;
652-
return false;
650+
return;
653651
}
654652

655653
boolean d = done;
@@ -663,7 +661,7 @@ public boolean drain(ReplayProducer<T> rp) {
663661
} else {
664662
a.onCompleted();
665663
}
666-
return false;
664+
return;
667665
}
668666
}
669667

@@ -679,7 +677,7 @@ public boolean drain(ReplayProducer<T> rp) {
679677

680678
missed = rp.addAndGet(-missed);
681679
if (missed == 0) {
682-
return r == Long.MAX_VALUE;
680+
return;
683681
}
684682
}
685683
}
@@ -799,9 +797,9 @@ public void complete() {
799797
}
800798

801799
@Override
802-
public boolean drain(ReplayProducer<T> rp) {
800+
public void drain(ReplayProducer<T> rp) {
803801
if (rp.getAndIncrement() != 0) {
804-
return false;
802+
return;
805803
}
806804

807805
final Subscriber<? super T> a = rp.actual;
@@ -822,7 +820,7 @@ public boolean drain(ReplayProducer<T> rp) {
822820
while (e != r) {
823821
if (a.isUnsubscribed()) {
824822
rp.node = null;
825-
return false;
823+
return;
826824
}
827825

828826
boolean d = done;
@@ -837,7 +835,7 @@ public boolean drain(ReplayProducer<T> rp) {
837835
} else {
838836
a.onCompleted();
839837
}
840-
return false;
838+
return;
841839
}
842840

843841
if (empty) {
@@ -853,7 +851,7 @@ public boolean drain(ReplayProducer<T> rp) {
853851
if (e == r) {
854852
if (a.isUnsubscribed()) {
855853
rp.node = null;
856-
return false;
854+
return;
857855
}
858856

859857
boolean d = done;
@@ -867,7 +865,7 @@ public boolean drain(ReplayProducer<T> rp) {
867865
} else {
868866
a.onCompleted();
869867
}
870-
return false;
868+
return;
871869
}
872870
}
873871

@@ -881,7 +879,7 @@ public boolean drain(ReplayProducer<T> rp) {
881879

882880
missed = rp.addAndGet(-missed);
883881
if (missed == 0) {
884-
return r == Long.MAX_VALUE;
882+
return;
885883
}
886884
}
887885
}
@@ -1051,9 +1049,9 @@ TimedNode<T> latestHead() {
10511049
}
10521050

10531051
@Override
1054-
public boolean drain(ReplayProducer<T> rp) {
1052+
public void drain(ReplayProducer<T> rp) {
10551053
if (rp.getAndIncrement() != 0) {
1056-
return false;
1054+
return;
10571055
}
10581056

10591057
final Subscriber<? super T> a = rp.actual;
@@ -1074,7 +1072,7 @@ public boolean drain(ReplayProducer<T> rp) {
10741072
while (e != r) {
10751073
if (a.isUnsubscribed()) {
10761074
rp.node = null;
1077-
return false;
1075+
return;
10781076
}
10791077

10801078
boolean d = done;
@@ -1089,7 +1087,7 @@ public boolean drain(ReplayProducer<T> rp) {
10891087
} else {
10901088
a.onCompleted();
10911089
}
1092-
return false;
1090+
return;
10931091
}
10941092

10951093
if (empty) {
@@ -1105,7 +1103,7 @@ public boolean drain(ReplayProducer<T> rp) {
11051103
if (e == r) {
11061104
if (a.isUnsubscribed()) {
11071105
rp.node = null;
1108-
return false;
1106+
return;
11091107
}
11101108

11111109
boolean d = done;
@@ -1119,7 +1117,7 @@ public boolean drain(ReplayProducer<T> rp) {
11191117
} else {
11201118
a.onCompleted();
11211119
}
1122-
return false;
1120+
return;
11231121
}
11241122
}
11251123

@@ -1133,7 +1131,7 @@ public boolean drain(ReplayProducer<T> rp) {
11331131

11341132
missed = rp.addAndGet(-missed);
11351133
if (missed == 0) {
1136-
return r == Long.MAX_VALUE;
1134+
return;
11371135
}
11381136
}
11391137
}
@@ -1226,14 +1224,6 @@ static final class ReplayProducer<T>
12261224
/** Holds the back-reference to the replay state object. */
12271225
final ReplayState<T> state;
12281226

1229-
/**
1230-
* Indicates this Subscriber runs unbounded and the <b>source</b>-triggered
1231-
* buffer.drain() has emitted all available values.
1232-
* <p>
1233-
* This field has to be read and written from the source emitter's thread only.
1234-
*/
1235-
boolean caughtUp;
1236-
12371227
/**
12381228
* Unbounded buffer.drain() uses this field to remember the absolute index of
12391229
* values replayed to this Subscriber.
@@ -1276,6 +1266,5 @@ public void request(long n) {
12761266
throw new IllegalArgumentException("n >= required but it was " + n);
12771267
}
12781268
}
1279-
12801269
}
12811270
}

0 commit comments

Comments
 (0)