Skip to content

Commit 8d75cd4

Browse files
committed
RxScala: Add *Option and *OrElse to BlockingObservable
1 parent 022ae66 commit 8d75cd4

File tree

3 files changed

+218
-29
lines changed

3 files changed

+218
-29
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2862,13 +2862,14 @@ trait Observable[+T]
28622862

28632863
/**
28642864
* If the source Observable completes after emitting a single item, return an Observable that emits that
2865-
* item. If the source Observable emits more than one item or no items, throw an `NoSuchElementException`.
2865+
* item. If the source Observable emits more than one item or no items, notify of an `IllegalArgumentException`
2866+
* or `NoSuchElementException` respectively.
28662867
*
28672868
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/single.png">
28682869
*
28692870
* @return an Observable that emits the single item emitted by the source Observable
2870-
* @throws NoSuchElementException
2871-
* if the source emits more than one item or no items
2871+
* @throws IllegalArgumentException if the source emits more than one item
2872+
* @throws NoSuchElementException if the source emits no items
28722873
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-single-and-singleordefault">RxJava Uncyclo: single()</a>
28732874
* @see "MSDN: Observable.singleAsync()"
28742875
*/
@@ -3093,7 +3094,7 @@ trait Observable[+T]
30933094
*/
30943095
@deprecated("Use `toBlocking` instead", "0.19")
30953096
def toBlockingObservable: BlockingObservable[T] = {
3096-
new BlockingObservable[T](asJavaObservable.toBlocking)
3097+
new BlockingObservable[T](this)
30973098
}
30983099

30993100
/**
@@ -3105,7 +3106,7 @@ trait Observable[+T]
31053106
* @since 0.19
31063107
*/
31073108
def toBlocking: BlockingObservable[T] = {
3108-
new BlockingObservable[T](asJavaObservable.toBlocking)
3109+
new BlockingObservable[T](this)
31093110
}
31103111

