@@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLatch
20
20
import java .util .concurrent .TimeUnit
21
21
22
22
import scala .concurrent .Await
23
+ import scala .collection .mutable
23
24
import scala .concurrent .duration .Duration
24
25
import scala .concurrent .duration .DurationInt
25
26
import scala .concurrent .duration .DurationLong
@@ -48,6 +49,29 @@ import rx.lang.scala.schedulers._
48
49
@ Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
49
50
class RxScalaDemo extends JUnitSuite {
50
51
52
+ @ Test def subscribeExample () {
53
+ val o = Observable .items(1 , 2 , 3 )
54
+
55
+ // Generally, we have two methods, `subscribe` and `foreach`, to listen to the messages from an Observable.
56
+ // `foreach` is just an alias to `subscribe`.
57
+ o.subscribe(
58
+ n => println(n),
59
+ e => e.printStackTrace(),
60
+ () => println(" done" )
61
+ )
62
+
63
+ o.foreach(
64
+ n => println(n),
65
+ e => e.printStackTrace(),
66
+ () => println(" done" )
67
+ )
68
+
69
+ // For-comprehension is also an alternative, if you are only interested in `onNext`
70
+ for (i <- o) {
71
+ println(i)
72
+ }
73
+ }
74
+
51
75
@ Test def intervalExample () {
52
76
val o = Observable .interval(200 millis).take(5 )
53
77
o.subscribe(n => println(" n = " + n))
@@ -131,7 +155,7 @@ class RxScalaDemo extends JUnitSuite {
131
155
o.flatten.takeUntil(stopper).toBlockingObservable.foreach(println(_))
132
156
}
133
157
134
- @ Test def fattenSomeExample () {
158
+ @ Test def flattenSomeExample () {
135
159
// To merge some observables which are all known already:
136
160
List (
137
161
Observable .interval(200 millis),
@@ -140,6 +164,24 @@ class RxScalaDemo extends JUnitSuite {
140
164
).toObservable.flatten.take(12 ).toBlockingObservable.foreach(println(_))
141
165
}
142
166
167
+ @ Test def flattenExample () {
168
+ List (
169
+ Observable .interval(200 millis).map(_ => 1 ).take(5 ),
170
+ Observable .interval(200 millis).map(_ => 2 ).take(5 ),
171
+ Observable .interval(200 millis).map(_ => 3 ).take(5 ),
172
+ Observable .interval(200 millis).map(_ => 4 ).take(5 )
173
+ ).toObservable.flatten.toBlocking.foreach(println(_))
174
+ }
175
+
176
+ @ Test def flattenExample2 () {
177
+ List (
178
+ Observable .interval(200 millis).map(_ => 1 ).take(5 ),
179
+ Observable .interval(200 millis).map(_ => 2 ).take(5 ),
180
+ Observable .interval(200 millis).map(_ => 3 ).take(5 ),
181
+ Observable .interval(200 millis).map(_ => 4 ).take(5 )
182
+ ).toObservable.flatten(2 ).toBlocking.foreach(println(_))
183
+ }
184
+
143
185
@ Test def rangeAndBufferExample () {
144
186
val o = Observable .from(1 to 18 )
145
187
o.buffer(5 ).subscribe((l : Seq [Int ]) => println(l.mkString(" [" , " , " , " ]" )))
@@ -299,6 +341,13 @@ class RxScalaDemo extends JUnitSuite {
299
341
sequenced.subscribe(x => println(s " Emitted group: $x" ))
300
342
}
301
343
344
+ @ Test def groupByUntilExample2 () {
345
+ val numbers = Observable .interval(250 millis).take(14 )
346
+ val grouped = numbers.groupByUntil[Long , Long ](x => x % 2 , x => x * 10 , {case (key, obs) => Observable .interval(2 seconds)})
347
+ val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
348
+ sequenced.toBlocking.foreach(x => println(s " Emitted group: $x" ))
349
+ }
350
+
302
351
@ Test def combineLatestExample () {
303
352
val firstCounter = Observable .interval(250 millis)
304
353
val secondCounter = Observable .interval(550 millis)
@@ -309,6 +358,17 @@ class RxScalaDemo extends JUnitSuite {
309
358
waitFor(combinedCounter)
310
359
}
311
360
361
+ @ Test def combineLatestExample2 () {
362
+ val firstCounter = Observable .interval(250 millis)
363
+ val secondCounter = Observable .interval(550 millis)
364
+ val thirdCounter = Observable .interval(850 millis)
365
+ val sources = Seq (firstCounter, secondCounter, thirdCounter)
366
+ val combinedCounter = Observable .combineLatest(sources, (items : Seq [Long ]) => items.toList).take(10 )
367
+
368
+ combinedCounter subscribe {x => println(s " Emitted group: $x" )}
369
+ waitFor(combinedCounter)
370
+ }
371
+
312
372
@ Test def olympicsExampleWithoutPublish () {
313
373
val medals = Olympics .mountainBikeMedals.doOnEach(_ => println(" onNext" ))
314
374
medals.subscribe(println(_)) // triggers an execution of medals Observable
@@ -852,6 +912,40 @@ class RxScalaDemo extends JUnitSuite {
852
912
println(m.toBlockingObservable.single)
853
913
}
854
914
915
+ @ Test def toMultimapExample1 (): Unit = {
916
+ val o : Observable [String ] = List (" alice" , " bob" , " carol" , " allen" , " clarke" ).toObservable
917
+ val keySelector = (s : String ) => s.head
918
+ val m = o.toMultimap(keySelector)
919
+ println(m.toBlocking.single)
920
+ }
921
+
922
+ @ Test def toMultimapExample2 (): Unit = {
923
+ val o : Observable [String ] = List (" alice" , " bob" , " carol" , " allen" , " clarke" ).toObservable
924
+ val keySelector = (s : String ) => s.head
925
+ val valueSelector = (s : String ) => s.tail
926
+ val m = o.toMultimap(keySelector, valueSelector)
927
+ println(m.toBlocking.single)
928
+ }
929
+
930
+ @ Test def toMultimapExample3 (): Unit = {
931
+ val o : Observable [String ] = List (" alice" , " bob" , " carol" , " allen" , " clarke" ).toObservable
932
+ val keySelector = (s : String ) => s.head
933
+ val valueSelector = (s : String ) => s.tail
934
+ val mapFactory = () => mutable.Map ('d' -> mutable.Buffer (" oug" ))
935
+ val m = o.toMultimap(keySelector, valueSelector, mapFactory)
936
+ println(m.toBlocking.single.mapValues(_.toList))
937
+ }
938
+
939
+ @ Test def toMultimapExample4 (): Unit = {
940
+ val o : Observable [String ] = List (" alice" , " bob" , " carol" , " allen" , " clarke" ).toObservable
941
+ val keySelector = (s : String ) => s.head
942
+ val valueSelector = (s : String ) => s.tail
943
+ val mapFactory = () => mutable.Map ('d' -> mutable.ListBuffer (" oug" ))
944
+ val bufferFactory = (k : Char ) => mutable.ListBuffer [String ]()
945
+ val m = o.toMultimap(keySelector, valueSelector, mapFactory, bufferFactory)
946
+ println(m.toBlocking.single)
947
+ }
948
+
855
949
@ Test def containsExample (): Unit = {
856
950
val o1 = List (1 , 2 , 3 ).toObservable.contains(2 )
857
951
assertTrue(o1.toBlockingObservable.single)
@@ -880,6 +974,28 @@ class RxScalaDemo extends JUnitSuite {
880
974
assertEquals(List (" alice" , " bob" , " carol" ), o.retry(3 ).toBlockingObservable.toList)
881
975
}
882
976
977
+ @ Test def retryExample3 (): Unit = {
978
+ var isFirst = true
979
+ val o = Observable {
980
+ (subscriber : Subscriber [String ]) =>
981
+ if (isFirst) {
982
+ subscriber.onNext(" alice" )
983
+ subscriber.onError(new IOException (" Oops" ))
984
+ isFirst = false
985
+ }
986
+ else {
987
+ subscriber.onNext(" bob" )
988
+ subscriber.onError(new RuntimeException (" Oops" ))
989
+ }
990
+ }
991
+ o.retry {
992
+ (times, e) => e match {
993
+ case e : IOException => times <= 3
994
+ case _ => false
995
+ }
996
+ }.subscribe(s => println(s), e => e.printStackTrace())
997
+ }
998
+
883
999
@ Test def liftExample1 (): Unit = {
884
1000
// Add "No. " in front of each item
885
1001
val o = List (1 , 2 , 3 ).toObservable.lift {
@@ -1199,4 +1315,82 @@ class RxScalaDemo extends JUnitSuite {
1199
1315
.take(20 )
1200
1316
.toBlocking.foreach(println)
1201
1317
}
1318
+
1319
+ @ Test def onErrorResumeNextExample () {
1320
+ val o = Observable {
1321
+ (subscriber : Subscriber [Int ]) =>
1322
+ subscriber.onNext(1 )
1323
+ subscriber.onNext(2 )
1324
+ subscriber.onError(new IOException (" Oops" ))
1325
+ subscriber.onNext(3 )
1326
+ subscriber.onNext(4 )
1327
+ }
1328
+ o.onErrorResumeNext(_ => Observable .items(10 , 11 , 12 )).subscribe(println(_))
1329
+ }
1330
+
1331
+ @ Test def onErrorFlatMapExample () {
1332
+ val o = Observable {
1333
+ (subscriber : Subscriber [Int ]) =>
1334
+ subscriber.onNext(1 )
1335
+ subscriber.onNext(2 )
1336
+ subscriber.onError(new IOException (" Oops" ))
1337
+ subscriber.onNext(3 )
1338
+ subscriber.onNext(4 )
1339
+ }
1340
+ o.onErrorFlatMap((_, _) => Observable .items(10 , 11 , 12 )).subscribe(println(_))
1341
+ }
1342
+
1343
+ @ Test def onErrorFlatMapExample2 () {
1344
+ val o = Observable .items(4 , 2 , 0 ).map(16 / _).onErrorFlatMap {
1345
+ (e, op) => op match {
1346
+ case Some (v) if v == 0 => Observable .items(Int .MinValue )
1347
+ case _ => Observable .empty
1348
+ }
1349
+ }
1350
+ o.subscribe(println(_))
1351
+ }
1352
+
1353
+ @ Test def switchMapExample () {
1354
+ val o = Observable .interval(300 millis).take(5 ).switchMap[String ] {
1355
+ n => Observable .interval(50 millis).take(10 ).map(i => s " Seq ${n}: ${i}" )
1356
+ }
1357
+ o.toBlocking.foreach(println)
1358
+ }
1359
+
1360
+ @ Test def joinExample () {
1361
+ val o1 = Observable .interval(500 millis).map(n => " 1: " + n)
1362
+ val o2 = Observable .interval(100 millis).map(n => " 2: " + n)
1363
+ val o = o1.join(o2,
1364
+ (_ : String ) => Observable .timer(300 millis),
1365
+ (_ : String ) => Observable .timer(200 millis),
1366
+ (t1 : String , t2 : String ) => (t1, t2))
1367
+ o.take(10 ).toBlocking.foreach(println)
1368
+ }
1369
+
1370
+ @ Test def groupJoinExample () {
1371
+ val o1 = Observable .interval(500 millis).map(n => " 1: " + n)
1372
+ val o2 = Observable .interval(100 millis).map(n => " 2: " + n)
1373
+ val o = o1.groupJoin(o2,
1374
+ (_ : String ) => Observable .timer(300 millis),
1375
+ (_ : String ) => Observable .timer(200 millis),
1376
+ (t1 : String , t2 : Observable [String ]) => (t1, t2.toSeq.toBlocking.single))
1377
+ o.take(3 ).toBlocking.foreach(println)
1378
+ }
1379
+
1380
+ @ Test def pivotExample () {
1381
+ val o1 = (1 to 20 ).toObservable.groupBy(i => if (i <= 10 ) " x" else " y" ).map {
1382
+ case (t : String , o : Observable [Int ]) => (t, o.groupBy(i => i % 2 == 0 ))
1383
+ }
1384
+ println(" o1:" )
1385
+ (for ((k1, o) <- o1;
1386
+ (k2, vs) <- o;
1387
+ v <- vs
1388
+ ) yield (k1, k2, v)).subscribe(println(_))
1389
+ val o2 = o1.pivot
1390
+ println(" o2:" )
1391
+ (for ((k1, o) <- o2;
1392
+ (k2, vs) <- o;
1393
+ v <- vs
1394
+ ) yield (k1, k2, v)).subscribe(println(_))
1395
+ }
1202
1396
}
0 commit comments