Skip to content

Adding utility functions for observables of strings useful for processing non blocking IO. #457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
cf0a9aa
Adding utility functions for observables of strings useful for proces…
abersnaze Oct 29, 2013
e5b7b23
Merge branch 'master' into string-observable
abersnaze Nov 5, 2013
aec34ab
Fixing a race condition in a test
abersnaze Nov 5, 2013
1669bde
Implement the scheduler overloads for Range, From, StartWith
zsxwing Nov 19, 2013
d1f0258
Merge pull request #492 from zsxwing/scheduler-overloads
benjchristensen Nov 19, 2013
b8b8334
Unit test to assert correct scheduler thread
benjchristensen Nov 19, 2013
310d530
Update Observable.java
DavidMGross Nov 19, 2013
9ff3624
Refactored ObserveOn without ScheduledObserver
benjchristensen Nov 19, 2013
f245fcd
Merge pull request #499 from benjchristensen/observeOn-refactor
benjchristensen Nov 19, 2013
22885fa
ParallelMerge Operator
benjchristensen Nov 19, 2013
a6a2440
Merge pull request #501 from benjchristensen/parallelMerge
benjchristensen Nov 19, 2013
bc6965c
Fix ObserveOn and add ParallelMerge Scheduler overload
benjchristensen Nov 20, 2013
30b6b08
Merge pull request #502 from benjchristensen/observeOn-parallelMerge
benjchristensen Nov 20, 2013
62564d6
Scala Bindings Refactor
AppliedDuality Nov 20, 2013
5c467b3
Reorg Scala Structure
benjchristensen Nov 20, 2013
7fd5183
Updated README
AppliedDuality Nov 20, 2013
fc3e08b
Merge pull request #503 from Applied-Duality/scala-bindings
benjchristensen Nov 20, 2013
1e7f701
0.15.0-SNAPSHOT
benjchristensen Nov 20, 2013
895dad4
[Gradle Release Plugin] - pre tag commit: '0.15.0'.
Nov 20, 2013
589d4fa
[Gradle Release Plugin] - new version commit: '0.15.1-SNAPSHOT'.
Nov 20, 2013
f282048
Version 0.15.0
benjchristensen Nov 20, 2013
c2d8da2
Update Observable.java
DavidMGross Nov 21, 2013
1e7eabd
Operators: And, Then, When
akarnokd Nov 21, 2013
e25e159
Update Observable.java
DavidMGross Nov 21, 2013
9343e29
Empty subscribe
benjchristensen Nov 21, 2013
78a7a6e
Merge pull request #508 from benjchristensen/empty-subscribe
benjchristensen Nov 21, 2013
2b5ff00
Merge pull request #510 from benjchristensen/pull-495-zip
benjchristensen Nov 22, 2013
d511fa3
Merge pull request #506 from akarnokd/AndPattern2
benjchristensen Nov 22, 2013
caac9f4
Adding utility functions for observables of strings useful for proces…
abersnaze Oct 29, 2013
7b16265
Fixing a race condition in a test
abersnaze Nov 5, 2013
5be8eaa
Merge branch 'string-observable' of github.com:abersnaze/RxJava into …
abersnaze Nov 22, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# RxJava Releases #

### Version 0.15.0 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.15.0%22)) ###

