24
24
import rx .Producer ;
25
25
import rx .Subscriber ;
26
26
import rx .functions .Action0 ;
27
+ import rx .internal .producers .ProducerArbiter ;
27
28
import rx .observers .SerializedSubscriber ;
28
29
import rx .subscriptions .SerialSubscription ;
29
30
import rx .subscriptions .Subscriptions ;
@@ -85,17 +86,19 @@ static final class ConcatSubscriber<T> extends Subscriber<Observable<? extends T
85
86
86
87
volatile int wip ;
87
88
@ SuppressWarnings ("rawtypes" )
88
- static final AtomicIntegerFieldUpdater <ConcatSubscriber > WIP_UPDATER = AtomicIntegerFieldUpdater .newUpdater (ConcatSubscriber .class , "wip" );
89
+ static final AtomicIntegerFieldUpdater <ConcatSubscriber > WIP = AtomicIntegerFieldUpdater .newUpdater (ConcatSubscriber .class , "wip" );
89
90
90
- // accessed by REQUESTED_UPDATER
91
+ // accessed by REQUESTED
91
92
private volatile long requested ;
92
93
@ SuppressWarnings ("rawtypes" )
93
- private static final AtomicLongFieldUpdater <ConcatSubscriber > REQUESTED_UPDATER = AtomicLongFieldUpdater .newUpdater (ConcatSubscriber .class , "requested" );
94
+ private static final AtomicLongFieldUpdater <ConcatSubscriber > REQUESTED = AtomicLongFieldUpdater .newUpdater (ConcatSubscriber .class , "requested" );
95
+ private final ProducerArbiter arbiter ;
94
96
95
97
public ConcatSubscriber (Subscriber <T > s , SerialSubscription current ) {
96
98
super (s );
97
99
this .child = s ;
98
100
this .current = current ;
101
+ this .arbiter = new ProducerArbiter ();
99
102
this .queue = new ConcurrentLinkedQueue <Object >();
100
103
add (Subscriptions .create (new Action0 () {
101
104
@ Override
@@ -113,32 +116,27 @@ public void onStart() {
113
116
}
114
117
115
118
private void requestFromChild (long n ) {
119
+ if (n <=0 ) return ;
116
120
// we track 'requested' so we know whether we should subscribe the next or not
117
- ConcatInnerSubscriber <T > actualSubscriber = currentSubscriber ;
118
- if (n > 0 && BackpressureUtils .getAndAddRequest (REQUESTED_UPDATER , this , n ) == 0 ) {
119
- if (actualSubscriber == null && wip > 0 ) {
121
+ long previous = BackpressureUtils .getAndAddRequest (REQUESTED , this , n );
122
+ arbiter .request (n );
123
+ if (previous == 0 ) {
124
+ if (currentSubscriber == null && wip > 0 ) {
120
125
// this means we may be moving from one subscriber to another after having stopped processing
121
126
// so need to kick off the subscribe via this request notification
122
127
subscribeNext ();
123
- // return here as we don't want to do the requestMore logic below (which would double request)
124
- return ;
125
128
}
126
129
}
127
-
128
- if (actualSubscriber != null ) {
129
- // otherwise we are just passing it through to the currentSubscriber
130
- actualSubscriber .requestMore (n );
131
- }
132
130
}
133
131
134
132
private void decrementRequested () {
135
- REQUESTED_UPDATER .decrementAndGet (this );
133
+ REQUESTED .decrementAndGet (this );
136
134
}
137
135
138
136
@ Override
139
137
public void onNext (Observable <? extends T > t ) {
140
138
queue .add (nl .next (t ));
141
- if (WIP_UPDATER .getAndIncrement (this ) == 0 ) {
139
+ if (WIP .getAndIncrement (this ) == 0 ) {
142
140
subscribeNext ();
143
141
}
144
142
}
@@ -152,14 +150,15 @@ public void onError(Throwable e) {
152
150
@ Override
153
151
public void onCompleted () {
154
152
queue .add (nl .completed ());
155
- if (WIP_UPDATER .getAndIncrement (this ) == 0 ) {
153
+ if (WIP .getAndIncrement (this ) == 0 ) {
156
154
subscribeNext ();
157
155
}
158
156
}
157
+
159
158
160
159
void completeInner () {
161
160
currentSubscriber = null ;
162
- if (WIP_UPDATER .decrementAndGet (this ) > 0 ) {
161
+ if (WIP .decrementAndGet (this ) > 0 ) {
163
162
subscribeNext ();
164
163
}
165
164
request (1 );
@@ -172,7 +171,7 @@ void subscribeNext() {
172
171
child .onCompleted ();
173
172
} else if (o != null ) {
174
173
Observable <? extends T > obs = nl .getValue (o );
175
- currentSubscriber = new ConcatInnerSubscriber <T >(this , child , requested );
174
+ currentSubscriber = new ConcatInnerSubscriber <T >(this , child , arbiter );
176
175
current .set (currentSubscriber );
177
176
obs .unsafeSubscribe (currentSubscriber );
178
177
}
@@ -193,39 +192,42 @@ static class ConcatInnerSubscriber<T> extends Subscriber<T> {
193
192
@ SuppressWarnings ("unused" )
194
193
private volatile int once = 0 ;
195
194
@ SuppressWarnings ("rawtypes" )
196
- private final static AtomicIntegerFieldUpdater <ConcatInnerSubscriber > ONCE_UPDATER = AtomicIntegerFieldUpdater .newUpdater (ConcatInnerSubscriber .class , "once" );
195
+ private final static AtomicIntegerFieldUpdater <ConcatInnerSubscriber > ONCE = AtomicIntegerFieldUpdater .newUpdater (ConcatInnerSubscriber .class , "once" );
196
+ private final ProducerArbiter arbiter ;
197
197
198
- public ConcatInnerSubscriber (ConcatSubscriber <T > parent , Subscriber <T > child , long initialRequest ) {
198
+ public ConcatInnerSubscriber (ConcatSubscriber <T > parent , Subscriber <T > child , ProducerArbiter arbiter ) {
199
199
this .parent = parent ;
200
200
this .child = child ;
201
- request (initialRequest );
202
- }
203
-
204
- void requestMore (long n ) {
205
- request (n );
201
+ this .arbiter = arbiter ;
206
202
}
207
-
203
+
208
204
@ Override
209
205
public void onNext (T t ) {
210
- parent .decrementRequested ();
211
206
child .onNext (t );
207
+ parent .decrementRequested ();
208
+ arbiter .produced (1 );
212
209
}
213
210
214
211
@ Override
215
212
public void onError (Throwable e ) {
216
- if (ONCE_UPDATER .compareAndSet (this , 0 , 1 )) {
213
+ if (ONCE .compareAndSet (this , 0 , 1 )) {
217
214
// terminal error through parent so everything gets cleaned up, including this inner
218
215
parent .onError (e );
219
216
}
220
217
}
221
218
222
219
@ Override
223
220
public void onCompleted () {
224
- if (ONCE_UPDATER .compareAndSet (this , 0 , 1 )) {
221
+ if (ONCE .compareAndSet (this , 0 , 1 )) {
225
222
// terminal completion to parent so it continues to the next
226
223
parent .completeInner ();
227
224
}
228
225
}
226
+
227
+ @ Override
228
+ public void setProducer (Producer producer ) {
229
+ arbiter .setProducer (producer );
230
+ }
229
231
230
232
};
231
233
}
0 commit comments