Skip to content

Commit ba9331f

Browse files
committed
Add pivot to RxScala
1 parent fa9ff95 commit ba9331f

File tree

3 files changed

+87
-0
lines changed

3 files changed

+87
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1295,4 +1295,33 @@ class RxScalaDemo extends JUnitSuite {
12951295
(t1: String, t2: Observable[String]) => (t1, t2.toSeq.toBlocking.single))
12961296
o.take(3).toBlocking.foreach(println)
12971297
}
1298+
1299+
@Test def pivotExample() {
1300+
val o1 = (1 to 20).toObservable.groupBy(i => if (i <= 10) "x" else "y").map {
1301+
case (t: String, o: Observable[Int]) => (t, o.groupBy(i => if (i % 2 == 0) true else false))
1302+
}
1303+
println("o1:")
1304+
o1.subscribe {
1305+
n => n match {
1306+
case (k1: String, o: Observable[(Boolean, Observable[Int])]) =>
1307+
o.subscribe {
1308+
m => m match {
1309+
case (k2: Boolean, oi: Observable[Int]) => oi.subscribe(v => println(s"$k1 $k2 $v"))
1310+
}
1311+
}
1312+
}
1313+
}
1314+
val o2 = o1.pivot
1315+
println("o2:")
1316+
o2.subscribe {
1317+
n => n match {
1318+
case (k1: Boolean, o: Observable[(String, Observable[Int])]) =>
1319+
o.subscribe {
1320+
m => m match {
1321+
case (k2: String, oi: Observable[Int]) => oi.subscribe(v => println(s"$k1 $k2 $v"))
1322+
}
1323+
}
1324+
}
1325+
}
1326+
}
12981327
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3813,6 +3813,63 @@ trait Observable[+T]
38133813
def foreach(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Unit = {
38143814
asJavaObservable.subscribe(onNext, onError, onComplete)
38153815
}
3816+
3817+
/**
3818+
* Pivots a sequence of `(K1, Observable[(K2, Observable[U])])`s emitted by an `Observable` so as to swap the group
3819+
* and and the set on which their items are grouped.
3820+
* <p>
3821+
* <img width="640" height="580" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.png">
3822+
*
3823+
* For example an `Observable` such as `this = Observable[(String, Observable[(Boolean, Observable[Integer])])`:
3824+
* <ul>
3825+
* <li>o1.odd: 1, 3, 5, 7, 9 on Thread 1</li>
3826+
* <li>o1.even: 2, 4, 6, 8, 10 on Thread 1</li>
3827+
* <li>o2.odd: 11, 13, 15, 17, 19 on Thread 2</li>
3828+
* <li>o2.even: 12, 14, 16, 18, 20 on Thread 2</li>
3829+
* </ul>
3830+
* is pivoted to become `this = Observable[(Boolean, Observable[(String, Observable[Integer])])`:
3831+
*
3832+
* <ul>
3833+
* <li>odd.o1: 1, 3, 5, 7, 9 on Thread 1</li>
3834+
* <li>odd.o2: 11, 13, 15, 17, 19 on Thread 2</li>
3835+
* <li>even.o1: 2, 4, 6, 8, 10 on Thread 1</li>
3836+
* <li>even.o2: 12, 14, 16, 18, 20 on Thread 2</li>
3837+
* </ul>
3838+
* <p>
3839+
* <img width="640" height="1140" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.ex.png">
3840+
* <p>
3841+
* <em>Note:</em> A `(K, Observable[_])` will cache the items it is to emit until such time as it
3842+
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
3843+
* `(K, Observable[_])`s that do not concern you. Instead, you can signal to them that they may
3844+
* discard their buffers by applying an operator like `take(0)` to them.
3845+
*
3846+
* @return an `Observable`containing a stream of nested `(K1, Observable[(K2, Observable[U])])`s with swapped
3847+
* inner-outer keys.
3848+
*/
3849+
def pivot[U, K1, K2](implicit evidence: Observable[T] <:< Observable[(K1, Observable[(K2, Observable[U])])]): Observable[(K2, Observable[(K1, Observable[U])])] = {
3850+
import rx.observables.{GroupedObservable => JGroupedObservable}
3851+
val f1 = new Func1[(K1, Observable[(K2, Observable[U])]), JGroupedObservable[K1, JGroupedObservable[K2, U]]]() {
3852+
override def call(t1: (K1, Observable[(K2, Observable[U])])): JGroupedObservable[K1, JGroupedObservable[K2, U]] = {
3853+
val jo = t1._2.asJavaObservable.asInstanceOf[rx.Observable[(K2, Observable[U])]].map[JGroupedObservable[K2, U]](new Func1[(K2, Observable[U]), JGroupedObservable[K2, U]]() {
3854+
override def call(t2: (K2, Observable[U])): JGroupedObservable[K2, U] = {
3855+
JGroupedObservable.from(t2._1, t2._2.asJavaObservable.asInstanceOf[rx.Observable[U]])
3856+
}
3857+
})
3858+
JGroupedObservable.from(t1._1, jo)
3859+
}
3860+
}
3861+
val o1: Observable[(K1, Observable[(K2, Observable[U])])] = this
3862+
val o2 = toScalaObservable[JGroupedObservable[K2, JGroupedObservable[K1, U]]](rx.Observable.pivot(o1.asJavaObservable.map(f1)))
3863+
o2.map {
3864+
(jgo1: JGroupedObservable[K2, JGroupedObservable[K1, U]]) => {
3865+
val jo = jgo1.map[(K1, Observable[U])](new Func1[JGroupedObservable[K1, U], (K1, Observable[U])]() {
3866+
override def call(jgo2: JGroupedObservable[K1, U]): (K1, Observable[U]) = (jgo2.getKey, toScalaObservable[U](jgo2))
3867+
})
3868+
(jgo1.getKey, toScalaObservable[(K1, Observable[U])](jo))
3869+
}
3870+
}
3871+
}
3872+
38163873
}
38173874

38183875
/**

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ class CompletenessTest extends JUnitSuite {
219219
"mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])",
220220
"parallelMerge(Observable[Observable[T]], Int)" -> "parallelMerge(Int)(<:<[Observable[T], Observable[Observable[U]]])",
221221
"parallelMerge(Observable[Observable[T]], Int, Scheduler)" -> "parallelMerge(Int, Scheduler)(<:<[Observable[T], Observable[Observable[U]]])",
222+
"pivot(Observable[GroupedObservable[K1, GroupedObservable[K2, T]]])" -> "pivot(<:<[Observable[T], Observable[(K1, Observable[(K2, Observable[U])])]])",
222223
"sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "sequenceEqual(Observable[U])",
223224
"sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "sequenceEqual(Observable[U], (U, U) => Boolean)",
224225
"range(Int, Int)" -> "[use `(start until (start + count)).toObservable` instead of `range(start, count)`]",

0 commit comments

Comments
 (0)