Skip to content

Commit 3bac0f8

Browse files
Merge pull request #1375 from zsxwing/rxscala-toXXX
RxScala: Add idiomatic toXXX methods
2 parents 2944606 + 2f8da39 commit 3bac0f8

File tree

2 files changed

+222
-1
lines changed

2 files changed

+222
-1
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 162 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
package rx.lang.scala
1818

1919
import rx.functions.FuncN
20-
import rx.Observable.OnSubscribeFunc
2120
import rx.lang.scala.observables.ConnectableObservable
2221
import scala.concurrent.duration
2322
import java.util
2423
import collection.JavaConversions._
24+
import scala.collection.generic.CanBuildFrom
25+
import scala.annotation.unchecked.uncheckedVariance
26+
import scala.collection.{Iterable, Traversable, immutable}
27+
import scala.collection.mutable.ArrayBuffer
28+
import scala.reflect.ClassTag
2529

2630

2731
/**
@@ -3970,6 +3974,163 @@ trait Observable[+T]
39703974
}
39713975
}
39723976
}
3977+
3978+
/**
3979+
* Returns an Observable that emits a single item, a collection composed of all the items emitted by
3980+
* the source Observable.
3981+
*
3982+
* Be careful not to use this operator on Observables that emit infinite or very large numbers
3983+
* of items, as you do not have the option to unsubscribe.
3984+
*
3985+
* @tparam Col the collection type to build.
3986+
* @return an Observable that emits a single item, a collection containing all of the items emitted by
3987+
* the source Observable.
3988+
*/
3989+
def to[Col[_]](implicit cbf: CanBuildFrom[Nothing, T, Col[T @uncheckedVariance]]): Observable[Col[T @uncheckedVariance]] = {
3990+
lift {
3991+
(subscriber: Subscriber[Col[T]]) => {
3992+
val b = cbf()
3993+
Subscriber[T](
3994+
subscriber,
3995+
(t: T) => {
3996+
b += t: Unit
3997+
},
3998+
e => subscriber.onError(e),
3999+
() => {
4000+
subscriber.onNext(b.result)
4001+
subscriber.onCompleted()
4002+
}
4003+
)
4004+
}
4005+
}
4006+
}
4007+
4008+
/**
4009+
* Returns an Observable that emits a single item, a `Traversable` composed of all the items emitted by
4010+
* the source Observable.
4011+
*
4012+
* Be careful not to use this operator on Observables that emit infinite or very large numbers
4013+
* of items, as you do not have the option to unsubscribe.
4014+
*
4015+
* @return an Observable that emits a single item, a `Traversable` containing all of the items emitted by
4016+
* the source Observable.
4017+
*/
4018+
def toTraversable: Observable[Traversable[T]] = to[Traversable]
4019+
4020+
/**
4021+
* Returns an Observable that emits a single item, a `List` composed of all the items emitted by
4022+
* the source Observable.
4023+
*
4024+
* Be careful not to use this operator on Observables that emit infinite or very large numbers
4025+
* of items, as you do not have the option to unsubscribe.
4026+
*
4027+
* @return an Observable that emits a single item, a `List` containing all of the items emitted by
4028+
* the source Observable.
4029+
*/
4030+
def toList: Observable[List[T]] = to[List]
4031+
4032+
/**
4033+
* Returns an Observable that emits a single item, an `Iterable` composed of all the items emitted by
4034+
* the source Observable.
4035+
*
4036+
* Be careful not to use this operator on Observables that emit infinite or very large numbers
4037+
* of items, as you do not have the option to unsubscribe.
4038+
*
4039+
* @return an Observable that emits a single item, an `Iterable` containing all of the items emitted by
4040+
* the source Observable.
4041+
*/
4042+
def toIterable: Observable[Iterable[T]] = to[Iterable]
4043+
4044+
/**
4045+
* Returns an Observable that emits a single item, an `Iterator` composed of all the items emitted by
4046+
* the source Observable.
4047+
*
4048+
* Be careful not to use this operator on Observables that emit infinite or very large numbers
4049+
* of items, as you do not have the option to unsubscribe.
4050+
*
4051+
* @return an Observable that emits a single item, an `Iterator` containing all of the items emitted by
4052+
* the source Observable.
4053+
*/
4054+
def toIterator: Observable[Iterator[T]] = toIterable.map(_.iterator)
4055+
4056+
/**
4057+
* Returns an Observable that emits a single item, a `Stream` composed of all the items emitted by
4058+
* the source Observable.
4059+
*
4060+
* Be careful not to use this operator on Observables that emit infinite or very large numbers
4061+
* of items, as you do not have the option to unsubscribe.
4062+
*
4063+
* @return an Observable that emits a single item, a `Stream` containing all of the items emitted by
4064+
* the source Observable.
4065+
*/
4066+
def toStream: Observable[Stream[T]] = to[Stream]
4067+
4068+
/**
4069+
* Returns an Observable that emits a single item, an `IndexedSeq` composed of all the items emitted by
4070+
* the source Observable.
4071+
*
4072+
* Be careful not to use this operator on Observables that emit infinite or very large numbers
4073+
* of items, as you do not have the option to unsubscribe.
4074+
*
4075+
* @return an Observable that emits a single item, an `IndexedSeq` containing all of the items emitted by
4076+
* the source Observable.
4077+
*/
4078+
def toIndexedSeq: Observable[immutable.IndexedSeq[T]] = to[immutable.IndexedSeq]
4079+
4080+
/**
4081+
* Returns an Observable that emits a single item, a `Vector` composed of all the items emitted by
4082+
* the source Observable.
4083+
*
4084+
* Be careful not to use this operator on Observables that emit infinite or very large numbers
4085+
* of items, as you do not have the option to unsubscribe.
4086+
*
4087+
* @return an Observable that emits a single item, a `Vector` containing all of the items emitted by
4088+
* the source Observable.
4089+
*/
4090+
def toVector: Observable[Vector[T]] = to[Vector]
4091+
4092+
/**
4093+
* Returns an Observable that emits a single item, a `Buffer` composed of all the items emitted by
4094+
* the source Observable.
4095+
*
4096+
* Be careful not to use this operator on Observables that emit infinite or very large numbers
4097+
* of items, as you do not have the option to unsubscribe.
4098+
*
4099+
* @return an Observable that emits a single item, a `Buffer` containing all of the items emitted by
4100+
* the source Observable.
4101+
*/
4102+
def toBuffer[U >: T]: Observable[mutable.Buffer[U]] = { // use U >: T because Buffer is invariant
4103+
val us: Observable[U] = this
4104+
us.to[ArrayBuffer]
4105+
}
4106+
4107+
/**
4108+
* Returns an Observable that emits a single item, a `Set` composed of all the items emitted by
4109+
* the source Observable.
4110+
*
4111+
* Be careful not to use this operator on Observables that emit infinite or very large numbers
4112+
* of items, as you do not have the option to unsubscribe.
4113+
*
4114+
* @return an Observable that emits a single item, a `Set` containing all of the items emitted by
4115+
* the source Observable.
4116+
*/
4117+
def toSet[U >: T]: Observable[immutable.Set[U]] = { // use U >: T because Set is invariant
4118+
val us: Observable[U] = this
4119+
us.to[immutable.Set]
4120+
}
4121+
4122+
/**
4123+
* Returns an Observable that emits a single item, an `Array` composed of all the items emitted by
4124+
* the source Observable.
4125+
*
4126+
* Be careful not to use this operator on Observables that emit infinite or very large numbers
4127+
* of items, as you do not have the option to unsubscribe.
4128+
*
4129+
* @return an Observable that emits a single item, an `Array` containing all of the items emitted by
4130+
* the source Observable.
4131+
*/
4132+
def toArray[U >: T : ClassTag]: Observable[Array[U]] = // use U >: T because Array is invariant
4133+
toBuffer[U].map(_.toArray)
39734134
}
39744135

