Skip to content

Commit dcad052

Browse files
Merge pull request #1336 from zsxwing/rxscala-bo
RxScala: Add the rest missing methods to BlockingObservable
2 parents 01c7406 + 91c8bea commit dcad052

File tree

4 files changed

+301
-31
lines changed

4 files changed

+301
-31
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import java.io.IOException
1919
import java.util.concurrent.CountDownLatch
2020
import java.util.concurrent.TimeUnit
2121

22+
import scala.concurrent.Await
2223
import scala.collection.mutable
2324
import scala.concurrent.duration.Duration
2425
import scala.concurrent.duration.DurationInt
@@ -226,6 +227,28 @@ class RxScalaDemo extends JUnitSuite {
226227
assertEquals(squares.toBlockingObservable.toList, List(4, 100, 400, 900))
227228
}
228229

230+
@Test def nextExample() {
231+
val o = Observable.interval(100 millis).take(20)
232+
for(i <- o.toBlocking.next) {
233+
println(i)
234+
Thread.sleep(200)
235+
}
236+
}
237+
238+
@Test def latestExample() {
239+
val o = Observable.interval(100 millis).take(20)
240+
for(i <- o.toBlocking.latest) {
241+
println(i)
242+
Thread.sleep(200)
243+
}
244+
}
245+
246+
@Test def toFutureExample() {
247+
val o = Observable.interval(500 millis).take(1)
248+
val r = Await.result(o.toBlocking.toFuture, 2 seconds)
249+
println(r)
250+
}
251+
229252
@Test def testTwoSubscriptionsToOneInterval() {
230253
val o = Observable.interval(100 millis).take(8)
231254
o.subscribe(

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2985,13 +2985,14 @@ trait Observable[+T]
29852985

29862986
/**
29872987
* If the source Observable completes after emitting a single item, return an Observable that emits that
2988-
* item. If the source Observable emits more than one item or no items, throw an `NoSuchElementException`.
2988+
* item. If the source Observable emits more than one item or no items, notify of an `IllegalArgumentException`
2989+
* or `NoSuchElementException` respectively.
29892990
*
29902991
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/single.png">
29912992
*
29922993
* @return an Observable that emits the single item emitted by the source Observable
2993-
* @throws NoSuchElementException
2994-
* if the source emits more than one item or no items
2994+
* @throws IllegalArgumentException if the source emits more than one item
2995+
* @throws NoSuchElementException if the source emits no items
29952996
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-single-and-singleordefault">RxJava Uncyclo: single()</a>
29962997
* @see "MSDN: Observable.singleAsync()"
29972998
*/
@@ -3252,7 +3253,7 @@ trait Observable[+T]
32523253
*/
32533254
@deprecated("Use `toBlocking` instead", "0.19")
32543255
def toBlockingObservable: BlockingObservable[T] = {
3255-
new BlockingObservable[T](asJavaObservable.toBlocking)
3256+
new BlockingObservable[T](this)
32563257
}
32573258

32583259
/**
@@ -3264,7 +3265,7 @@ trait Observable[+T]
32643265
* @since 0.19
32653266
*/
32663267
def toBlocking: BlockingObservable[T] = {
3267-
new BlockingObservable[T](asJavaObservable.toBlocking)
3268+
new BlockingObservable[T](this)
32683269
}
32693270

32703271
/**

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala

Lines changed: 120 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,23 @@
1616
package rx.lang.scala.observables
1717

1818
import scala.collection.JavaConverters._
19+
import scala.concurrent.{Future, Promise}
1920
import rx.lang.scala.ImplicitFunctionConversions._
21+
import rx.lang.scala.Observable
22+
import rx.observables.{BlockingObservable => JBlockingObservable}
2023

2124

2225
/**
2326
* An Observable that provides blocking operators.
2427
*
25-
* You can obtain a BlockingObservable from an Observable using [[rx.lang.scala.Observable.toBlockingObservable]]
28+
* You can obtain a BlockingObservable from an Observable using [[rx.lang.scala.Observable.toBlocking]]
2629
*/
27-
// constructor is private because users should use Observable.toBlockingObservable
28-
class BlockingObservable[+T] private[scala] (val asJava: rx.observables.BlockingObservable[_ <: T])
29-
extends AnyVal
30+
// constructor is private because users should use Observable.toBlocking
31+
class BlockingObservable[+T] private[scala] (val o: Observable[T])
32+
extends AnyVal
3033
{
31-
34+
// This is def because "field definition is not allowed in value class"
35+
private def asJava: JBlockingObservable[_ <: T] = o.asJavaObservable.toBlocking
3236
/**
3337
* Invoke a method on each item emitted by the {@link Observable}; block until the Observable
3438
* completes.
@@ -69,6 +73,31 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
6973
asJava.last : T
7074
}
7175

76+
/**
77+
* Returns an `Option` with the last item emitted by the source Observable,
78+
* or `None` if the source Observable completes without emitting any items.
79+
*
80+
* @return an `Option` with the last item emitted by the source Observable,
81+
* or `None` if the source Observable is empty
82+
*/
83+
def lastOption: Option[T] = {
84+
o.lastOption.toBlocking.single
85+
}
86+
87+
/**
88+
* Returns the last item emitted by the source Observable, or a default item
89+
* if the source Observable completes without emitting any items.
90+
*
91+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/lastOrDefault.png">
92+
*
93+
* @param default the default item to emit if the source Observable is empty.
94+
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
95+
* @return the last item emitted by the source Observable, or a default item if the source Observable is empty
96+
*/
97+
def lastOrElse[U >: T](default: => U): U = {
98+
lastOption getOrElse default
99+
}
100+
72101
/**
73102
* Returns the first item emitted by a specified [[Observable]], or
74103
* `NoSuchElementException` if source contains no elements.
@@ -96,12 +125,29 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
96125
*/
97126
def head : T = first
98127

99-
// last -> use toIterable.last
100-
// lastOrDefault -> use toIterable.lastOption
101-
// first -> use toIterable.head
102-
// firstOrDefault -> use toIterable.headOption
103-
// single(predicate) -> use filter and single
104-
// singleOrDefault -> use singleOption
128+
/**
129+
* Returns an `Option` with the very first item emitted by the source Observable,
130+
* or `None` if the source Observable is empty.
131+
*
132+
* @return an `Option` with the very first item from the source,
133+
* or `None` if the source Observable completes without emitting any item.
134+
*/
135+
def headOption: Option[T] = {
136+
o.headOption.toBlocking.single
137+
}
138+
139+
/**
140+
* Returns the very first item emitted by the source Observable, or a default value if the source Observable is empty.
141+
*
142+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/firstOrDefault.png">
143+
*
144+
* @param default The default value to emit if the source Observable doesn't emit anything.
145+
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
146+
* @return the very first item from the source, or a default value if the source Observable completes without emitting any item.
147+
*/
148+
def headOrElse[U >: T](default: => U): U = {
149+
headOption getOrElse default
150+
}
105151

106152
/**
107153
* Returns an {@link Iterable} that always returns the item most recently emitted by an {@link Observable}.
@@ -130,32 +176,48 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
130176
}
131177

132178
/**
133-
* If this {@link Observable} completes after emitting a single item, return that item,
134-
* otherwise throw an exception.
135-
* <p>
136-
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.single.png">
179+
* If the source Observable completes after emitting a single item, return that item. If the source Observable
180+
* emits more than one item or no items, notify of an `IllegalArgumentException` or `NoSuchElementException` respectively.
181+
*
182+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/single.png">
137183
*
138-
* @return the single item emitted by the {@link Observable}
184+
* @return an Observable that emits the single item emitted by the source Observable
185+
* @throws IllegalArgumentException if the source emits more than one item
186+
* @throws NoSuchElementException if the source emits no items
139187
*/
140188
def single: T = {
141189
asJava.single(): T // useless ascription because of compiler bug
142190
}
143191

144192
/**
145-
* If this {@link Observable} completes after emitting a single item, return an Option containing
146-
* this item, otherwise return {@code None}.
193+
* If the source Observable completes after emitting a single item, return an `Option` with that item;
194+
* if the source Observable is empty, return `None`. If the source Observable emits more than one item,
195+
* throw an `IllegalArgumentException`.
196+
*
197+
* @return an `Option` with the single item emitted by the source Observable, or
198+
* `None` if the source Observable is empty
199+
* @throws IllegalArgumentException if the source Observable emits more than one item
147200
*/
148201
def singleOption: Option[T] = {
149-
var size: Int = 0
150-
var last: Option[T] = None
151-
for (t <- toIterable) {
152-
size += 1
153-
last = Some(t)
154-
}
155-
if (size == 1) last else None
202+
o.singleOption.toBlocking.single
156203
}
157204

158-
// TODO toFuture()
205+
/**
206+
* If the source Observable completes after emitting a single item, return that item;
207+
* if the source Observable is empty, return a default item. If the source Observable
208+
* emits more than one item, throw an `IllegalArgumentException`.
209+
*
210+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/singleOrDefault.png">
211+
*
212+
* @param default a default value to emit if the source Observable emits no item.
213+
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
214+
* @return the single item emitted by the source Observable, or a default item if
215+
* the source Observable is empty
216+
* @throws IllegalArgumentException if the source Observable emits more than one item
217+
*/
218+
def singleOrElse[U >: T](default: => U): U = {
219+
singleOption getOrElse default
220+
}
159221

160222
/**
161223
* Returns an {@link Iterator} that iterates over all items emitted by this {@link Observable}.
@@ -171,6 +233,38 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
171233
asJava.toIterable.asScala.toList: List[T] // useless ascription because of compiler bug
172234
}
173235

236+
/**
237+
* Returns an `Iterable` that returns the latest item emitted by this `BlockingObservable`,
238+
* waiting if necessary for one to become available.
239+
*
240+
* If this `BlockingObservable` produces items faster than `Iterator.next` takes them,
241+
* `onNext` events might be skipped, but `onError` or `onCompleted` events are not.
242+
*
243+
* Note also that an `onNext` directly followed by `onCompleted` might hide the `onNext` event.
244+
*
245+
* @return an `Iterable` that always returns the latest item emitted by this `BlockingObservable`
246+
*/
247+
def latest: Iterable[T] = {
248+
asJava.latest.asScala: Iterable[T] // useless ascription because of compiler bug
249+
}
250+
251+
/**
252+
* Returns a `Future` representing the single value emitted by this `BlockingObservable`.
253+
*
254+
* The returned `Future` will be completed with an `IllegalArgumentException` if the `BlockingObservable`
255+
* emits more than one item. And it will be completed with an `NoSuchElementException` if the `BlockingObservable`
256+
* is empty. Use `Observable.toSeq.toBlocking.toFuture` if you are not sure about the size of `BlockingObservable`
257+
* and do not want to handle these `Exception`s.
258+
*
259+
* <img width="640" height="395" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.toFuture.png">
260+
*
261+
* @return a `Future` that expects a single item to be emitted by this `BlockingObservable`.
262+
*/
263+
def toFuture: Future[T] = {
264+
val p = Promise[T]()
265+
o.single.subscribe(t => p.success(t), e => p.failure(e))
266+
p.future
267+
}
174268
}
175269

176270
// Cannot yet have inner class because of this error message:

0 commit comments

Comments
 (0)