16
16
package rx .internal .operators ;
17
17
18
18
import java .util .Iterator ;
19
- import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
19
+ import java .util .concurrent .atomic .AtomicLong ;
20
20
21
+ import rx .*;
21
22
import rx .Observable .OnSubscribe ;
22
- import rx .Producer ;
23
- import rx .Subscriber ;
24
23
25
24
/**
26
25
* Converts an {@code Iterable} sequence into an {@code Observable}.
@@ -50,33 +49,54 @@ public void call(final Subscriber<? super T> o) {
50
49
o .setProducer (new IterableProducer <T >(o , it ));
51
50
}
52
51
53
- private static final class IterableProducer <T > implements Producer {
52
+ private static final class IterableProducer <T > extends AtomicLong implements Producer {
53
+ /** */
54
+ private static final long serialVersionUID = -8730475647105475802L ;
54
55
private final Subscriber <? super T > o ;
55
56
private final Iterator <? extends T > it ;
56
57
57
- private volatile long requested = 0 ;
58
- @ SuppressWarnings ("rawtypes" )
59
- private static final AtomicLongFieldUpdater <IterableProducer > REQUESTED_UPDATER = AtomicLongFieldUpdater .newUpdater (IterableProducer .class , "requested" );
60
-
61
58
private IterableProducer (Subscriber <? super T > o , Iterator <? extends T > it ) {
62
59
this .o = o ;
63
60
this .it = it ;
64
61
}
65
62
66
63
@ Override
67
64
public void request (long n ) {
68
- if (requested == Long .MAX_VALUE ) {
65
+ if (get () == Long .MAX_VALUE ) {
69
66
// already started with fast-path
70
67
return ;
71
68
}
72
- if (n == Long .MAX_VALUE && REQUESTED_UPDATER .compareAndSet (this , 0 , Long .MAX_VALUE )) {
73
- // fast-path without backpressure
69
+ if (n == Long .MAX_VALUE && compareAndSet (0 , Long .MAX_VALUE )) {
70
+ fastpath ();
71
+ } else
72
+ if (n > 0 && BackpressureUtils .getAndAddRequest (this , n ) == 0L ) {
73
+ slowpath (n );
74
+ }
75
+
76
+ }
77
+
78
+ void slowpath (long n ) {
79
+ // backpressure is requested
80
+ final Subscriber <? super T > o = this .o ;
81
+ final Iterator <? extends T > it = this .it ;
74
82
83
+ long r = n ;
84
+ while (true ) {
85
+ /*
86
+ * This complicated logic is done to avoid touching the
87
+ * volatile `requested` value during the loop itself. If
88
+ * it is touched during the loop the performance is
89
+ * impacted significantly.
90
+ */
91
+ long numToEmit = r ;
75
92
while (true ) {
76
93
if (o .isUnsubscribed ()) {
77
94
return ;
78
95
} else if (it .hasNext ()) {
79
- o .onNext (it .next ());
96
+ if (--numToEmit >= 0 ) {
97
+ o .onNext (it .next ());
98
+ } else
99
+ break ;
80
100
} else if (!o .isUnsubscribed ()) {
81
101
o .onCompleted ();
82
102
return ;
@@ -85,45 +105,34 @@ public void request(long n) {
85
105
return ;
86
106
}
87
107
}
88
- } else if (n > 0 ) {
89
- // backpressure is requested
90
- long _c = BackpressureUtils .getAndAddRequest (REQUESTED_UPDATER , this , n );
91
- if (_c == 0 ) {
92
- while (true ) {
93
- /*
94
- * This complicated logic is done to avoid touching the
95
- * volatile `requested` value during the loop itself. If
96
- * it is touched during the loop the performance is
97
- * impacted significantly.
98
- */
99
- long r = requested ;
100
- long numToEmit = r ;
101
- while (true ) {
102
- if (o .isUnsubscribed ()) {
103
- return ;
104
- } else if (it .hasNext ()) {
105
- if (--numToEmit >= 0 ) {
106
- o .onNext (it .next ());
107
- } else
108
- break ;
109
- } else if (!o .isUnsubscribed ()) {
110
- o .onCompleted ();
111
- return ;
112
- } else {
113
- // is unsubscribed
114
- return ;
115
- }
116
- }
117
- if (REQUESTED_UPDATER .addAndGet (this , -r ) == 0 ) {
118
- // we're done emitting the number requested so
119
- // return
120
- return ;
121
- }
122
-
123
- }
108
+ r = addAndGet (-r );
109
+ if (r == 0L ) {
110
+ // we're done emitting the number requested so
111
+ // return
112
+ return ;
124
113
}
114
+
125
115
}
116
+ }
126
117
118
+ void fastpath () {
119
+ // fast-path without backpressure
120
+ final Subscriber <? super T > o = this .o ;
121
+ final Iterator <? extends T > it = this .it ;
122
+
123
+ while (true ) {
124
+ if (o .isUnsubscribed ()) {
125
+ return ;
126
+ } else if (it .hasNext ()) {
127
+ o .onNext (it .next ());
128
+ } else if (!o .isUnsubscribed ()) {
129
+ o .onCompleted ();
130
+ return ;
131
+ } else {
132
+ // is unsubscribed
133
+ return ;
134
+ }
135
+ }
127
136
}
128
137
}
129
138
0 commit comments