31113112
/**

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala

Lines changed: 96 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,21 @@ package rx.lang.scala.observables
1717

1818
import scala.collection.JavaConverters._
1919
import rx.lang.scala.ImplicitFunctionConversions._
20+
import rx.lang.scala.Observable
21+
import rx.observables.{BlockingObservable => JBlockingObservable}
2022

2123

2224
/**
2325
* An Observable that provides blocking operators.
2426
*
25-
* You can obtain a BlockingObservable from an Observable using [[rx.lang.scala.Observable.toBlockingObservable]]
27+
* You can obtain a BlockingObservable from an Observable using [[rx.lang.scala.Observable.toBlocking]]
2628
*/
27-
// constructor is private because users should use Observable.toBlockingObservable
28-
class BlockingObservable[+T] private[scala] (val asJava: rx.observables.BlockingObservable[_ <: T])
29-
extends AnyVal
29+
// constructor is private because users should use Observable.toBlocking
30+
class BlockingObservable[+T] private[scala] (val o: Observable[T])
31+
extends AnyVal
3032
{
31-
33+
// This is def because "field definition is not allowed in value class"
34+
private def asJava: JBlockingObservable[_ <: T] = o.asJavaObservable.toBlocking
3235
/**
3336
* Invoke a method on each item emitted by the {@link Observable}; block until the Observable
3437
* completes.
@@ -69,6 +72,34 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
6972
asJava.last : T
7073
}
7174

75+
/**
76+
* Returns an `Option` with the last item emitted by the source Observable,
77+
* or `None` if the source Observable completes without emitting any items.
78+
*
79+
* @return an `Option` with the last item emitted by the source Observable,
80+
* or `None` if the source Observable is empty
81+
*/
82+
def lastOption: Option[T] = {
83+
o.lastOption.toBlocking.single
84+
}
85+
86+
/**
87+
* Returns the last item emitted by the source Observable, or a default item
88+
* if the source Observable completes without emitting any items.
89+
*
90+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/lastOrDefault.png">
91+
*
92+
* @param default the default item to emit if the source Observable is empty.
93+
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
94+
* @return the last item emitted by the source Observable, or a default item if the source Observable is empty
95+
*/
96+
def lastOrElse[U >: T](default: => U): U = {
97+
lastOption match {
98+
case Some(element) => element
99+
case None => default
100+
}
101+
}
102+
72103
/**
73104
* Returns the first item emitted by a specified [[Observable]], or
74105
* `NoSuchElementException` if source contains no elements.
@@ -96,12 +127,32 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
96127
*/
97128
def head : T = first
98129

99-
// last -> use toIterable.last
100-
// lastOrDefault -> use toIterable.lastOption
101-
// first -> use toIterable.head
102-
// firstOrDefault -> use toIterable.headOption
103-
// single(predicate) -> use filter and single
104-
// singleOrDefault -> use singleOption
130+
/**
131+
* Returns an `Option` with the very first item emitted by the source Observable,
132+
* or `None` if the source Observable is empty.
133+
*
134+
* @return an `Option` with the very first item from the source,
135+
* or `None` if the source Observable completes without emitting any item.
136+
*/
137+
def headOption: Option[T] = {
138+
o.headOption.toBlocking.single
139+
}
140+
141+
/**
142+
* Returns the very first item emitted by the source Observable, or a default value if the source Observable is empty.
143+
*
144+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/firstOrDefault.png">
145+
*
146+
* @param default The default value to emit if the source Observable doesn't emit anything.
147+
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
148+
* @return the very first item from the source, or a default value if the source Observable completes without emitting any item.
149+
*/
150+
def headOrElse[U >: T](default: => U): U = {
151+
headOption match {
152+
case Some(element) => element
153+
case None => default
154+
}
155+
}
105156

106157
/**
107158
* Returns an {@link Iterable} that always returns the item most recently emitted by an {@link Observable}.
@@ -130,29 +181,50 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
130181
}
131182

132183
/**
133-
* If this {@link Observable} completes after emitting a single item, return that item,
134-
* otherwise throw an exception.
135-
* <p>
136-
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.single.png">
184+
* If the source Observable completes after emitting a single item, return that item. If the source Observable
185+
* emits more than one item or no items, notify of an `IllegalArgumentException` or `NoSuchElementException` respectively.
137186
*
138-
* @return the single item emitted by the {@link Observable}
187+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/single.png">
188+
*
189+
* @return an Observable that emits the single item emitted by the source Observable
190+
* @throws IllegalArgumentException if the source emits more than one item
191+
* @throws NoSuchElementException if the source emits no items
139192
*/
140193
def single: T = {
141194
asJava.single(): T // useless ascription because of compiler bug
142195
}
143196

144197
/**
145-
* If this {@link Observable} completes after emitting a single item, return an Option containing
146-
* this item, otherwise return {@code None}.
198+
* If the source Observable completes after emitting a single item, return an `Option` with that item;
199+
* if the source Observable is empty, return `None`. If the source Observable emits more than one item,
200+
* throw an `IllegalArgumentException`.
201+
*
202+
* @return an `Option` with the single item emitted by the source Observable, or
203+
* `None` if the source Observable is empty
204+
* @throws IllegalArgumentException if the source Observable emits more than one item
147205
*/
148206
def singleOption: Option[T] = {
149-
var size: Int = 0
150-
var last: Option[T] = None
151-
for (t <- toIterable) {
152-
size += 1
153-
last = Some(t)
207+
o.singleOption.toBlocking.single
208+
}
209+
210+
/**
211+
* If the source Observable completes after emitting a single item, return that item;
212+
* if the source Observable is empty, return a default item. If the source Observable
213+
* emits more than one item, throw an `IllegalArgumentException`.
214+
*
215+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/singleOrDefault.png">
216+
*
217+
* @param default a default value to emit if the source Observable emits no item.
218+
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
219+
* @return the single item emitted by the source Observable, or a default item if
220+
* the source Observable is empty
221+
* @throws IllegalArgumentException if the source Observable emits more than one item
222+
*/
223+
def singleOrElse[U >: T](default: => U): U = {
224+
singleOption match {
225+
case Some(element) => element
226+
case None => default
154227
}
155-
if (size == 1) last else None
156228
}
157229

158230
// TODO toFuture()
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package rx.lang.scala.observables
2+
3+
import org.junit.Assert._
4+
import org.junit.Test
5+
import org.scalatest.junit.JUnitSuite
6+
import scala.language.postfixOps
7+
import rx.lang.scala.Observable
8+
9+
class BlockingObservableTest extends JUnitSuite {
10+
11+
@Test
12+
def testSingleOption() {
13+
val o = Observable.items(1)
14+
assertEquals(Some(1), o.toBlocking.singleOption)
15+
}
16+
17+
@Test
18+
def testSingleOptionWithEmpty() {
19+
val o = Observable.empty
20+
assertEquals(None, o.toBlocking.singleOption)
21+
}
22+
23+
@Test(expected = classOf[IllegalArgumentException])
24+
def testSingleOptionWithMultipleItems() {
25+
Observable.items(1, 2).toBlocking.singleOption
26+
}
27+
28+
@Test
29+
def testSingleOrElse() {
30+
val o = Observable.items(1)
31+
assertEquals(1, o.toBlocking.singleOrElse(2))
32+
}
33+
34+
@Test
35+
def testSingleOrElseWithEmpty() {
36+
val o = Observable.empty
37+
assertEquals(2, o.toBlocking.singleOrElse(2))
38+
}
39+
40+
@Test(expected = classOf[IllegalArgumentException])
41+
def testSingleOrElseWithMultipleItems() {
42+
Observable.items(1, 2).toBlocking.singleOrElse(2)
43+
}
44+
45+
@Test
46+
def testHeadOption() {
47+
val o = Observable.items(1)
48+
assertEquals(Some(1), o.toBlocking.headOption)
49+
}
50+
51+
@Test
52+
def testHeadOptionWithEmpty() {
53+
val o = Observable.empty
54+
assertEquals(None, o.toBlocking.headOption)
55+
}
56+
57+
@Test
58+
def testHeadOptionWithMultipleItems() {
59+
val o = Observable.items(1, 2)
60+
assertEquals(Some(1), o.toBlocking.headOption)
61+
}
62+
63+
@Test
64+
def testHeadOrElse() {
65+
val o = Observable.items(1)
66+
assertEquals(1, o.toBlocking.headOrElse(2))
67+
}
68+
69+
@Test
70+
def testHeadOrElseWithEmpty() {
71+
val o = Observable.empty
72+
assertEquals(2, o.toBlocking.headOrElse(2))
73+
}
74+
75+
@Test
76+
def testHeadOrElseWithMultipleItems() {
77+
val o = Observable.items(1, 2)
78+
assertEquals(1, o.toBlocking.headOrElse(2))
79+
}
80+
81+
@Test
82+
def testLastOption() {
83+
val o = Observable.items(1)
84+
assertEquals(Some(1), o.toBlocking.lastOption)
85+
}
86+
87+
@Test
88+
def testLastOptionWithEmpty() {
89+
val o = Observable.empty
90+
assertEquals(None, o.toBlocking.lastOption)
91+
}
92+
93+
@Test
94+
def testLastOptionWithMultipleItems() {
95+
val o = Observable.items(1, 2)
96+
assertEquals(Some(2), o.toBlocking.lastOption)
97+
}
98+
99+
@Test
100+
def testLastOrElse() {
101+
val o = Observable.items(1)
102+
assertEquals(1, o.toBlocking.lastOrElse(2))
103+
}
104+
105+
@Test
106+
def testLastOrElseWithEmpty() {
107+
val o = Observable.empty
108+
assertEquals(2, o.toBlocking.lastOrElse(2))
109+
}
110+
111+
@Test
112+
def testLastOrElseWithMultipleItems() {
113+
val o = Observable.items(1, 2)
114+
assertEquals(2, o.toBlocking.lastOrElse(3))
115+
}
116+
}

0 commit comments

Comments
 (0)