Skip to content

Commit 057db88

Browse files
Merge pull request #1316 from zsxwing/rxscala-more
Add the rest operators to RxScala
2 parents 022ae66 + c5605bc commit 057db88

File tree

4 files changed

+681
-2
lines changed

4 files changed

+681
-2
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 195 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import java.io.IOException
1919
import java.util.concurrent.CountDownLatch
2020
import java.util.concurrent.TimeUnit
2121

22+
import scala.collection.mutable
2223
import scala.concurrent.duration.Duration
2324
import scala.concurrent.duration.DurationInt
2425
import scala.concurrent.duration.DurationLong
@@ -47,6 +48,29 @@ import rx.lang.scala.schedulers._
4748
@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
4849
class RxScalaDemo extends JUnitSuite {
4950

51+
@Test def subscribeExample() {
52+
val o = Observable.items(1, 2, 3)
53+
54+
// Generally, we have two methods, `subscribe` and `foreach`, to listen to the messages from an Observable.
55+
// `foreach` is just an alias to `subscribe`.
56+
o.subscribe(
57+
n => println(n),
58+
e => e.printStackTrace(),
59+
() => println("done")
60+
)
61+
62+
o.foreach(
63+
n => println(n),
64+
e => e.printStackTrace(),
65+
() => println("done")
66+
)
67+
68+
// For-comprehension is also an alternative, if you are only interested in `onNext`
69+
for (i <- o) {
70+
println(i)
71+
}
72+
}
73+
5074
@Test def intervalExample() {
5175
val o = Observable.interval(200 millis).take(5)
5276
o.subscribe(n => println("n = " + n))
@@ -130,7 +154,7 @@ class RxScalaDemo extends JUnitSuite {
130154
o.flatten.takeUntil(stopper).toBlockingObservable.foreach(println(_))
131155
}
132156

133-
@Test def fattenSomeExample() {
157+
@Test def flattenSomeExample() {
134158
// To merge some observables which are all known already:
135159
List(
136160
Observable.interval(200 millis),
@@ -139,6 +163,24 @@ class RxScalaDemo extends JUnitSuite {
139163
).toObservable.flatten.take(12).toBlockingObservable.foreach(println(_))
140164
}
141165

166+
@Test def flattenExample() {
167+
List(
168+
Observable.interval(200 millis).map(_ => 1).take(5),
169+
Observable.interval(200 millis).map(_ => 2).take(5),
170+
Observable.interval(200 millis).map(_ => 3).take(5),
171+
Observable.interval(200 millis).map(_ => 4).take(5)
172+
).toObservable.flatten.toBlocking.foreach(println(_))
173+
}
174+
175+
@Test def flattenExample2() {
176+
List(
177+
Observable.interval(200 millis).map(_ => 1).take(5),
178+
Observable.interval(200 millis).map(_ => 2).take(5),
179+
Observable.interval(200 millis).map(_ => 3).take(5),
180+
Observable.interval(200 millis).map(_ => 4).take(5)
181+
).toObservable.flatten(2).toBlocking.foreach(println(_))
182+
}
183+
142184
@Test def rangeAndBufferExample() {
143185
val o = Observable.from(1 to 18)
144186
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
@@ -276,6 +318,13 @@ class RxScalaDemo extends JUnitSuite {
276318
sequenced.subscribe(x => println(s"Emitted group: $x"))
277319
}
278320

321+
@Test def groupByUntilExample2() {
322+
val numbers = Observable.interval(250 millis).take(14)
323+
val grouped = numbers.groupByUntil[Long, Long](x => x % 2, x => x * 10, {case (key, obs) => Observable.interval(2 seconds)})
324+
val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
325+
sequenced.toBlocking.foreach(x => println(s"Emitted group: $x"))
326+
}
327+
279328
@Test def combineLatestExample() {
280329
val firstCounter = Observable.interval(250 millis)
281330
val secondCounter = Observable.interval(550 millis)
@@ -286,6 +335,17 @@ class RxScalaDemo extends JUnitSuite {
286335
waitFor(combinedCounter)
287336
}
288337

338+
@Test def combineLatestExample2() {
339+
val firstCounter = Observable.interval(250 millis)
340+
val secondCounter = Observable.interval(550 millis)
341+
val thirdCounter = Observable.interval(850 millis)
342+
val sources = Seq(firstCounter, secondCounter, thirdCounter)
343+
val combinedCounter = Observable.combineLatest(sources, (items: Seq[Long]) => items.toList).take(10)
344+
345+
combinedCounter subscribe {x => println(s"Emitted group: $x")}
346+
waitFor(combinedCounter)
347+
}
348+
289349
@Test def olympicsExampleWithoutPublish() {
290350
val medals = Olympics.mountainBikeMedals.doOnEach(_ => println("onNext"))
291351
medals.subscribe(println(_)) // triggers an execution of medals Observable
@@ -829,6 +889,40 @@ class RxScalaDemo extends JUnitSuite {
829889
println(m.toBlockingObservable.single)
830890
}
831891

892+
@Test def toMultimapExample1(): Unit = {
893+
val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable
894+
val keySelector = (s: String) => s.head
895+
val m = o.toMultimap(keySelector)
896+
println(m.toBlocking.single)
897+
}
898+
899+
@Test def toMultimapExample2(): Unit = {
900+
val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable
901+
val keySelector = (s: String) => s.head
902+
val valueSelector = (s: String) => s.tail
903+
val m = o.toMultimap(keySelector, valueSelector)
904+
println(m.toBlocking.single)
905+
}
906+
907+
@Test def toMultimapExample3(): Unit = {
908+
val o: Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable
909+
val keySelector = (s: String) => s.head
910+
val valueSelector = (s: String) => s.tail
911+
val mapFactory = () => mutable.Map('d' -> mutable.Buffer("oug"))
912+
val m = o.toMultimap(keySelector, valueSelector, mapFactory)
913+
println(m.toBlocking.single.mapValues(_.toList))
914+
}
915+
916+
@Test def toMultimapExample4(): Unit = {
917+
val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable
918+
val keySelector = (s: String) => s.head
919+
val valueSelector = (s: String) => s.tail
920+
val mapFactory = () => mutable.Map('d' -> mutable.ListBuffer("oug"))
921+
val bufferFactory = (k: Char) => mutable.ListBuffer[String]()
922+
val m = o.toMultimap(keySelector, valueSelector, mapFactory, bufferFactory)
923+
println(m.toBlocking.single)
924+
}
925+
832926
@Test def containsExample(): Unit = {
833927
val o1 = List(1, 2, 3).toObservable.contains(2)
834928
assertTrue(o1.toBlockingObservable.single)
@@ -857,6 +951,28 @@ class RxScalaDemo extends JUnitSuite {
857951
assertEquals(List("alice", "bob", "carol"), o.retry(3).toBlockingObservable.toList)
858952
}
859953

954+
@Test def retryExample3(): Unit = {
955+
var isFirst = true
956+
val o = Observable {
957+
(subscriber: Subscriber[String]) =>
958+
if (isFirst) {
959+
subscriber.onNext("alice")
960+
subscriber.onError(new IOException("Oops"))
961+
isFirst = false
962+
}
963+
else {
964+
subscriber.onNext("bob")
965+
subscriber.onError(new RuntimeException("Oops"))
966+
}
967+
}
968+
o.retry {
969+
(times, e) => e match {
970+
case e: IOException => times <= 3
971+
case _ => false
972+
}
973+
}.subscribe(s => println(s), e => e.printStackTrace())
974+
}
975+
860976
@Test def liftExample1(): Unit = {
861977
// Add "No. " in front of each item
862978
val o = List(1, 2, 3).toObservable.lift {
@@ -1176,4 +1292,82 @@ class RxScalaDemo extends JUnitSuite {
11761292
.take(20)
11771293
.toBlocking.foreach(println)
11781294
}
1295+
1296+
@Test def onErrorResumeNextExample() {
1297+
val o = Observable {
1298+
(subscriber: Subscriber[Int]) =>
1299+
subscriber.onNext(1)
1300+
subscriber.onNext(2)
1301+
subscriber.onError(new IOException("Oops"))
1302+
subscriber.onNext(3)
1303+
subscriber.onNext(4)
1304+
}
1305+
o.onErrorResumeNext(_ => Observable.items(10, 11, 12)).subscribe(println(_))
1306+
}
1307+
1308+
@Test def onErrorFlatMapExample() {
1309+
val o = Observable {
1310+
(subscriber: Subscriber[Int]) =>
1311+
subscriber.onNext(1)
1312+
subscriber.onNext(2)
1313+
subscriber.onError(new IOException("Oops"))
1314+
subscriber.onNext(3)
1315+
subscriber.onNext(4)
1316+
}
1317+
o.onErrorFlatMap((_, _) => Observable.items(10, 11, 12)).subscribe(println(_))
1318+
}
1319+
1320+
@Test def onErrorFlatMapExample2() {
1321+
val o = Observable.items(4, 2, 0).map(16 / _).onErrorFlatMap {
1322+
(e, op) => op match {
1323+
case Some(v) if v == 0 => Observable.items(Int.MinValue)
1324+
case _ => Observable.empty
1325+
}
1326+
}
1327+
o.subscribe(println(_))
1328+
}
1329+
1330+
@Test def switchMapExample() {
1331+
val o = Observable.interval(300 millis).take(5).switchMap[String] {
1332+
n => Observable.interval(50 millis).take(10).map(i => s"Seq ${n}: ${i}")
1333+
}
1334+
o.toBlocking.foreach(println)
1335+
}
1336+
1337+
@Test def joinExample() {
1338+
val o1 = Observable.interval(500 millis).map(n => "1: " + n)
1339+
val o2 = Observable.interval(100 millis).map(n => "2: " + n)
1340+
val o = o1.join(o2,
1341+
(_: String) => Observable.timer(300 millis),
1342+
(_: String) => Observable.timer(200 millis),
1343+
(t1: String, t2: String) => (t1, t2))
1344+
o.take(10).toBlocking.foreach(println)
1345+
}
1346+
1347+
@Test def groupJoinExample() {
1348+
val o1 = Observable.interval(500 millis).map(n => "1: " + n)
1349+
val o2 = Observable.interval(100 millis).map(n => "2: " + n)
1350+
val o = o1.groupJoin(o2,
1351+
(_: String) => Observable.timer(300 millis),
1352+
(_: String) => Observable.timer(200 millis),
1353+
(t1: String, t2: Observable[String]) => (t1, t2.toSeq.toBlocking.single))
1354+
o.take(3).toBlocking.foreach(println)
1355+
}
1356+
1357+
@Test def pivotExample() {
1358+
val o1 = (1 to 20).toObservable.groupBy(i => if (i <= 10) "x" else "y").map {
1359+
case (t: String, o: Observable[Int]) => (t, o.groupBy(i => i % 2 == 0))
1360+
}
1361+
println("o1:")
1362+
(for ((k1, o) <- o1;
1363+
(k2, vs) <- o;
1364+
v <- vs
1365+
) yield (k1, k2, v)).subscribe(println(_))
1366+
val o2 = o1.pivot
1367+
println("o2:")
1368+
(for ((k1, o) <- o2;
1369+
(k2, vs) <- o;
1370+
v <- vs
1371+
) yield (k1, k2, v)).subscribe(println(_))
1372+
}
11791373
}

0 commit comments

Comments
 (0)