Skip to content

Commit e449fb1

Browse files
author
Joachim Hofer
committed
Fixed javadoc and comments, cleaned up a bit, and tried to fix
synchronization.
1 parent 1fabd42 commit e449fb1

File tree

1 file changed

+82
-57
lines changed

1 file changed

+82
-57
lines changed

rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java

Lines changed: 82 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,24 @@
4343

4444
public class OperationCombineLatest {
4545

46+
/**
47+
* Combines the two given observables, emitting an event containing an aggregation of the latest values of each of the source observables
48+
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
49+
* @param w0 The first source observable.
50+
* @param w1 The second source observable.
51+
* @param combineLatestFunction The aggregation function used to combine the source observable values.
52+
* @return A function from an observer to a subscription. This can be used to create an observable from.
53+
*/
4654
public static <T0, T1, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> combineLatestFunction) {
4755
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
4856
a.addObserver(new CombineObserver<R, T0>(a, w0));
4957
a.addObserver(new CombineObserver<R, T1>(a, w1));
5058
return a;
5159
}
5260

61+
/**
62+
* @see #combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> combineLatestFunction)
63+
*/
5364
public static <T0, T1, T2, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<T0, T1, T2, R> combineLatestFunction) {
5465
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
5566
a.addObserver(new CombineObserver<R, T0>(a, w0));
@@ -58,6 +69,9 @@ public static <T0, T1, T2, R> Func1<Observer<R>, Subscription> combineLatest(Obs
5869
return a;
5970
}
6071

72+
/**
73+
* @see #combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> combineLatestFunction)
74+
*/
6175
public static <T0, T1, T2, T3, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<T0, T1, T2, T3, R> combineLatestFunction) {
6276
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
6377
a.addObserver(new CombineObserver<R, T0>(a, w0));
@@ -91,7 +105,7 @@ public void onCompleted() {
91105

92106
@Override
93107
public void onError(Exception e) {
94-
a.error(this, e);
108+
a.error(e);
95109
}
96110

97111
@Override
@@ -101,32 +115,46 @@ public void onNext(T args) {
101115
}
102116

103117
/**
104-
* Receive notifications from each of the Observables we are reducing and execute the combineLatestFunction whenever we have received events from all Observables.
105-
*
106-
* @param <R>
118+
* Receive notifications from each of the observables we are reducing and execute the combineLatestFunction
119+
* whenever we have received an event from one of the observables, as soon as each Observable has received
120+
* at least one event.
107121
*/
108122
private static class Aggregator<R> implements Func1<Observer<R>, Subscription> {
109123

124+
private Observer<R> observer;
125+
110126
private final FuncN<R> combineLatestFunction;
111-
private Observer<R> Observer;
112-
private AtomicBoolean running = new AtomicBoolean(true);
127+
private final AtomicBoolean running = new AtomicBoolean(true);
113128

129+
// used as an internal lock for handling the latest values and the completed state of each observer
130+
private final Object lockObject = new Object();
131+
114132
/**
115-
* store when a Observer completes
133+
* Store when an observer completes.
116134
* <p>
117-
* Note that access to this set MUST BE SYNCHRONIZED
135+
* Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above.
118136
* */
119-
private Set<CombineObserver<R, ?>> completed = new HashSet<CombineObserver<R, ?>>();
137+
private final Set<CombineObserver<R, ?>> completed = new HashSet<CombineObserver<R, ?>>();
120138

121139
/**
122-
* The last value from a Observer
140+
* The latest value from each observer
123141
* <p>
124-
* Note that access to this set MUST BE SYNCHRONIZED
142+
* Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above.
125143
* */
126-
private Map<CombineObserver<R, ?>, Object> lastValue = new HashMap<CombineObserver<R, ?>, Object>();
144+
private final Map<CombineObserver<R, ?>, Object> latestValue = new HashMap<CombineObserver<R, ?>, Object>();
127145

128-
private Set<CombineObserver<R, ?>> hasLastValue = new HashSet<CombineObserver<R, ?>>();
129-
private List<CombineObserver<R, ?>> observers = new LinkedList<CombineObserver<R, ?>>();
146+
/**
147+
* Whether each observer has a latest value at all.
148+
* <p>
149+
* Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above.
150+
* */
151+
private final Set<CombineObserver<R, ?>> hasLatestValue = new HashSet<CombineObserver<R, ?>>();
152+
153+
/**
154+
* Ordered list of observers to combine.
155+
* No synchronization is necessary as these can not be added or changed asynchronously.
156+
*/
157+
private final List<CombineObserver<R, ?>> observers = new LinkedList<CombineObserver<R, ?>>();
130158

131159
public Aggregator(FuncN<R> combineLatestFunction) {
132160
this.combineLatestFunction = combineLatestFunction;
@@ -135,55 +163,53 @@ public Aggregator(FuncN<R> combineLatestFunction) {
135163
/**
136164
* Receive notification of a Observer starting (meaning we should require it for aggregation)
137165
*
138-
* @param w
166+
* @param w The observer to add.
139167
*/
140-
synchronized <T> void addObserver(CombineObserver<R, T> w) {
141-
observers.add(w);
168+
<T> void addObserver(CombineObserver<R, T> w) {
169+
observers.add(w);
142170
}
143171

144172
/**
145173
* Receive notification of a Observer completing its iterations.
146174
*
147-
* @param w
175+
* @param w The observer that has completed.
148176
*/
149-
synchronized <T> void complete(CombineObserver<R, T> w) {
150-
// store that this CombineLatestObserver is completed
151-
completed.add(w);
152-
// if all CombineObservers are completed, we mark the whole thing as completed
153-
if (completed.size() == observers.size()) {
154-
if (running.get()) {
155-
// mark ourselves as done
156-
Observer.onCompleted();
157-
// just to ensure we stop processing in case we receive more onNext/complete/error calls after this
158-
running.set(false);
177+
<T> void complete(CombineObserver<R, T> w) {
178+
synchronized(lockObject) {
179+
// store that this CombineLatestObserver is completed
180+
completed.add(w);
181+
// if all CombineObservers are completed, we mark the whole thing as completed
182+
if (completed.size() == observers.size()) {
183+
if (running.get()) {
184+
// mark ourselves as done
185+
observer.onCompleted();
186+
// just to ensure we stop processing in case we receive more onNext/complete/error calls after this
187+
running.set(false);
188+
}
159189
}
160190
}
161191
}
162192

163193
/**
164194
* Receive error for a Observer. Throw the error up the chain and stop processing.
165-
*
166-
* @param w
167195
*/
168-
synchronized <T> void error(CombineObserver<R, T> w, Exception e) {
169-
Observer.onError(e);
170-
/* tell ourselves to stop processing onNext events, event if the Observers don't obey the unsubscribe we're about to send */
171-
running.set(false);
172-
/* tell all Observers to unsubscribe since we had an error */
196+
void error(Exception e) {
197+
observer.onError(e);
198+
/* tell all observers to unsubscribe since we had an error */
173199
stop();
174200
}
175201

176202
/**
177-
* Receive the next value from a Observer.
203+
* Receive the next value from an observer.
178204
* <p>
179-
* If we have received values from all Observers, trigger the combineLatest function, otherwise store the value and keep waiting.
205+
* If we have received values from all observers, trigger the combineLatest function, otherwise store the value and keep waiting.
180206
*
181207
* @param w
182208
* @param arg
183209
*/
184210
<T> void next(CombineObserver<R, T> w, T arg) {
185-
if (Observer == null) {
186-
throw new RuntimeException("This shouldn't be running if a Observer isn't registered");
211+
if (observer == null) {
212+
throw new RuntimeException("This shouldn't be running if an Observer isn't registered");
187213
}
188214

189215
/* if we've been 'unsubscribed' don't process anything further even if the things we're watching keep sending (likely because they are not responding to the unsubscribe call) */
@@ -194,64 +220,63 @@ <T> void next(CombineObserver<R, T> w, T arg) {
194220
// define here so the variable is out of the synchronized scope
195221
Object[] argsToCombineLatest = new Object[observers.size()];
196222

197-
// we synchronize everything that touches receivedValues and the internal LinkedList objects
198-
synchronized (this) {
199-
// remember this as the last value for this Observer
200-
lastValue.put(w, arg);
201-
hasLastValue.add(w);
223+
// we synchronize everything that touches latest values
224+
synchronized (lockObject) {
225+
// remember this as the latest value for this observer
226+
latestValue.put(w, arg);
227+
228+
// remember that this observer now has a latest value set
229+
hasLatestValue.add(w);
202230

203-
// if all CombineLatestObservers in 'receivedValues' map have a value, invoke the combineLatestFunction
231+
// if all observers in the 'observers' list have a value, invoke the combineLatestFunction
204232
for (CombineObserver<R, ?> rw : observers) {
205-
if (!hasLastValue.contains(rw)) {
233+
if (!hasLatestValue.contains(rw)) {
206234
// we don't have a value yet for each observer to combine, so we don't have a combined value yet either
207235
return;
208236
}
209237
}
210238
// if we get to here this means all the queues have data
211239
int i = 0;
212240
for (CombineObserver<R, ?> _w : observers) {
213-
argsToCombineLatest[i++] = lastValue.get(_w);
241+
argsToCombineLatest[i++] = latestValue.get(_w);
214242
}
215243
}
216244
// if we did not return above from the synchronized block we can now invoke the combineLatestFunction with all of the args
217245
// we do this outside the synchronized block as it is now safe to call this concurrently and don't need to block other threads from calling
218246
// this 'next' method while another thread finishes calling this combineLatestFunction
219-
Observer.onNext(combineLatestFunction.call(argsToCombineLatest));
247+
observer.onNext(combineLatestFunction.call(argsToCombineLatest));
220248
}
221249

222250
@Override
223-
public Subscription call(Observer<R> Observer) {
224-
if (this.Observer != null) {
251+
public Subscription call(Observer<R> observer) {
252+
if (this.observer != null) {
225253
throw new IllegalStateException("Only one Observer can subscribe to this Observable.");
226254
}
227-
this.Observer = Observer;
255+
this.observer = observer;
228256

229-
/* start the Observers */
257+
/* start the observers */
230258
for (CombineObserver<R, ?> rw : observers) {
231259
rw.startWatching();
232260
}
233261

234262
return new Subscription() {
235-
236263
@Override
237264
public void unsubscribe() {
238265
stop();
239266
}
240-
241267
};
242268
}
243269

244270
private void stop() {
245271
/* tell ourselves to stop processing onNext events */
246272
running.set(false);
247-
/* propogate to all Observers to unsubscribe */
273+
/* propogate to all observers to unsubscribe */
248274
for (CombineObserver<R, ?> rw : observers) {
249275
if (rw.subscription != null) {
250276
rw.subscription.unsubscribe();
251277
}
252278
}
253279
}
254-
255280
}
256281

257282
public static class UnitTest {
@@ -597,7 +622,7 @@ public void testAggregatorError() {
597622
verify(aObserver, never()).onCompleted();
598623
verify(aObserver, times(1)).onNext("helloworld");
599624

600-
a.error(r1, new RuntimeException(""));
625+
a.error(new RuntimeException(""));
601626
a.next(r1, "hello");
602627
a.next(r2, "again");
603628

0 commit comments

Comments
 (0)