39754136
/**

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,4 +286,64 @@ class ObservableTests extends JUnitSuite {
286286
assertEquals(List("a", "b", "c"), o.toBlocking.toList)
287287
assertTrue(called)
288288
}
289+
290+
@Test
291+
def testToTraversable() {
292+
val o = Observable.items(1, 2, 3).toTraversable
293+
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
294+
}
295+
296+
@Test
297+
def testToList() {
298+
val o = Observable.items(1, 2, 3).toList
299+
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
300+
}
301+
302+
@Test
303+
def testToIterable() {
304+
val o = Observable.items(1, 2, 3).toIterable
305+
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
306+
}
307+
308+
@Test
309+
def testToIterator() {
310+
val o = Observable.items(1, 2, 3).toIterator
311+
assertEquals(Seq(1, 2, 3), o.toBlocking.single.toSeq)
312+
}
313+
314+
@Test
315+
def testToStream() {
316+
val o = Observable.items(1, 2, 3).toStream
317+
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
318+
}
319+
320+
@Test
321+
def testToIndexedSeq() {
322+
val o = Observable.items(1, 2, 3).toIndexedSeq
323+
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
324+
}
325+
326+
@Test
327+
def testToBuffer() {
328+
val o = Observable.items(1, 2, 3).toBuffer
329+
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
330+
}
331+
332+
@Test
333+
def testToSet() {
334+
val o = Observable.items(1, 2, 2).toSet
335+
assertEquals(Set(1, 2), o.toBlocking.single)
336+
}
337+
338+
@Test
339+
def testToVector() {
340+
val o = Observable.items(1, 2, 3).toVector
341+
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
342+
}
343+
344+
@Test
345+
def testToArray() {
346+
val o = Observable.items(1, 2, 3).toArray
347+
assertArrayEquals(Array(1, 2, 3), o.toBlocking.single)
348+
}
289349
}

0 commit comments

Comments
 (0)