Skip to content

Commit 5e47f78

Browse files
akarnokdakarnokd
akarnokd
authored and
akarnokd
committed
Proposal: standardized Subject state-peeking methods.
1 parent cbf8edf commit 5e47f78

File tree

7 files changed

+645
-9
lines changed

7 files changed

+645
-9
lines changed

src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.subjects;
1717

18+
import java.lang.reflect.Array;
1819
import java.util.*;
1920

2021
import rx.Observer;
@@ -141,6 +142,7 @@ public boolean hasObservers() {
141142
* @return true if and only if the subject has some value but not an error
142143
*/
143144
@Experimental
145+
@Override
144146
public boolean hasValue() {
145147
Object v = lastValue;
146148
Object o = state.get();
@@ -151,6 +153,7 @@ public boolean hasValue() {
151153
* @return true if the subject has received a throwable through {@code onError}.
152154
*/
153155
@Experimental
156+
@Override
154157
public boolean hasThrowable() {
155158
Object o = state.get();
156159
return nl.isError(o);
@@ -160,6 +163,7 @@ public boolean hasThrowable() {
160163
* @return true if the subject completed normally via {@code onCompleted()}
161164
*/
162165
@Experimental
166+
@Override
163167
public boolean hasCompleted() {
164168
Object o = state.get();
165169
return o != null && !nl.isError(o);
@@ -174,6 +178,7 @@ public boolean hasCompleted() {
174178
* has terminated with an exception or has an actual {@code null} as a value.
175179
*/
176180
@Experimental
181+
@Override
177182
public T getValue() {
178183
Object v = lastValue;
179184
Object o = state.get();
@@ -188,11 +193,33 @@ public T getValue() {
188193
* subject hasn't terminated yet or it terminated normally.
189194
*/
190195
@Experimental
196+
@Override
191197
public Throwable getThrowable() {
192198
Object o = state.get();
193199
if (nl.isError(o)) {
194200
return nl.getError(o);
195201
}
196202
return null;
197203
}
204+
@Override
205+
@Experimental
206+
@SuppressWarnings("unchecked")
207+
public T[] getValues(T[] a) {
208+
Object v = lastValue;
209+
Object o = state.get();
210+
if (!nl.isError(o) && nl.isNext(v)) {
211+
T val = nl.getValue(v);
212+
if (a.length == 0) {
213+
a = (T[])Array.newInstance(a.getClass().getComponentType(), 1);
214+
}
215+
a[0] = val;
216+
if (a.length > 1) {
217+
a[1] = null;
218+
}
219+
} else
220+
if (a.length > 0) {
221+
a[0] = null;
222+
}
223+
return a;
224+
}
198225
}

src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.subjects;
1717

1818

19+
import java.lang.reflect.Array;
1920
import java.util.*;
2021

2122
import rx.Observer;
@@ -177,6 +178,7 @@ public boolean hasObservers() {
177178
* @return true if and only if the subject has some value and hasn't terminated yet.
178179
*/
179180
@Experimental
181+
@Override
180182
public boolean hasValue() {
181183
Object o = state.get();
182184
return nl.isNext(o);
@@ -186,6 +188,7 @@ public boolean hasValue() {
186188
* @return true if the subject has received a throwable through {@code onError}.
187189
*/
188190
@Experimental
191+
@Override
189192
public boolean hasThrowable() {
190193
Object o = state.get();
191194
return nl.isError(o);
@@ -195,6 +198,7 @@ public boolean hasThrowable() {
195198
* @return true if the subject completed normally via {@code onCompleted()}
196199
*/
197200
@Experimental
201+
@Override
198202
public boolean hasCompleted() {
199203
Object o = state.get();
200204
return nl.isCompleted(o);
@@ -209,6 +213,7 @@ public boolean hasCompleted() {
209213
* has terminated or has an actual {@code null} as a valid value.
210214
*/
211215
@Experimental
216+
@Override
212217
public T getValue() {
213218
Object o = state.get();
214219
if (nl.isNext(o)) {
@@ -222,11 +227,31 @@ public T getValue() {
222227
* subject hasn't terminated yet or it terminated normally.
223228
*/
224229
@Experimental
230+
@Override
225231
public Throwable getThrowable() {
226232
Object o = state.get();
227233
if (nl.isError(o)) {
228234
return nl.getError(o);
229235
}
230236
return null;
231237
}
238+
@Override
239+
@Experimental
240+
@SuppressWarnings("unchecked")
241+
public T[] getValues(T[] a) {
242+
Object o = state.get();
243+
if (nl.isNext(o)) {
244+
if (a.length == 0) {
245+
a = (T[])Array.newInstance(a.getClass().getComponentType(), 1);
246+
}
247+
a[0] = nl.getValue(o);
248+
if (a.length > 1) {
249+
a[1] = null;
250+
}
251+
} else
252+
if (a.length > 0) {
253+
a[0] = null;
254+
}
255+
return a;
256+
}
232257
}

src/main/java/rx/subjects/PublishSubject.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ public boolean hasObservers() {
125125
* @return true if the subject has received a throwable through {@code onError}.
126126
*/
127127
@Experimental
128+
@Override
128129
public boolean hasThrowable() {
129130
Object o = state.get();
130131
return nl.isError(o);
@@ -134,6 +135,7 @@ public boolean hasThrowable() {
134135
* @return true if the subject completed normally via {@code onCompleted}
135136
*/
136137
@Experimental
138+
@Override
137139
public boolean hasCompleted() {
138140
Object o = state.get();
139141
return o != null && !nl.isError(o);
@@ -144,11 +146,36 @@ public boolean hasCompleted() {
144146
* subject hasn't terminated yet or it terminated normally.
145147
*/
146148
@Experimental
149+
@Override
147150
public Throwable getThrowable() {
148151
Object o = state.get();
149152
if (nl.isError(o)) {
150153
return nl.getError(o);
151154
}
152155
return null;
153156
}
157+
158+
@Override
159+
@Experimental
160+
public boolean hasValue() {
161+
return false;
162+
}
163+
@Override
164+
@Experimental
165+
public T getValue() {
166+
return null;
167+
}
168+
@Override
169+
@Experimental
170+
public Object[] getValues() {
171+
return new Object[0];
172+
}
173+
@Override
174+
@Experimental
175+
public T[] getValues(T[] a) {
176+
if (a.length > 0) {
177+
a[0] = null;
178+
}
179+
return a;
180+
}
154181
}

src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -550,12 +550,30 @@ public T[] toArray(T[] a) {
550550
for (int i = 0; i < s; i++) {
551551
a[i] = (T)list.get(i);
552552
}
553-
if (s < a.length - 1) {
553+
if (a.length > s) {
554554
a[s] = null;
555555
}
556+
} else
557+
if (a.length > 0) {
558+
a[0] = null;
556559
}
557560
return a;
558561
}
562+
@Override
563+
public T latest() {
564+
int idx = index;
565+
if (idx > 0) {
566+
Object o = list.get(idx - 1);
567+
if (nl.isCompleted(o) || nl.isError(o)) {
568+
if (idx > 1) {
569+
return nl.getValue(list.get(idx - 2));
570+
}
571+
return null;
572+
}
573+
return nl.getValue(o);
574+
}
575+
return null;
576+
}
559577
}
560578

561579

@@ -715,6 +733,27 @@ public T[] toArray(T[] a) {
715733
}
716734
return list.toArray(a);
717735
}
736+
@Override
737+
public T latest() {
738+
Node<Object> h = head().next;
739+
if (h == null) {
740+
return null;
741+
}
742+
Node<Object> p = null;
743+
while (h != tail()) {
744+
p = h;
745+
h = h.next;
746+
}
747+
Object value = leaveTransform.call(h.value);
748+
if (nl.isError(value) || nl.isCompleted(value)) {
749+
if (p != null) {
750+
value = leaveTransform.call(p.value);
751+
return nl.getValue(value);
752+
}
753+
return null;
754+
}
755+
return nl.getValue(value);
756+
}
718757
}
719758

720759
// **************
@@ -781,6 +820,12 @@ I replayObserverFromIndexTest(
781820
* @return the array or a new array containing the current values
782821
*/
783822
T[] toArray(T[] a);
823+
/**
824+
* Returns the latest value that has been buffered or null if no such value
825+
* present.
826+
* @return the latest value buffered or null if none
827+
*/
828+
T latest();
784829
}
785830

786831
/** Interface to manage eviction checking. */
@@ -1054,6 +1099,7 @@ public void evictFinal(NodeList<Object> list) {
10541099
* @return true if the subject has received a throwable through {@code onError}.
10551100
*/
10561101
@Experimental
1102+
@Override
10571103
public boolean hasThrowable() {
10581104
NotificationLite<T> nl = ssm.nl;
10591105
Object o = ssm.get();
@@ -1064,6 +1110,7 @@ public boolean hasThrowable() {
10641110
* @return true if the subject completed normally via {@code onCompleted}
10651111
*/
10661112
@Experimental
1113+
@Override
10671114
public boolean hasCompleted() {
10681115
NotificationLite<T> nl = ssm.nl;
10691116
Object o = ssm.get();
@@ -1075,6 +1122,7 @@ public boolean hasCompleted() {
10751122
* subject hasn't terminated yet or it terminated normally.
10761123
*/
10771124
@Experimental
1125+
@Override
10781126
public Throwable getThrowable() {
10791127
NotificationLite<T> nl = ssm.nl;
10801128
Object o = ssm.get();
@@ -1098,15 +1146,10 @@ public int size() {
10981146
public boolean hasAnyValue() {
10991147
return !state.isEmpty();
11001148
}
1101-
/** An empty array to trigger getValues() to return a new array. */
1102-
private static final Object[] EMPTY_ARRAY = new Object[0];
1103-
/**
1104-
* @return returns a snapshot of the currently buffered non-terminal events.
1105-
*/
1106-
@SuppressWarnings("unchecked")
11071149
@Experimental
1108-
public Object[] getValues() {
1109-
return state.toArray((T[])EMPTY_ARRAY);
1150+
@Override
1151+
public boolean hasValue() {
1152+
return hasAnyValue();
11101153
}
11111154
/**
11121155
* Returns a snapshot of the currently buffered non-terminal events into
@@ -1115,7 +1158,12 @@ public Object[] getValues() {
11151158
* @return the array {@code a} if it had enough capacity or a new array containing the available values
11161159
*/
11171160
@Experimental
1161+
@Override
11181162
public T[] getValues(T[] a) {
11191163
return state.toArray(a);
11201164
}
1165+
@Override
1166+
public T getValue() {
1167+
return state.latest();
1168+
}
11211169
}

src/main/java/rx/subjects/SerializedSubject.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.subjects;
1717

1818
import rx.Subscriber;
19+
import rx.annotations.Experimental;
1920
import rx.observers.SerializedObserver;
2021

2122
/**
@@ -68,4 +69,39 @@ public void onNext(T t) {
6869
public boolean hasObservers() {
6970
return actual.hasObservers();
7071
}
72+
@Override
73+
@Experimental
74+
public boolean hasCompleted() {
75+
return actual.hasCompleted();
76+
}
77+
@Override
78+
@Experimental
79+
public boolean hasThrowable() {
80+
return actual.hasThrowable();
81+
}
82+
@Override
83+
@Experimental
84+
public boolean hasValue() {
85+
return actual.hasValue();
86+
}
87+
@Override
88+
@Experimental
89+
public Throwable getThrowable() {
90+
return actual.getThrowable();
91+
}
92+
@Override
93+
@Experimental
94+
public T getValue() {
95+
return actual.getValue();
96+
}
97+
@Override
98+
@Experimental
99+
public Object[] getValues() {
100+
return actual.getValues();
101+
}
102+
@Override
103+
@Experimental
104+
public T[] getValues(T[] a) {
105+
return actual.getValues(a);
106+
}
71107
}

0 commit comments

Comments
 (0)