Skip to content

Commit 7058a76

Browse files
TrampolineScheduler & Unsubscribe
Unsubscribing should prevent new additions to a Worker, but not prevent already scheduled work, and definitely not affect other Workers using the same thread (by modifying the ThreadLocal as it was doing). See the unit test for details of how unsubscribing 1 Worker could prevent work from being done on a completely separate Worker.
1 parent 7de1f9b commit 7058a76

File tree

2 files changed

+64
-4
lines changed

2 files changed

+64
-4
lines changed

rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,6 @@ private Subscription enqueue(Action0 action, long execTime) {
8383

8484
if (exec) {
8585
while (!queue.isEmpty()) {
86-
if (innerSubscription.isUnsubscribed()) {
87-
return Subscriptions.empty();
88-
}
8986
queue.poll().action.call();
9087
}
9188

@@ -108,7 +105,6 @@ public void call() {
108105

109106
@Override
110107
public void unsubscribe() {
111-
QUEUE.set(null); // this assumes we are calling unsubscribe from the same thread
112108
innerSubscription.unsubscribe();
113109
}
114110

rxjava-core/src/test/java/rx/schedulers/TrampolineSchedulerTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,18 @@
1515
*/
1616
package rx.schedulers;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.junit.Assert.assertTrue;
1920

21+
import java.util.ArrayList;
22+
import java.util.Arrays;
23+
2024
import org.junit.Test;
2125

2226
import rx.Observable;
2327
import rx.Scheduler;
28+
import rx.Scheduler.Worker;
29+
import rx.functions.Action0;
2430
import rx.functions.Action1;
2531
import rx.functions.Func1;
2632

@@ -55,4 +61,62 @@ public void call(String t) {
5561
}
5662
});
5763
}
64+
65+
@Test
66+
public void testNestedTrampolineWithUnsubscribe() {
67+
final ArrayList<String> workDone = new ArrayList<String>();
68+
Worker worker = Schedulers.trampoline().createWorker();
69+
worker.schedule(new Action0() {
70+
71+
@Override
72+
public void call() {
73+
doWorkOnNewTrampoline("A", workDone);
74+
}
75+
76+
});
77+
78+
final Worker worker2 = Schedulers.trampoline().createWorker();
79+
worker2.schedule(new Action0() {
80+
81+
@Override
82+
public void call() {
83+
doWorkOnNewTrampoline("B", workDone);
84+
// we unsubscribe worker2 ... it should not affect work scheduled on a separate Trampline.Worker
85+
worker2.unsubscribe();
86+
}
87+
88+
});
89+
90+
assertEquals(6, workDone.size());
91+
assertEquals(Arrays.asList("A.1", "A.B.1", "A.B.2", "B.1", "B.B.1", "B.B.2"), workDone);
92+
}
93+
94+
private static void doWorkOnNewTrampoline(final String key, final ArrayList<String> workDone) {
95+
Worker worker = Schedulers.trampoline().createWorker();
96+
worker.schedule(new Action0() {
97+
98+
@Override
99+
public void call() {
100+
String msg = key + ".1";
101+
workDone.add(msg);
102+
System.out.println(msg);
103+
Worker worker3 = Schedulers.trampoline().createWorker();
104+
worker3.schedule(createPrintAction(key + ".B.1", workDone));
105+
worker3.schedule(createPrintAction(key + ".B.2", workDone));
106+
}
107+
108+
});
109+
}
110+
111+
private static Action0 createPrintAction(final String message, final ArrayList<String> workDone) {
112+
return new Action0() {
113+
114+
@Override
115+
public void call() {
116+
System.out.println(message);
117+
workDone.add(message);
118+
}
119+
120+
};
121+
}
58122
}

0 commit comments

Comments
 (0)