16
16
package rx .internal .operators ;
17
17
18
18
import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
19
+
19
20
import rx .Observable ;
21
+ import rx .Producer ;
20
22
import rx .Scheduler ;
21
23
import rx .Subscriber ;
22
24
import rx .functions .Action0 ;
23
25
import rx .functions .Func2 ;
26
+ import rx .internal .producers .ProducerArbiter ;
24
27
import rx .schedulers .Schedulers ;
25
28
import rx .subscriptions .SerialSubscription ;
26
29
@@ -38,88 +41,99 @@ public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child)
38
41
final SerialSubscription serialSubscription = new SerialSubscription ();
39
42
// add serialSubscription so it gets unsubscribed if child is unsubscribed
40
43
child .add (serialSubscription );
41
-
42
- return new SourceSubscriber <T >(child , predicate , inner , serialSubscription );
44
+ ProducerArbiter pa = new ProducerArbiter ();
45
+ child .setProducer (pa );
46
+ return new SourceSubscriber <T >(child , predicate , inner , serialSubscription , pa );
43
47
}
44
48
45
49
static final class SourceSubscriber <T > extends Subscriber <Observable <T >> {
46
50
final Subscriber <? super T > child ;
47
51
final Func2 <Integer , Throwable , Boolean > predicate ;
48
52
final Scheduler .Worker inner ;
49
53
final SerialSubscription serialSubscription ;
54
+ final ProducerArbiter pa ;
50
55
51
56
volatile int attempts ;
52
57
@ SuppressWarnings ("rawtypes" )
53
58
static final AtomicIntegerFieldUpdater <SourceSubscriber > ATTEMPTS_UPDATER
54
59
= AtomicIntegerFieldUpdater .newUpdater (SourceSubscriber .class , "attempts" );
55
60
56
- public SourceSubscriber (Subscriber <? super T > child , final Func2 <Integer , Throwable , Boolean > predicate , Scheduler .Worker inner ,
57
- SerialSubscription serialSubscription ) {
61
+ public SourceSubscriber (Subscriber <? super T > child ,
62
+ final Func2 <Integer , Throwable , Boolean > predicate ,
63
+ Scheduler .Worker inner ,
64
+ SerialSubscription serialSubscription ,
65
+ ProducerArbiter pa ) {
58
66
this .child = child ;
59
67
this .predicate = predicate ;
60
68
this .inner = inner ;
61
69
this .serialSubscription = serialSubscription ;
70
+ this .pa = pa ;
62
71
}
63
72
64
73
65
74
@ Override
66
- public void onCompleted () {
67
- // ignore as we expect a single nested Observable<T>
68
- }
75
+ public void onCompleted () {
76
+ // ignore as we expect a single nested Observable<T>
77
+ }
69
78
70
- @ Override
71
- public void onError (Throwable e ) {
72
- child .onError (e );
73
- }
79
+ @ Override
80
+ public void onError (Throwable e ) {
81
+ child .onError (e );
82
+ }
74
83
75
- @ Override
76
- public void onNext (final Observable <T > o ) {
77
- inner .schedule (new Action0 () {
84
+ @ Override
85
+ public void onNext (final Observable <T > o ) {
86
+ inner .schedule (new Action0 () {
78
87
79
- @ Override
80
- public void call () {
81
- final Action0 _self = this ;
82
- ATTEMPTS_UPDATER .incrementAndGet (SourceSubscriber .this );
88
+ @ Override
89
+ public void call () {
90
+ final Action0 _self = this ;
91
+ ATTEMPTS_UPDATER .incrementAndGet (SourceSubscriber .this );
83
92
84
- // new subscription each time so if it unsubscribes itself it does not prevent retries
85
- // by unsubscribing the child subscription
86
- Subscriber <T > subscriber = new Subscriber <T >() {
87
- boolean done ;
88
- @ Override
89
- public void onCompleted () {
90
- if (!done ) {
91
- done = true ;
92
- child .onCompleted ();
93
- }
93
+ // new subscription each time so if it unsubscribes itself it does not prevent retries
94
+ // by unsubscribing the child subscription
95
+ Subscriber <T > subscriber = new Subscriber <T >() {
96
+ boolean done ;
97
+ @ Override
98
+ public void onCompleted () {
99
+ if (!done ) {
100
+ done = true ;
101
+ child .onCompleted ();
94
102
}
103
+ }
95
104
96
- @ Override
97
- public void onError (Throwable e ) {
98
- if (!done ) {
99
- done = true ;
100
- if (predicate .call (attempts , e ) && !inner .isUnsubscribed ()) {
101
- // retry again
102
- inner .schedule (_self );
103
- } else {
104
- // give up and pass the failure
105
- child .onError (e );
106
- }
105
+ @ Override
106
+ public void onError (Throwable e ) {
107
+ if (!done ) {
108
+ done = true ;
109
+ if (predicate .call (attempts , e ) && !inner .isUnsubscribed ()) {
110
+ // retry again
111
+ inner .schedule (_self );
112
+ } else {
113
+ // give up and pass the failure
114
+ child .onError (e );
107
115
}
108
116
}
117
+ }
109
118
110
- @ Override
111
- public void onNext (T v ) {
112
- if (!done ) {
113
- child .onNext (v );
114
- }
119
+ @ Override
120
+ public void onNext (T v ) {
121
+ if (!done ) {
122
+ child .onNext (v );
123
+ pa . produced ( 1 );
115
124
}
125
+ }
116
126
117
- };
118
- // register this Subscription (and unsubscribe previous if exists)
119
- serialSubscription .set (subscriber );
120
- o .unsafeSubscribe (subscriber );
121
- }
122
- });
123
- }
127
+ @ Override
128
+ public void setProducer (Producer p ) {
129
+ pa .setProducer (p );
130
+ }
131
+ };
132
+ // register this Subscription (and unsubscribe previous if exists)
133
+ serialSubscription .set (subscriber );
134
+ o .unsafeSubscribe (subscriber );
135
+ }
136
+ });
137
+ }
124
138
}
125
139
}
0 commit comments