|
20 | 20 |
|
21 | 21 | import java.util.concurrent.Executors;
|
22 | 22 | import java.util.concurrent.TimeUnit;
|
23 |
| -import java.util.concurrent.atomic.AtomicBoolean; |
24 | 23 |
|
25 | 24 | import org.junit.Before;
|
26 | 25 | import org.junit.Test;
|
@@ -57,47 +56,35 @@ public static Func1<Observer<Long>, Subscription> interval(long interval, TimeUn
|
57 | 56 | }
|
58 | 57 |
|
59 | 58 | private static class Interval implements Func1<Observer<Long>, Subscription> {
|
60 |
| - private final long interval; |
| 59 | + private final long period; |
61 | 60 | private final TimeUnit unit;
|
62 | 61 | private final Scheduler scheduler;
|
63 | 62 |
|
64 | 63 | private long currentValue;
|
65 |
| - private final AtomicBoolean complete = new AtomicBoolean(); |
66 | 64 |
|
67 |
| - private Interval(long interval, TimeUnit unit, Scheduler scheduler) { |
68 |
| - this.interval = interval; |
| 65 | + private Interval(long period, TimeUnit unit, Scheduler scheduler) { |
| 66 | + this.period = period; |
69 | 67 | this.unit = unit;
|
70 | 68 | this.scheduler = scheduler;
|
71 | 69 | }
|
72 | 70 |
|
73 | 71 | @Override
|
74 | 72 | public Subscription call(final Observer<Long> observer) {
|
75 |
| - scheduler.schedule(new IntervalAction(observer), interval, unit); |
76 |
| - return Subscriptions.create(new Action0() { |
| 73 | + final Subscription wrapped = scheduler.schedulePeriodically(new Action0() { |
77 | 74 | @Override
|
78 | 75 | public void call() {
|
79 |
| - complete.set(true); |
| 76 | + observer.onNext(currentValue); |
| 77 | + currentValue++; |
80 | 78 | }
|
81 |
| - }); |
82 |
| - } |
83 |
| - |
84 |
| - private class IntervalAction implements Action0 { |
85 |
| - private final Observer<Long> observer; |
86 |
| - |
87 |
| - private IntervalAction(Observer<Long> observer) { |
88 |
| - this.observer = observer; |
89 |
| - } |
| 79 | + }, period, period, unit); |
90 | 80 |
|
91 |
| - @Override |
92 |
| - public void call() { |
93 |
| - if (complete.get()) { |
| 81 | + return Subscriptions.create(new Action0() { |
| 82 | + @Override |
| 83 | + public void call() { |
| 84 | + wrapped.unsubscribe(); |
94 | 85 | observer.onCompleted();
|
95 |
| - } else { |
96 |
| - observer.onNext(currentValue); |
97 |
| - currentValue++; |
98 |
| - scheduler.schedule(this, interval, unit); |
99 | 86 | }
|
100 |
| - } |
| 87 | + }); |
101 | 88 | }
|
102 | 89 | }
|
103 | 90 |
|
|
0 commit comments