Skip to content

Commit 54a4793

Browse files
committed
Add retry variant to RxScala
1 parent 03148a3 commit 54a4793

File tree

3 files changed

+39
-0
lines changed

3 files changed

+39
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,28 @@ class RxScalaDemo extends JUnitSuite {
875875
assertEquals(List("alice", "bob", "carol"), o.retry(3).toBlockingObservable.toList)
876876
}
877877

878+
@Test def retryExample3(): Unit = {
879+
var isFirst = true
880+
val o = Observable {
881+
(subscriber: Subscriber[String]) =>
882+
if (isFirst) {
883+
subscriber.onNext("alice")
884+
subscriber.onError(new IOException("Oops"))
885+
isFirst = false
886+
}
887+
else {
888+
subscriber.onNext("bob")
889+
subscriber.onError(new RuntimeException("Oops"))
890+
}
891+
}
892+
o.retry {
893+
(times, e) => e match {
894+
case e: IOException => times <= 3
895+
case _ => false
896+
}
897+
}.subscribe(s => println(s), e => e.printStackTrace())
898+
}
899+
878900
@Test def liftExample1(): Unit = {
879901
// Add "No. " in front of each item
880902
val o = List(1, 2, 3).toObservable.lift {

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3086,6 +3086,22 @@ trait Observable[+T]
30863086
toScalaObservable[T](asJavaObservable.retry())
30873087
}
30883088

3089+
/**
3090+
* Returns an Observable that mirrors the source Observable, resubscribing to it if it calls `onError`
3091+
* and the predicate returns true for that specific exception and retry count.
3092+
*
3093+
* <img width="640" height="315" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retry.png">
3094+
*
3095+
* @param predicate the predicate that determines if a resubscription may happen in case of a specific exception and retry count
3096+
* @return the source Observable modified with retry logic
3097+
*/
3098+
def retry(predicate: (Int, Throwable) => Boolean): Observable[T] = {
3099+
val f = new Func2[java.lang.Integer, Throwable, java.lang.Boolean] {
3100+
def call(times: java.lang.Integer, e: Throwable): java.lang.Boolean = predicate(times, e)
3101+
}
3102+
toScalaObservable[T](asJavaObservable.retry(f))
3103+
}
3104+
30893105
/**
30903106
* Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely.
30913107
* <p>

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ class CompletenessTest extends JUnitSuite {
123123
"replay(Func1[_ >: Observable[T], _ <: Observable[R]], Long, TimeUnit)" -> "replay(Observable[U] => Observable[R], Duration)",
124124
"replay(Func1[_ >: Observable[T], _ <: Observable[R]], Long, TimeUnit, Scheduler)" -> "replay(Observable[U] => Observable[R], Duration, Scheduler)",
125125
"replay(Func1[_ >: Observable[T], _ <: Observable[R]], Scheduler)" -> "replay(Observable[U] => Observable[R], Scheduler)",
126+
"retry(Func2[Integer, Throwable, Boolean])" -> "retry((Int, Throwable) => Boolean)",
126127
"sample(Observable[U])" -> "sample(Observable[Any])",
127128
"scan(Func2[T, T, T])" -> unnecessary,
128129
"scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)",

0 commit comments

Comments
 (0)