16
16
package rx .internal .operators ;
17
17
18
18
import java .util .concurrent .TimeUnit ;
19
- import java .util .concurrent .atomic .AtomicBoolean ;
20
19
21
- import rx .*;
22
20
import rx .Observable .Operator ;
21
+ import rx .Scheduler ;
23
22
import rx .Scheduler .Worker ;
23
+ import rx .Subscriber ;
24
24
import rx .functions .Action0 ;
25
25
26
26
/**
@@ -42,44 +42,48 @@ public OperatorSkipTimed(long time, TimeUnit unit, Scheduler scheduler) {
42
42
public Subscriber <? super T > call (final Subscriber <? super T > child ) {
43
43
final Worker worker = scheduler .createWorker ();
44
44
child .add (worker );
45
- final AtomicBooleanAction gate = new AtomicBooleanAction ();
46
- worker .schedule (gate , time , unit );
47
- return new Subscriber <T >(child ) {
45
+ SkipTimedSubscriber <T > subscriber = new SkipTimedSubscriber <T >(child );
46
+ worker .schedule (subscriber , time , unit );
47
+ return subscriber ;
48
+ }
49
+
50
+ final static class SkipTimedSubscriber <T > extends Subscriber <T > implements Action0 {
48
51
49
- @ Override
50
- public void onNext (T t ) {
51
- if (gate .get ()) {
52
- child .onNext (t );
53
- }
54
- }
52
+ Subscriber <? super T > child ;
53
+ volatile boolean gate ;
55
54
56
- @ Override
57
- public void onError (Throwable e ) {
58
- try {
59
- child .onError (e );
60
- } finally {
61
- unsubscribe ();
62
- }
63
- }
55
+ SkipTimedSubscriber (Subscriber <? super T > child ) {
56
+ this .child = child ;
57
+ }
64
58
65
- @ Override
66
- public void onCompleted () {
67
- try {
68
- child .onCompleted ();
69
- } finally {
70
- unsubscribe ();
71
- }
59
+ @ Override
60
+ public void call () {
61
+ gate = true ;
62
+ }
63
+
64
+ @ Override
65
+ public void onNext (T t ) {
66
+ if (gate ) {
67
+ child .onNext (t );
72
68
}
73
- };
74
- }
75
-
76
- final static class AtomicBooleanAction extends AtomicBoolean implements Action0 {
69
+ }
77
70
78
- private static final long serialVersionUID = 8308944559386967982L ;
71
+ @ Override
72
+ public void onError (Throwable e ) {
73
+ try {
74
+ child .onError (e );
75
+ } finally {
76
+ unsubscribe ();
77
+ }
78
+ }
79
79
80
80
@ Override
81
- public void call () {
82
- set (true );
81
+ public void onCompleted () {
82
+ try {
83
+ child .onCompleted ();
84
+ } finally {
85
+ unsubscribe ();
86
+ }
83
87
}
84
88
85
89
}
0 commit comments