Skip to content

Commit 8aae497

Browse files
committed
Add foreach to RxScala
1 parent 3141278 commit 8aae497

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3692,6 +3692,48 @@ trait Observable[+T]
36923692
def nest: Observable[Observable[T]] = {
36933693
toScalaObservable(asJavaObservable.nest).map(toScalaObservable[T](_))
36943694
}
3695+
3696+
/**
3697+
* Subscribes to the [[Observable]] and receives notifications for each element.
3698+
*
3699+
* Alias to `subscribe(T => Unit)`.
3700+
*
3701+
* @param onNext function to execute for each item.
3702+
* @throws IllegalArgumentException if `onNext` is null
3703+
* @since 0.19
3704+
*/
3705+
def foreach(onNext: T => Unit): Unit = {
3706+
asJavaObservable.subscribe(onNext)
3707+
}
3708+
3709+
/**
3710+
* Subscribes to the [[Observable]] and receives notifications for each element and error events.
3711+
*
3712+
* Alias to `subscribe(T => Unit, Throwable => Unit)`.
3713+
*
3714+
* @param onNext function to execute for each item.
3715+
* @param onError function to execute when an error is emitted.
3716+
* @throws IllegalArgumentException if `onNext` is null, or if `onError` is null
3717+
* @since 0.19
3718+
*/
3719+
def foreach(onNext: T => Unit, onError: Throwable => Unit): Unit = {
3720+
asJavaObservable.subscribe(onNext, onError)
3721+
}
3722+
3723+
/**
3724+
* Subscribes to the [[Observable]] and receives notifications for each element and the terminal events.
3725+
*
3726+
* Alias to `subscribe(T => Unit, Throwable => Unit, () => Unit)`.
3727+
*
3728+
* @param onNext function to execute for each item.
3729+
* @param onError function to execute when an error is emitted.
3730+
* @param onComplete function to execute when completion is signalled.
3731+
* @throws IllegalArgumentException if `onNext` is null, or if `onError` is null, or if `onComplete` is null
3732+
* @since 0.19
3733+
*/
3734+
def foreach(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Unit = {
3735+
asJavaObservable.subscribe(onNext, onError, onComplete)
3736+
}
36953737
}
36963738

36973739
/**

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ class CompletenessTest extends JUnitSuite {
8686
"first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
8787
"firstOrDefault(T)" -> "firstOrElse(=> U)",
8888
"firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]",
89+
"forEach(Action1[_ >: T])" -> "foreach(T => Unit)",
90+
"forEach(Action1[_ >: T], Action1[Throwable])" -> "foreach(T => Unit, Throwable => Unit)",
91+
"forEach(Action1[_ >: T], Action1[Throwable], Action0)" -> "foreach(T => Unit, Throwable => Unit, () => Unit)",
8992
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]",
9093
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])",
9194
"ignoreElements()" -> "[use `filter(_ => false)`]",

0 commit comments

Comments
 (0)