Skip to content

Commit fa9ff95

Browse files
committed
Add groupJoin to RxScala
1 parent 1220de7 commit fa9ff95

File tree

3 files changed

+52
-0
lines changed

3 files changed

+52
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1275,4 +1275,24 @@ class RxScalaDemo extends JUnitSuite {
12751275
}
12761276
o.toBlocking.foreach(println)
12771277
}
1278+
1279+
@Test def joinExample() {
1280+
val o1 = Observable.interval(500 millis).map(n => "1: " + n)
1281+
val o2 = Observable.interval(100 millis).map(n => "2: " + n)
1282+
val o = o1.join(o2,
1283+
(_: String) => Observable.timer(300 millis),
1284+
(_: String) => Observable.timer(200 millis),
1285+
(t1: String, t2: String) => (t1, t2))
1286+
o.take(10).toBlocking.foreach(println)
1287+
}
1288+
1289+
@Test def groupJoinExample() {
1290+
val o1 = Observable.interval(500 millis).map(n => "1: " + n)
1291+
val o2 = Observable.interval(100 millis).map(n => "2: " + n)
1292+
val o = o1.groupJoin(o2,
1293+
(_: String) => Observable.timer(300 millis),
1294+
(_: String) => Observable.timer(200 millis),
1295+
(t1: String, t2: Observable[String]) => (t1, t2.toSeq.toBlocking.single))
1296+
o.take(3).toBlocking.foreach(println)
1297+
}
12781298
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2266,6 +2266,36 @@ trait Observable[+T]
22662266
)
22672267
}
22682268

2269+
/**
2270+
* Returns an Observable that correlates two Observables when they overlap in time and groups the results.
2271+
*
2272+
* <img width="640" height="380" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupJoin.png">
2273+
*
2274+
* @param other the other Observable to correlate items from the source Observable with
2275+
* @param leftDuration a function that returns an Observable whose emissions indicate the duration of the values of
2276+
* the source Observable
2277+
* @param rightDuration a function that returns an Observable whose emissions indicate the duration of the values of
2278+
* the `other` Observable
2279+
* @param resultSelector a function that takes an item emitted by each Observable and returns the value to be emitted
2280+
* by the resulting Observable
2281+
* @return an Observable that emits items based on combining those items emitted by the source Observables
2282+
* whose durations overlap
2283+
*/
2284+
def groupJoin[S, R](other: Observable[S], leftDuration: T => Observable[Any], rightDuration: S => Observable[Any], resultSelector: (T, Observable[S]) => R): Observable[R] = {
2285+
val outer: rx.Observable[_ <: T] = this.asJavaObservable
2286+
val inner: rx.Observable[_ <: S] = other.asJavaObservable
2287+
val left: Func1[_ >: T, _ <: rx.Observable[_ <: Any]] = (t: T) => leftDuration(t).asJavaObservable
2288+
val right: Func1[_ >: S, _ <: rx.Observable[_ <: Any]] = (s: S) => rightDuration(s).asJavaObservable
2289+
val f: Func2[_ >: T, _ >: rx.Observable[S], _ <: R] = (t: T, o: rx.Observable[S]) => resultSelector(t, toScalaObservable[S](o))
2290+
toScalaObservable[R](
2291+
outer.asInstanceOf[rx.Observable[T]].groupJoin[S, Any, Any, R](
2292+
inner.asInstanceOf[rx.Observable[S]],
2293+
left.asInstanceOf[Func1[T, rx.Observable[Any]]],
2294+
right.asInstanceOf[Func1[S, rx.Observable[Any]]],
2295+
f)
2296+
)
2297+
}
2298+
22692299
/**
22702300
* Returns a new Observable by applying a function that you supply to each item emitted by the source
22712301
* Observable that returns an Observable, and then emitting the items emitted by the most recently emitted

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ class CompletenessTest extends JUnitSuite {
9797
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]",
9898
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])",
9999
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: T, _ <: TValue], Func1[_ >: GroupedObservable[TKey, TValue], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, T => V, (K, Observable[V]) => Observable[Any])",
100+
"groupJoin(Observable[T2], Func1[_ >: T, _ <: Observable[D1]], Func1[_ >: T2, _ <: Observable[D2]], Func2[_ >: T, _ >: Observable[T2], _ <: R])" -> "groupJoin(Observable[S], T => Observable[Any], S => Observable[Any], (T, Observable[S]) => R)",
100101
"ignoreElements()" -> "[use `filter(_ => false)`]",
102+
"join(Observable[TRight], Func1[T, Observable[TLeftDuration]], Func1[TRight, Observable[TRightDuration]], Func2[T, TRight, R])" -> "join(Observable[S], T => Observable[Any], S => Observable[Any], (T, S) => R)",
101103
"last(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).last`]",
102104
"lastOrDefault(T)" -> "lastOrElse(=> U)",
103105
"lastOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `filter(predicate).lastOrElse(default)`]",

0 commit comments

Comments
 (0)