Skip to content

RxScala: Add idiomatic toXXX methods #1375

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 23, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
package rx.lang.scala

import rx.functions.FuncN
import rx.Observable.OnSubscribeFunc
import rx.lang.scala.observables.ConnectableObservable
import scala.concurrent.duration
import java.util
import collection.JavaConversions._
import scala.collection.generic.CanBuildFrom
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.{Iterable, Traversable, immutable}
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag


/**
Expand Down Expand Up @@ -3970,6 +3974,163 @@ trait Observable[+T]
}
}
}

/**
* Returns an Observable that emits a single item, a collection composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @tparam Col the collection type to build.
* @return an Observable that emits a single item, a collection containing all of the items emitted by
* the source Observable.
*/
def to[Col[_]](implicit cbf: CanBuildFrom[Nothing, T, Col[T @uncheckedVariance]]): Observable[Col[T @uncheckedVariance]] = {
lift {
(subscriber: Subscriber[Col[T]]) => {
val b = cbf()
Subscriber[T](
subscriber,
(t: T) => {
b += t: Unit
},
e => subscriber.onError(e),
() => {
subscriber.onNext(b.result)
subscriber.onCompleted()
}
)
}
}
}

/**
* Returns an Observable that emits a single item, a `Traversable` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, a `Traversable` containing all of the items emitted by
* the source Observable.
*/
def toTraversable: Observable[Traversable[T]] = to[Traversable]

/**
* Returns an Observable that emits a single item, a `List` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, a `List` containing all of the items emitted by
* the source Observable.
*/
def toList: Observable[List[T]] = to[List]

/**
* Returns an Observable that emits a single item, an `Iterable` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, an `Iterable` containing all of the items emitted by
* the source Observable.
*/
def toIterable: Observable[Iterable[T]] = to[Iterable]

/**
* Returns an Observable that emits a single item, an `Iterator` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, an `Iterator` containing all of the items emitted by
* the source Observable.
*/
def toIterator: Observable[Iterator[T]] = toIterable.map(_.iterator)

/**
* Returns an Observable that emits a single item, a `Stream` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, a `Stream` containing all of the items emitted by
* the source Observable.
*/
def toStream: Observable[Stream[T]] = to[Stream]

/**
* Returns an Observable that emits a single item, an `IndexedSeq` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, an `IndexedSeq` containing all of the items emitted by
* the source Observable.
*/
def toIndexedSeq: Observable[immutable.IndexedSeq[T]] = to[immutable.IndexedSeq]

/**
* Returns an Observable that emits a single item, a `Vector` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, a `Vector` containing all of the items emitted by
* the source Observable.
*/
def toVector: Observable[Vector[T]] = to[Vector]

/**
* Returns an Observable that emits a single item, a `Buffer` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, a `Buffer` containing all of the items emitted by
* the source Observable.
*/
def toBuffer[U >: T]: Observable[mutable.Buffer[U]] = { // use U >: T because Buffer is invariant
val us: Observable[U] = this
us.to[ArrayBuffer]
}

/**
* Returns an Observable that emits a single item, a `Set` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, a `Set` containing all of the items emitted by
* the source Observable.
*/
def toSet[U >: T]: Observable[immutable.Set[U]] = { // use U >: T because Set is invariant
val us: Observable[U] = this
us.to[immutable.Set]
}

/**
* Returns an Observable that emits a single item, an `Array` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, an `Array` containing all of the items emitted by
* the source Observable.
*/
def toArray[U >: T : ClassTag]: Observable[Array[U]] = // use U >: T because Array is invariant
toBuffer[U].map(_.toArray)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,4 +286,64 @@ class ObservableTests extends JUnitSuite {
assertEquals(List("a", "b", "c"), o.toBlocking.toList)
assertTrue(called)
}

@Test
def testToTraversable() {
val o = Observable.items(1, 2, 3).toTraversable
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
}

@Test
def testToList() {
val o = Observable.items(1, 2, 3).toList
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
}

@Test
def testToIterable() {
val o = Observable.items(1, 2, 3).toIterable
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
}

@Test
def testToIterator() {
val o = Observable.items(1, 2, 3).toIterator
assertEquals(Seq(1, 2, 3), o.toBlocking.single.toSeq)
}

@Test
def testToStream() {
val o = Observable.items(1, 2, 3).toStream
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
}

@Test
def testToIndexedSeq() {
val o = Observable.items(1, 2, 3).toIndexedSeq
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
}

@Test
def testToBuffer() {
val o = Observable.items(1, 2, 3).toBuffer
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
}

@Test
def testToSet() {
val o = Observable.items(1, 2, 2).toSet
assertEquals(Set(1, 2), o.toBlocking.single)
}

@Test
def testToVector() {
val o = Observable.items(1, 2, 3).toVector
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
}

@Test
def testToArray() {
val o = Observable.items(1, 2, 3).toArray
assertArrayEquals(Array(1, 2, 3), o.toBlocking.single)
}
}