Skip to content

Commit 6f85e54

Browse files
committed
Add combineLatest variant to RxScala
1 parent 8aae497 commit 6f85e54

File tree

3 files changed

+32
-0
lines changed

3 files changed

+32
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,17 @@ class RxScalaDemo extends JUnitSuite {
304304
waitFor(combinedCounter)
305305
}
306306

307+
@Test def combineLatestExample2() {
308+
val firstCounter = Observable.interval(250 millis)
309+
val secondCounter = Observable.interval(550 millis)
310+
val thirdCounter = Observable.interval(850 millis)
311+
val sources = Seq(firstCounter, secondCounter, thirdCounter)
312+
val combinedCounter = Observable.combineLatest(sources, (items: Seq[Long]) => items.toList).take(10)
313+
314+
combinedCounter subscribe {x => println(s"Emitted group: $x")}
315+
waitFor(combinedCounter)
316+
}
317+
307318
@Test def olympicsExampleWithoutPublish() {
308319
val medals = Olympics.mountainBikeMedals.doOnEach(_ => println("onNext"))
309320
medals.subscribe(println(_)) // triggers an execution of medals Observable

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4185,6 +4185,26 @@ object Observable {
41854185
def amb[T](sources: Observable[T]*): Observable[T] = {
41864186
toScalaObservable[T](rx.Observable.amb[T](sources.map(_.asJavaObservable).asJava))
41874187
}
4188+
4189+
/**
4190+
* Combines a list of source Observables by emitting an item that aggregates the latest values of each of
4191+
* the source Observables each time an item is received from any of the source Observables, where this
4192+
* aggregation is defined by a specified function.
4193+
*
4194+
* @tparam T the common base type of source values
4195+
* @tparam R the result type
4196+
* @param sources the list of source Observables
4197+
* @param combineFunction the aggregation function used to combine the items emitted by the source Observables
4198+
* @return an Observable that emits items that are the result of combining the items emitted by the source
4199+
* Observables by means of the given aggregation function
4200+
*/
4201+
def combineLatest[T, R](sources: Seq[Observable[T]], combineFunction: Seq[T] => R): Observable[R] = {
4202+
val jSources = new java.util.ArrayList[rx.Observable[_ <: T]](sources.map(_.asJavaObservable).asJava)
4203+
val jCombineFunction = new rx.functions.FuncN[R] {
4204+
override def call(args: java.lang.Object*): R = combineFunction(args.map(_.asInstanceOf[T]))
4205+
}
4206+
toScalaObservable[R](rx.Observable.combineLatest[T, R](jSources, jCombineFunction))
4207+
}
41884208
}
41894209

41904210

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ class CompletenessTest extends JUnitSuite {
184184
"create(OnSubscribeFunc[T])" -> "create(Observer[T] => Subscription)",
185185
"create(OnSubscribe[T])" -> "apply(Subscriber[T] => Unit)",
186186
"combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])",
187+
"combineLatest(List[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Seq[Observable[T]], Seq[T] => R)",
187188
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
188189
"defer(Func0[_ <: Observable[_ <: T]])" -> "defer(=> Observable[T])",
189190
"from(Array[T])" -> "[use `items(T*)`]",

0 commit comments

Comments
 (0)