15
15
*/
16
16
package rx .schedulers ;
17
17
18
- import java .util .PriorityQueue ;
18
+ import java .util .concurrent . PriorityBlockingQueue ;
19
19
import java .util .concurrent .TimeUnit ;
20
20
import java .util .concurrent .atomic .AtomicInteger ;
21
21
import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
@@ -45,14 +45,12 @@ public Worker createWorker() {
45
45
/* package accessible for unit tests */ TrampolineScheduler () {
46
46
}
47
47
48
- volatile int counter ;
49
- static final AtomicIntegerFieldUpdater <TrampolineScheduler > COUNTER_UPDATER = AtomicIntegerFieldUpdater .newUpdater (TrampolineScheduler .class , "counter" );
50
-
51
48
private class InnerCurrentThreadScheduler extends Scheduler .Worker implements Subscription {
52
49
53
- final PriorityQueue <TimedAction > queue = new PriorityQueue <TimedAction >();
50
+ private final PriorityBlockingQueue <TimedAction > queue = new PriorityBlockingQueue <TimedAction >();
54
51
private final BooleanSubscription innerSubscription = new BooleanSubscription ();
55
52
private final AtomicInteger wip = new AtomicInteger ();
53
+ private final AtomicInteger counter = new AtomicInteger ();
56
54
57
55
@ Override
58
56
public Subscription schedule (Action0 action ) {
@@ -70,18 +68,15 @@ private Subscription enqueue(Action0 action, long execTime) {
70
68
if (innerSubscription .isUnsubscribed ()) {
71
69
return Subscriptions .unsubscribed ();
72
70
}
73
- final TimedAction timedAction = new TimedAction (action , execTime , COUNTER_UPDATER .incrementAndGet (TrampolineScheduler .this ));
74
- synchronized (queue ) {
75
- queue .add (timedAction );
76
- }
71
+ final TimedAction timedAction = new TimedAction (action , execTime , counter .incrementAndGet ());
72
+ queue .add (timedAction );
77
73
78
74
if (wip .getAndIncrement () == 0 ) {
79
75
do {
80
- TimedAction polled ;
81
- synchronized ( queue ) {
82
- polled = queue . poll ();
76
+ final TimedAction polled = queue . poll () ;
77
+ if ( polled != null ) {
78
+ polled . action . call ();
83
79
}
84
- polled .action .call ();
85
80
} while (wip .decrementAndGet () > 0 );
86
81
return Subscriptions .unsubscribed ();
87
82
} else {
@@ -90,9 +85,7 @@ private Subscription enqueue(Action0 action, long execTime) {
90
85
91
86
@ Override
92
87
public void call () {
93
- synchronized (queue ) {
94
- queue .remove (timedAction );
95
- }
88
+ queue .remove (timedAction );
96
89
}
97
90
98
91
});
0 commit comments