This release contains a refactor of the Scala Bindings by @headinthebox that results in some breaking changes.
The previous solution ended up not working well in all cases for idiomatic Scala usage. Java/Scala interop has been changed and is no longer transparent so as to optimize for native Scala usage.
Read the [rxjava-scala README](https://github.com/Netflix/RxJava/tree/master/language-adaptors/rxjava-scala) for more information.

* [Pull 503](https://github.com/Netflix/RxJava/pull/503) New Scala Bindings
* [Pull 502](https://github.com/Netflix/RxJava/pull/502) Fix ObserveOn and add ParallelMerge Scheduler overload
* [Pull 499](https://github.com/Netflix/RxJava/pull/499) ObserveOn Refactor
* [Pull 492](https://github.com/Netflix/RxJava/pull/492) Implement the scheduler overloads for Range, From, StartWith
* [Pull 496](https://github.com/Netflix/RxJava/pull/496) Add contravariant for min and max

### Version 0.14.11 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.14.11%22)) ###

* [Pull 486](https://github.com/Netflix/RxJava/pull/486) BugFix: AsyncSubject
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.14.12-SNAPSHOT
version=0.15.1-SNAPSHOT
213 changes: 128 additions & 85 deletions language-adaptors/rxjava-scala/README.md
Original file line number Diff line number Diff line change
@@ -1,101 +1,144 @@
# Scala Adaptor for RxJava

This adaptor allows to use RxJava in Scala with anonymous functions, e.g.
Alternative Rx bindings for Scala
=================================

The current RxScala binding attempt to optimize for seamless interop between Scala and Java.
The intended interop is illustrated by the following example where in Scala a class is defined that takes
an `Observable[Movie]` that is transformed using RxScala operators:
```scala
val o = Observable.interval(200 millis).take(5)
o.subscribe(n => println("n = " + n))
Observable(1, 2, 3, 4).reduce(_ + _)
class MovieLib(val moviesStream: Observable[Movie]) {
val threshold = 1200
def shortMovies: Observable[Movie] = ???
def longMovies: Observable[Movie] = ???
}
```
which is then called in Java, passing a Java `Observable<Movie>` to the constructor
```java
public void test() {
MovieLib lib = new MovieLib(Observable.from(...));

For-comprehensions are also supported:

```scala
val first = Observable(10, 11, 12)
val second = Observable(10, 11, 12)
val booleans = for ((n1, n2) <- (first zip second)) yield (n1 == n2)
lib.longMovies().subscribe(moviePrinter);
}
```

Further, this adaptor attempts to expose an API which is as Scala-idiomatic as possible. This means that certain methods have been renamed, their signature was changed, or static methods were changed to instance methods. Some examples:

The technique used to obtain this transparency is to use a value class with a private constructor that implements
the Rx operators in an idiomatic Scala way, and a companion object that is used to construct instances in Scala
```scala
// instead of concat:
def ++[U >: T](that: Observable[U]): Observable[U]

// instance method instead of static:
def zip[U](that: Observable[U]): Observable[(T, U)]

// the implicit evidence argument ensures that dematerialize can only be called on Observables of Notifications:
def dematerialize[U](implicit evidence: T <:< Notification[U]): Observable[U]

// additional type parameter U with lower bound to get covariance right:
def onErrorResumeNext[U >: T](resumeFunction: Throwable => Observable[U]): Observable[U]
object Observable {
def apply[T](asJava: rx.Observable[_ <: T]): Observable[T] = { new Observable[T](asJava) }
}

// curried in Scala collections, so curry fold also here:
def fold[R](initialValue: R)(accumulator: (R, T) => R): Observable[R]

// using Duration instead of (long timepan, TimeUnit duration):
def sample(duration: Duration): Observable[T]

// called skip in Java, but drop in Scala
def drop(n: Int): Observable[T]

// there's only mapWithIndex in Java, because Java doesn't have tuples:
def zipWithIndex: Observable[(T, Int)]

// corresponds to Java's toList:
def toSeq: Observable[Seq[T]]

// the implicit evidence argument ensures that switch can only be called on Observables of Observables:
def switch[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U]

// Java's from becomes apply, and we use Scala Range
def apply(range: Range): Observable[Int]

// use Bottom type:
def never: Observable[Nothing]
class Observable[+T] private[scala] (val asJava: rx.Observable[_ <: T]) extends AnyVal {
// Idiomatic Scala friendly definitions of Rx operators
}
```

Also, the Scala Observable is fully covariant in its type parameter, whereas the Java Observable only achieves partial covariance due to limitations of Java's type system (or if you can fix this, your suggestions are very welcome).

For more examples, see [RxScalaDemo.scala](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala).

Scala code using Rx should only import members from `rx.lang.scala` and below.


## Documentation

The API documentation can be found [here](http://rxscala.github.io/scaladoc/index.html#rx.lang.scala.Observable).

You can build the API documentation yourself by running `./gradlew scaladoc` in the RxJava root directory.

Then navigate to `RxJava/language-adaptors/rxjava-scala/build/docs/scaladoc/index.html` to display it.


## Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-scala%22).

Example for Maven:

```xml
<dependency>
<groupId>com.netflix.rxjava</groupId>
<artifactId>rxjava-scala</artifactId>
<version>x.y.z</version>
</dependency>
Since `rx.lang.scala.Observable[T] extends AnyVal`, the underlying representation of `rx.lang.scala.Observable[T]`
is the same as `rx.Observable<T>`. Because `rx.lang.scala.Observable[T]` is an opaque type in Scala,
the Scala programmer only sees the Scala-friendly operators.

However, in the current the illusion of interop is quickly lost when going beyond this simple example.
For example but type `Notification[T]` and `Scheduler[T]` are defined using wrappers,
and hence they are not compatible with `Notification<T>` respectively `Scheduler<T>`.
For instance, when materializing an `Observable[T]` in Scala to an `Observable[Notification[T]]`,
we lost the seamless interop with `Observable<Notification<T>>` on the Java side.

However, the real problems with seamless interop show up when we try to creating bindings for other Rx types.
In particular types that have inheritance or more structure.

For example, RxScala currently defines a type synonym `type Observer[-T] = rx.Observer[_ >: T]`,
but no further bindings for observers.
Similarly, for subjects RxScala defines `type Subject[-T, +R] = rx.subjects.Subject[_ >: T, _ <: R]`.
The problem with these definitions is that on the Java side, subjects are defined as:
```scala
public abstract class Subject<T, R> extends Observable<R> implements Observer<T> { …}
```
without binding any of the Rx subjects.

The consequence is that `Subject[S,T]` in Scala is unrelated to `rx.lang.scala.Observable[T]` in Scala,
but shows up as a `rx.Observable[T]`. The problem however is that if we want to expose subjects in Scala
such that they derive from both `Observable[S]` and `Observer[T]` we cannot use the `extend AnyVal` trick
we used for `Observable[T]` and immediately lose transparent interop with Java.

The problem is even worse because `AsyncSubject<T>`, `BehaviorSubject<T>`, … all derive from `Subject<T,T>`,
so if we want them to derive from a common base `Subject[T,T]` type in Scala we lose transparency for those as well.
And again, if we expose the various subjects by extending `AnyVal`, they are useless in Scala because they do not inherit
from a common base type. To avoid implementing all methods of observable and observer on each specific subject
we might add implicit conversions to `Observable[T]` and `Observer[T]` but that still does not give Scala users
a native `Subject[S,T]` type.
```scala
object AsyncSubject {
def apply[T](): AsyncSubject[T] =
new AsyncSubject[T](rx.subjects.AsyncSubject.create())
}

and for Ivy:
class AsyncSubject[T] private [scala] (val inner: rx.subjects.AsyncSubject[T])
extends AnyVal
{ … }

```xml
<dependency org="com.netflix.rxjava" name="rxjava-scala" rev="x.y.z" />
implicit final def asObservable[T](subject: AsyncSubject[T]): Observable[T] =
Observable(subject.inner)

implicit final def asObserver[T](subject: AsyncSubject[T]): Observer[T] =
subject.inner
```
The inheritance problem is not just limited to subjects, but also surfaces for subscriptions.
Rx scala currently defines `type Subscription = rx.Subscription` using a type synonym as well,
and we run into exactly the same problems as with subjects when we try to bind the
various Rx subscriptions `BooleanSubscription`, `SerialSubscription`, etc.

and for sbt:
Since we cannot wrap Rx types in Scala such that they are both (a) transparently interoperable with Java,
and (b) feel native and idiomatic to Scala, we should decide in favor of optimizing RxScala for Scala
and consumption of Rx values from Java but not for Scala as a producer.

If we take that approach, we can make bindings that feels like a completely native Scala library,
without needing any complications of the Scala side.
```scala
libraryDependencies ++= Seq(
"com.netflix.rxjava" % "rxjava-scala" % "x.y.z"
)
object Observer { …}
trait Observable[+T] {
def asJavaObservable: rx.Observable[_ <: T]
}

object Observer {…}
trait Observer[-T] {
def asJavaObserver: rx.Observer[_ >: T]
}

object Subject {…}
trait Subject[-T, +R] extends Observable[R] with Observer[T] {
val asJavaSubject: rx.subjects.Subject[_ >: T, _<: R]
}

object Scheduler {…}
trait Scheduler {
def asJavaScheduler: rx.Scheduler;
}

object Notification {…}
trait Notification[+T] {
def asJavaNotification: rx.Notification[_ <: T]
}

object Subscription {…}
trait Subscription {
def asJavaSubscription: rx.Subscription
}
```
You pay the price when crossing the Scala/Java interop boundary, which is where it should be.
The proper way is to put the burden of interop on the Scala side, in case you want to create
a reusable Rx-based library in Scala, or wrap and unwrap on the Java side.
```java
public static void main(String[] args) {

Observable<Movie> movies = Observable.from(new Movie(3000), new Movie(1000), new Movie(2000));
MovieLib lib = new MovieLib(toScalaObservable(movies));
lib.longMovies().asJavaObservable().subscribe(m ->
System.out.println("A movie of length " + m.lengthInSeconds() + "s")
);
}
```
Delegation versus Inheritance
-----------------------------
The obvious thought is that using delegation instead of inheritance (http://c2.com/cgi/wiki?DelegationIsInheritance)
will lead to excessive wrapping, since all Scala types wrap and delegate to an underlying RxJava implementation.
Note however, that the wrapping happens at query generation time and incurs no overhead when messages are flowing
through the pipeline. Say we have a query `xs.map(f).filter(p).subscribe(o)`. Even though the Scala types are wrappers,
the callback that is registered with xs is something like `x => { val y = f(x); if(p(y)){ o.asJavaObserver.onNext(y) }}`
and hence there is no additional runtime penalty.
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2013 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -15,25 +15,31 @@
*/
package rx.lang.scala.examples;

import org.junit.Test;

import rx.Observable;
import rx.lang.scala.examples.Movie;
import rx.lang.scala.examples.MovieLib;
import rx.util.functions.Action1;

import static rx.lang.scala.ImplicitFunctionConversions.toScalaObservable;

public class MovieLibUsage {

Action1<Movie> moviePrinter = new Action1<Movie>() {
public void call(Movie m) {
System.out.println("A movie of length " + m.lengthInSeconds() + "s");
}
};

@Test
public void test() {
MovieLib lib = new MovieLib(Observable.from(new Movie(3000), new Movie(1000), new Movie(2000)));

lib.longMovies().subscribe(moviePrinter);
}

public static void main(String[] args) {

Observable<Movie> movies = Observable.from(
new Movie(3000),
new Movie(1000),
new Movie(2000)
);

MovieLib lib = new MovieLib(toScalaObservable(movies));

lib.longMovies().asJavaObservable().subscribe(new Action1<Movie>() {

@Override
public void call(Movie m) {
System.out.println("A movie of length " + m.lengthInSeconds() + "s");
}
});
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2013 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -20,11 +20,11 @@ import rx.lang.scala.Observable
class Movie(val lengthInSeconds: Int) { }

class MovieLib(val moviesStream: Observable[Movie]) {

val threshold = 1200

def shortMovies: Observable[Movie] = moviesStream.filter(_.lengthInSeconds <= threshold)

def longMovies: Observable[Movie] = moviesStream.filter(_.lengthInSeconds > threshold)

}
Loading