Skip to content

Commit 33e986e

Browse files
committed
RxScala: Add toFuture to BlockingObservable
1 parent 737f780 commit 33e986e

File tree

2 files changed

+54
-2
lines changed

2 files changed

+54
-2
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.lang.scala.observables
1717

1818
import scala.collection.JavaConverters._
19+
import scala.concurrent.{Future, Promise}
1920
import rx.lang.scala.ImplicitFunctionConversions._
2021
import rx.lang.scala.Observable
2122
import rx.observables.{BlockingObservable => JBlockingObservable}
@@ -227,8 +228,6 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T])
227228
}
228229
}
229230

230-
// TODO toFuture()
231-
232231
/**
233232
* Returns an {@link Iterator} that iterates over all items emitted by this {@link Observable}.
234233
*/
@@ -258,6 +257,23 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T])
258257
asJava.latest.asScala: Iterable[T] // useless ascription because of compiler bug
259258
}
260259

260+
/**
261+
* Returns a `Future` representing the single value emitted by this `BlockingObservable`.
262+
*
263+
* `toFuture` throws an `IllegalArgumentException` if the `BlockingObservable` emits more than one item. If the
264+
* `BlockingObservable` may emit more than item, use `BlockingObservable.toList.toFuture`.
265+
*
266+
* `toFuture` throws an `NoSuchElementException` if the `BlockingObservable` is empty.
267+
*
268+
* <img width="640" height="395" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.toFuture.png">
269+
*
270+
* @return a `Future` that expects a single item to be emitted by this `BlockingObservable`.
271+
*/
272+
def toFuture: Future[T] = {
273+
val p = Promise[T]()
274+
o.single.subscribe(t => p.success(t), e => p.failure(e))
275+
p.future
276+
}
261277
}
262278

263279
// Cannot yet have inner class because of this error message:

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/observables/BlockingObservableTest.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,22 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package rx.lang.scala.observables
217

18+
import scala.concurrent.Await
19+
import scala.concurrent.duration._
320
import org.junit.Assert._
421
import org.junit.Test
522
import org.scalatest.junit.JUnitSuite
@@ -113,4 +130,23 @@ class BlockingObservableTest extends JUnitSuite {
113130
val o = Observable.items(1, 2)
114131
assertEquals(2, o.toBlocking.lastOrElse(3))
115132
}
133+
134+
@Test
135+
def testToFuture() {
136+
val o = Observable.items(1)
137+
val r = Await.result(o.toBlocking.toFuture, 10 seconds)
138+
assertEquals(1, r)
139+
}
140+
141+
@Test(expected = classOf[NoSuchElementException])
142+
def testToFutureWithEmpty() {
143+
val o = Observable.empty
144+
Await.result(o.toBlocking.toFuture, 10 seconds)
145+
}
146+
147+
@Test(expected = classOf[IllegalArgumentException])
148+
def testToFutureWithMultipleItems() {
149+
val o = Observable.items(1, 2)
150+
Await.result(o.toBlocking.toFuture, 10 seconds)
151+
}
116152
}

0 commit comments

Comments
 (0)