Skip to content

Commit ec88d58

Browse files
Merge pull request #343 from benjchristensen/covariant-support
Covariant Support with super/extends and OnSubscribeFunc
2 parents d499390 + 3585570 commit ec88d58

File tree

84 files changed

+1521
-1029
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+1521
-1029
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.groovy;
17+
18+
import groovy.lang.Closure;
19+
import rx.Observable.OnSubscribeFunc;
20+
import rx.Observer;
21+
import rx.Subscription;
22+
23+
/**
24+
* Concrete wrapper that accepts a {@link Closure} and produces a {@link OnSubscribeFunc}.
25+
*
26+
* @param <T>
27+
*/
28+
public class GroovyOnSubscribeFuncWrapper<T> implements OnSubscribeFunc<T> {
29+
30+
private final Closure<Subscription> closure;
31+
32+
public GroovyOnSubscribeFuncWrapper(Closure<Subscription> closure) {
33+
this.closure = closure;
34+
}
35+
36+
@Override
37+
public Subscription onSubscribe(Observer<? super T> observer) {
38+
return closure.call(observer);
39+
}
40+
41+
}

language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/RxGroovyExtensionModule.java

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.codehaus.groovy.runtime.metaclass.MetaClassRegistryImpl;
3434

3535
import rx.Observable;
36+
import rx.Observable.OnSubscribeFunc;
3637
import rx.observables.BlockingObservable;
3738
import rx.util.functions.Action;
3839
import rx.util.functions.Function;
@@ -52,33 +53,6 @@ public RxGroovyExtensionModule() {
5253
super("RxGroovyExtensionModule", "1.0");
5354
}
5455

