Skip to content

Commit fb1a806

Browse files
Merge pull request #1324 from benjchristensen/trampoline-schedule-unsubscribe
TrampolineScheduler & Unsubscribe
2 parents 7de1f9b + 7058a76 commit fb1a806

File tree

2 files changed

+64
-4
lines changed

2 files changed

+64
-4
lines changed

rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,6 @@ private Subscription enqueue(Action0 action, long execTime) {
8383

8484
if (exec) {
8585
while (!queue.isEmpty()) {
86-
if (innerSubscription.isUnsubscribed()) {
87-
return Subscriptions.empty();
88-
}
8986
queue.poll().action.call();
9087
}
9188

@@ -108,7 +105,6 @@ public void call() {
108105

109106
@Override
110107
public void unsubscribe() {
111-
QUEUE.set(null); // this assumes we are calling unsubscribe from the same thread
112108
innerSubscription.unsubscribe();
113109
}
114110

rxjava-core/src/test/java/rx/schedulers/TrampolineSchedulerTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,18 @@
1515
*/
1616
package rx.schedulers;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.junit.Assert.assertTrue;
1920

21+
import java.util.ArrayList;
22+
import java.util.Arrays;
23+
2024
import org.junit.Test;
2125

2226
import rx.Observable;
2327
import rx.Scheduler;
28+
import rx.Scheduler.Worker;
29+
import rx.functions.Action0;
2430
import rx.functions.Action1;
2531
import rx.functions.Func1;
2632

@@ -55,4 +61,62 @@ public void call(String t) {
5561
}
5662
});
5763
}
64+
65+
@Test
66+
public void testNestedTrampolineWithUnsubscribe() {
67+
final ArrayList<String> workDone = new ArrayList<String>();
68+
Worker worker = Schedulers.trampoline().createWorker();
69+
worker.schedule(new Action0() {
70+
71+
@Override
72+
public void call() {
73+
doWorkOnNewTrampoline("A", workDone);
74+
}
75+
76+
});
77+
78+
final Worker worker2 = Schedulers.trampoline().createWorker();
79+
worker2.schedule(new Action0() {
80+
81+
@Override
82+
public void call() {
83+
doWorkOnNewTrampoline("B", workDone);
84+
// we unsubscribe worker2 ... it should not affect work scheduled on a separate Trampline.Worker
85+
worker2.unsubscribe();
86+
}
87+
88+
});
89+
90+
assertEquals(6, workDone.size());
91+
assertEquals(Arrays.asList("A.1", "A.B.1", "A.B.2", "B.1", "B.B.1", "B.B.2"), workDone);
92+
}
93+
94+
private static void doWorkOnNewTrampoline(final String key, final ArrayList<String> workDone) {
95+
Worker worker = Schedulers.trampoline().createWorker();
96+
worker.schedule(new Action0() {
97+
98+
@Override
99+
public void call() {
100+
String msg = key + ".1";
101+
workDone.add(msg);
102+
System.out.println(msg);
103+
Worker worker3 = Schedulers.trampoline().createWorker();
104+
worker3.schedule(createPrintAction(key + ".B.1", workDone));
105+
worker3.schedule(createPrintAction(key + ".B.2", workDone));
106+
}
107+
108+
});
109+
}
110+
111+
private static Action0 createPrintAction(final String message, final ArrayList<String> workDone) {
112+
return new Action0() {
113+
114+
@Override
115+
public void call() {
116+
System.out.println(message);
117+
workDone.add(message);
118+
}
119+
120+
};
121+
}
58122
}

0 commit comments

Comments
 (0)