Skip to content

Commit 1220de7

Browse files
committed
Add groupByUntil variant to RxScala
1 parent d4d43a6 commit 1220de7

File tree

3 files changed

+37
-0
lines changed

3 files changed

+37
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,13 @@ class RxScalaDemo extends JUnitSuite {
294294
sequenced.subscribe(x => println(s"Emitted group: $x"))
295295
}
296296

297+
@Test def groupByUntilExample2() {
298+
val numbers = Observable.interval(250 millis).take(14)
299+
val grouped = numbers.groupByUntil[Long, Long](x => x % 2, x => x * 10, {case (key, obs) => Observable.interval(2 seconds)})
300+
val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
301+
sequenced.toBlocking.foreach(x => println(s"Emitted group: $x"))
302+
}
303+
297304
@Test def combineLatestExample() {
298305
val firstCounter = Observable.interval(250 millis)
299306
val secondCounter = Observable.interval(550 millis)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2193,6 +2193,35 @@ trait Observable[+T]
21932193
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
21942194
}
21952195

2196+
/**
2197+
* Groups the items emitted by an [[Observable]] (transformed by a selector) according to a specified key selector function
2198+
* until the duration Observable expires for the key.
2199+
*
2200+
* <img width="640" height="375" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupByUntil.png">
2201+
*
2202+
* <em>Note:</em> The `Observable` in the pair `(K, Observable[V])` will cache the items it is to emit until such time as it
2203+
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those `Observable` that
2204+
* do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like `take(0)` to them.
2205+
*
2206+
* @param keySelector a function to extract the key for each item
2207+
* @param valueSelector a function to map each item emitted by the source [[Observable]] to an item emitted by one
2208+
* of the resulting `Observable[V]`s
2209+
* @param closings a function to signal the expiration of a group
2210+
* @return an [[Observable]] that emits pairs of key and `Observable[V]`, each of which corresponds to a key
2211+
* value and each of which emits all items emitted by the source [[Observable]] during that
2212+
* key's duration that share that same key value, transformed by the value selector
2213+
*/
2214+
def groupByUntil[K, V](keySelector: T => K, valueSelector: T => V, closings: (K, Observable[V]) => Observable[Any]): Observable[(K, Observable[V])] = {
2215+
val jKeySelector: Func1[_ >: T, _ <: K] = keySelector
2216+
val jValueSelector: Func1[_ >: T, _ <: V] = valueSelector
2217+
val jDurationSelector = new Func1[rx.observables.GroupedObservable[_ <: K, _ <: V], rx.Observable[_ <: Any]] {
2218+
override def call(jgo: rx.observables.GroupedObservable[_ <: K, _ <: V]): rx.Observable[_ <: Any] = closings(jgo.getKey, toScalaObservable[V](jgo))
2219+
}
2220+
val f = (o: rx.observables.GroupedObservable[K, _ <: V]) => (o.getKey, toScalaObservable[V](o))
2221+
val jo = asJavaObservable.groupByUntil[K, V, Any](jKeySelector, jValueSelector, jDurationSelector).map[(K, Observable[V])](f)
2222+
toScalaObservable[(K, Observable[V])](jo)
2223+
}
2224+
21962225
/**
21972226
* Correlates the items emitted by two Observables based on overlapping durations.
21982227
* <p>

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ class CompletenessTest extends JUnitSuite {
9696
"forEach(Action1[_ >: T], Action1[Throwable], Action0)" -> "foreach(T => Unit, Throwable => Unit, () => Unit)",
9797
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]",
9898
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])",
99+
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: T, _ <: TValue], Func1[_ >: GroupedObservable[TKey, TValue], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, T => V, (K, Observable[V]) => Observable[Any])",
99100
"ignoreElements()" -> "[use `filter(_ => false)`]",
100101
"last(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).last`]",
101102
"lastOrDefault(T)" -> "lastOrElse(=> U)",

0 commit comments

Comments
 (0)