55-
/**
56-
* Keeping this code around a little while as it was hard to figure out ... and I'm still messing with it while debugging.
57-
*
58-
* Once the rest of this ExtensionModule stuff is working I'll delete this method.
59-
*
60-
* This is used for manually initializing rather than going via the org.codehaus.groovy.runtime.ExtensionModule properties file.
61-
*/
62-
public static void initializeManuallyForTesting() {
63-
System.out.println("initialize");
64-
MetaClassRegistryImpl mcRegistry = ((MetaClassRegistryImpl) GroovySystem.getMetaClassRegistry());
65-
// RxGroovyExtensionModule em = new RxGroovyExtensionModule();
66-
67-
Properties p = new Properties();
68-
p.setProperty("moduleFactory", "rx.lang.groovy.RxGroovyPropertiesModuleFactory");
69-
Map<CachedClass, List<MetaMethod>> metaMethods = new HashMap<CachedClass, List<MetaMethod>>();
70-
mcRegistry.registerExtensionModuleFromProperties(p, RxGroovyExtensionModule.class.getClassLoader(), metaMethods);
71-
72-
for (ExtensionModule m : mcRegistry.getModuleRegistry().getModules()) {
73-
System.out.println("Module: " + m.getName());
74-
}
75-
76-
for (CachedClass cc : metaMethods.keySet()) {
77-
System.out.println("Adding MetaMethods to CachedClass: " + cc);
78-
cc.addNewMopMethods(metaMethods.get(cc));
79-
}
80-
}
81-
8256
@SuppressWarnings("rawtypes")
8357
@Override
8458
public List<MetaMethod> getMetaMethods() {
@@ -135,6 +109,8 @@ public Object invoke(Object object, Object[] arguments) {
135109
if (o instanceof Closure) {
136110
if (Action.class.isAssignableFrom(m.getParameterTypes()[i])) {
137111
newArgs[i] = new GroovyActionWrapper((Closure) o);
112+
} else if(OnSubscribeFunc.class.isAssignableFrom(m.getParameterTypes()[i])) {
113+
newArgs[i] = new GroovyOnSubscribeFuncWrapper((Closure) o);
138114
} else {
139115
newArgs[i] = new GroovyFunctionWrapper((Closure) o);
140116
}

language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.mockito.MockitoAnnotations;
3131

3232
import rx.Notification;
3333
import rx.Observable;
34+
import rx.Observable.OnSubscribeFunc;
3435
import rx.Observer;
3536
import rx.Subscription;
3637
import rx.observables.GroupedObservable;
@@ -296,9 +297,9 @@ def class ObservableTests {
296297
}
297298

298299

299-
def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {
300+
def class AsyncObservable implements OnSubscribeFunc {
300301

301-
public Subscription call(final Observer<Integer> observer) {
302+
public Subscription onSubscribe(final Observer<Integer> observer) {
302303
new Thread(new Runnable() {
303304
public void run() {
304305
try {

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/RxImplicits.scala

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ object RxImplicits {
1919
import java.{ lang => jlang }
2020
import language.implicitConversions
2121

22-
import rx.Observable
22+
import rx.{ Observable, Observer, Subscription }
23+
import rx.Observable.OnSubscribeFunc
2324
import rx.observables.BlockingObservable
2425
import rx.util.functions._
2526

@@ -56,7 +57,7 @@ object RxImplicits {
5657
}
5758

5859
/**
59-
* Converts a function shaped ilke compareTo into the equivalent Rx Func2
60+
* Converts a function shaped like compareTo into the equivalent Rx Func2
6061
*/
6162
implicit def convertComparisonFuncToRxFunc2[A](f: (A, A) => Int): Func2[A, A, jlang.Integer] =
6263
new Func2[A, A, jlang.Integer] {
@@ -100,13 +101,18 @@ object RxImplicits {
100101
def call(a: A, b: B, c: C, d: D) = f(a, b, c, d)
101102
}
102103

104+
implicit def onSubscribeFunc[A](f: (Observer[_ >: A]) => Subscription): OnSubscribeFunc[A] =
105+
new OnSubscribeFunc[A] {
106+
override def onSubscribe(a: Observer[_ >: A]) = f(a)
107+
}
108+
103109
/**
104110
* This implicit class implements all of the methods necessary for including Observables in a
105111
* for-comprehension. Note that return type is always Observable, so that the ScalaObservable
106112
* type never escapes the for-comprehension
107113
*/
108114
implicit class ScalaObservable[A](wrapped: Observable[A]) {
109-
def map[B](f: A => B): Observable[B] = wrapped.map(f)
115+
def map[B](f: A => B): Observable[B] = wrapped.map[B](f)
110116
def flatMap[B](f: A => Observable[B]): Observable[B] = wrapped.mapMany(f)
111117
def foreach(f: A => Unit): Unit = wrapped.toBlockingObservable.forEach(f)
112118
def withFilter(p: A => Boolean): WithFilter = new WithFilter(p)
@@ -131,7 +137,9 @@ class UnitTestSuite extends JUnitSuite {
131137
import org.mockito.Mockito._
132138
import org.mockito.{ MockitoAnnotations, Mock }
133139
import rx.{ Notification, Observer, Observable, Subscription }
140+
import rx.Observable.OnSubscribeFunc
134141
import rx.observables.GroupedObservable
142+
import rx.subscriptions.Subscriptions
135143
import collection.mutable.ArrayBuffer
136144
import collection.JavaConverters._
137145

@@ -147,7 +155,7 @@ class UnitTestSuite extends JUnitSuite {
147155
class ObservableWithException(s: Subscription, values: String*) extends Observable[String] {
148156
var t: Thread = null
149157

150-
override def subscribe(observer: Observer[String]): Subscription = {
158+
override def subscribe(observer: Observer[_ >: String]): Subscription = {
151159
println("ObservableWithException subscribed to ...")
152160
t = new Thread(new Runnable() {
153161
override def run() {
@@ -175,7 +183,6 @@ class UnitTestSuite extends JUnitSuite {
175183
}
176184

177185
// tests of static methods
178-
179186
@Test def testSingle {
180187
assertEquals(1, Observable.from(1).toBlockingObservable.single)
181188
}
@@ -208,6 +215,11 @@ class UnitTestSuite extends JUnitSuite {
208215
case ex: Throwable => fail("Caught unexpected exception " + ex.getCause + ", expected IllegalStateException")
209216
}
210217
}
218+
219+
@Test def testCreateFromOnSubscribeFunc {
220+
val created = Observable.create((o: Observer[_ >: Integer]) => Subscriptions.empty)
221+
//no assertions on subscription, just testing the implicit
222+
}
211223

212224
@Test def testFromJavaInterop {
213225
val observable = Observable.from(List(1, 2, 3).asJava)
@@ -248,7 +260,7 @@ class UnitTestSuite extends JUnitSuite {
248260

249261
@Test def testFlattenMerge {
250262
val observable = Observable.from(Observable.from(1, 2, 3))
251-
val merged = Observable.merge(observable)
263+
val merged = Observable.merge[Int](observable)
252264
assertSubscribeReceives(merged)(1, 2, 3)
253265
}
254266

@@ -272,6 +284,18 @@ class UnitTestSuite extends JUnitSuite {
272284
assertSubscribeReceives(synchronized)(1, 2, 3)
273285
}
274286

287+
@Test def testZip2() {
288+
val colors: Observable[String] = Observable.from("red", "green", "blue")
289+
val names: Observable[String] = Observable.from("lion-o", "cheetara", "panthro")
290+
291+
case class Character(color: String, name: String)
292+
293+
val cheetara = Character("green", "cheetara")
294+
val panthro = Character("blue", "panthro")
295+
val characters = Observable.zip[String, String, Character](colors, names, Character.apply _)
296+
assertSubscribeReceives(characters)(cheetara, panthro)
297+
}
298+
275299
@Test def testZip3() {
276300
val numbers = Observable.from(1, 2, 3)
277301
val colors = Observable.from("red", "green", "blue")
@@ -283,7 +307,7 @@ class UnitTestSuite extends JUnitSuite {
283307
val cheetara = Character(2, "green", "cheetara")
284308
val panthro = Character(3, "blue", "panthro")
285309

286-
val characters = Observable.zip(numbers, colors, names, Character.apply _)
310+
val characters = Observable.zip[Int, String, String, Character](numbers, colors, names, Character.apply _)
287311
assertSubscribeReceives(characters)(liono, cheetara, panthro)
288312
}
289313

@@ -299,7 +323,7 @@ class UnitTestSuite extends JUnitSuite {
299323
val cheetara = Character(2, "green", "cheetara", false)
300324
val panthro = Character(3, "blue", "panthro", false)
301325

302-
val characters = Observable.zip(numbers, colors, names, isLeader, Character.apply _)
326+
val characters = Observable.zip[Int, String, String, Boolean, Character](numbers, colors, names, isLeader, Character.apply _)
303327
assertSubscribeReceives(characters)(liono, cheetara, panthro)
304328
}
305329

@@ -338,7 +362,8 @@ class UnitTestSuite extends JUnitSuite {
338362
@Test def testMap {
339363
val numbers = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9)
340364
val mappedNumbers = ArrayBuffer.empty[Int]
341-
numbers.map((x: Int) => x * x).subscribe((squareVal: Int) => {
365+
val mapped: Observable[Int] = numbers map ((x: Int) => x * x)
366+
mapped.subscribe((squareVal: Int) => {
342367
mappedNumbers.append(squareVal)
343368
})
344369
assertEquals(List(1, 4, 9, 16, 25, 36, 49, 64, 81), mappedNumbers.toList)
@@ -458,18 +483,9 @@ class UnitTestSuite extends JUnitSuite {
458483
assertSubscribeReceives(skipped)(3, 4)
459484
}
460485

461-
/**
462-
* Both testTake and testTakeWhileWithIndex exposed a bug with unsubscribes not properly propagating.
463-
* observable.take(2) produces onNext(first), onNext(second), and 4 onCompleteds
464-
* it should produce onNext(first), onNext(second), and 1 onCompleted
465-
*
466-
* Switching to Observable.create(OperationTake.take(observable, 2)) works as expected
467-
*/
468486
@Test def testTake {
469-
import rx.operators._
470-
471487
val observable = Observable.from(1, 2, 3, 4, 5)
472-
val took = Observable.create(OperationTake.take(observable, 2))
488+
val took = observable.take(2)
473489
assertSubscribeReceives(took)(1, 2)
474490
}
475491

@@ -479,11 +495,11 @@ class UnitTestSuite extends JUnitSuite {
479495
assertSubscribeReceives(took)(1, 3, 5)
480496
}
481497

482-
/*@Test def testTakeWhileWithIndex {
483-
val observable = Observable.from(1, 3, 5, 6, 7, 9, 11, 12, 13, 15, 17)
484-
val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx > 4)
485-
assertSubscribeReceives(took)(9, 11)
486-
}*/
498+
@Test def testTakeWhileWithIndex {
499+
val observable = Observable.from(1, 3, 5, 7, 9, 11, 12, 13, 15, 17)
500+
val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx < 8)
501+
assertSubscribeReceives(took)(1, 3, 5, 7, 9, 11)
502+
}
487503

488504
@Test def testTakeLast {
489505
val observable = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9)
@@ -559,7 +575,7 @@ class UnitTestSuite extends JUnitSuite {
559575

560576
@Test def testFilterInForComprehension {
561577
val doubler = (i: Int) => Observable.from(i, i)
562-
val filteredObservable = for {
578+
val filteredObservable: Observable[Int] = for {
563579
i: Int <- Observable.from(1, 2, 3, 4)
564580
j: Int <- doubler(i) if isOdd(i)
565581
} yield j

rxjava-contrib/rxjava-android/src/main/java/rx/android/concurrency/HandlerThreadScheduler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package rx.android.concurrency;
22

33
import android.os.Handler;
4+
45
import org.junit.Test;
56
import org.junit.runner.RunWith;
67
import org.mockito.ArgumentCaptor;
78
import org.robolectric.RobolectricTestRunner;
89
import org.robolectric.annotation.Config;
10+
911
import rx.Scheduler;
1012
import rx.Subscription;
1113
import rx.operators.SafeObservableSubscription;
@@ -39,7 +41,7 @@ public HandlerThreadScheduler(Handler handler) {
3941
* See {@link #schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)}
4042
*/
4143
@Override
42-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
44+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
4345
return schedule(state, action, 0L, TimeUnit.MILLISECONDS);
4446
}
4547

@@ -56,7 +58,7 @@ public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscr
5658
* @return A Subscription from which one can unsubscribe from.
5759
*/
5860
@Override
59-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
61+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
6062
final SafeObservableSubscription subscription = new SafeObservableSubscription();
6163
final Scheduler _scheduler = this;
6264
handler.postDelayed(new Runnable() {
@@ -76,6 +78,7 @@ public static final class UnitTest {
7678
public void shouldScheduleImmediateActionOnHandlerThread() {
7779
final Handler handler = mock(Handler.class);
7880
final Object state = new Object();
81+
@SuppressWarnings("unchecked")
7982
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);
8083

8184
Scheduler scheduler = new HandlerThreadScheduler(handler);
@@ -94,6 +97,7 @@ public void shouldScheduleImmediateActionOnHandlerThread() {
9497
public void shouldScheduleDelayedActionOnHandlerThread() {
9598
final Handler handler = mock(Handler.class);
9699
final Object state = new Object();
100+
@SuppressWarnings("unchecked")
97101
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);
98102

99103
Scheduler scheduler = new HandlerThreadScheduler(handler);

rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ private SwingScheduler() {
5555
}
5656

5757
@Override
58-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
58+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
5959
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
6060
EventQueue.invokeLater(new Runnable() {
6161
@Override
@@ -75,7 +75,7 @@ public void call() {
7575
}
7676

7777
@Override
78-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
78+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long dueTime, TimeUnit unit) {
7979
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
8080
long delay = unit.toMillis(dueTime);
8181
assertThatTheDelayIsValidForTheSwingTimer(delay);
@@ -113,7 +113,7 @@ public void call() {
113113
}
114114

115115
@Override
116-
public <T> Subscription schedulePeriodically(T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
116+
public <T> Subscription schedulePeriodically(T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
117117
final AtomicReference<Timer> timer = new AtomicReference<Timer>();
118118

119119
final long delay = unit.toMillis(period);

0 commit comments

Comments
 (0)