@@ -68,7 +68,7 @@ static final class TimeoutTimedOtherSubscriber<T> implements FlowableSubscriber<
68
68
69
69
final FullArbiter <T > arbiter ;
70
70
71
- final AtomicReference < Disposable > timer = new AtomicReference < Disposable >() ;
71
+ Disposable timer ;
72
72
73
73
volatile long index ;
74
74
@@ -110,16 +110,11 @@ public void onNext(T t) {
110
110
}
111
111
112
112
void scheduleTimeout (final long idx ) {
113
- Disposable d = timer .get ();
114
- if (d != null ) {
115
- d .dispose ();
113
+ if (timer != null ) {
114
+ timer .dispose ();
116
115
}
117
116
118
- if (timer .compareAndSet (d , NEW_TIMER )) {
119
- d = worker .schedule (new TimeoutTask (idx ), timeout , unit );
120
-
121
- DisposableHelper .replace (timer , d );
122
- }
117
+ timer = worker .schedule (new TimeoutTask (idx ), timeout , unit );
123
118
}
124
119
125
120
void subscribeNext () {
@@ -170,11 +165,10 @@ public void run() {
170
165
if (idx == index ) {
171
166
done = true ;
172
167
s .cancel ();
173
- DisposableHelper .dispose (timer );
168
+ worker .dispose ();
174
169
175
170
subscribeNext ();
176
171
177
- worker .dispose ();
178
172
}
179
173
}
180
174
}
@@ -188,7 +182,7 @@ static final class TimeoutTimedSubscriber<T> implements FlowableSubscriber<T>, D
188
182
189
183
Subscription s ;
190
184
191
- final AtomicReference < Disposable > timer = new AtomicReference < Disposable >() ;
185
+ Disposable timer ;
192
186
193
187
volatile long index ;
194
188
@@ -224,16 +218,11 @@ public void onNext(T t) {
224
218
}
225
219
226
220
void scheduleTimeout (final long idx ) {
227
- Disposable d = timer .get ();
228
- if (d != null ) {
229
- d .dispose ();
221
+ if (timer != null ) {
222
+ timer .dispose ();
230
223
}
231
224
232
- if (timer .compareAndSet (d , NEW_TIMER )) {
233
- d = worker .schedule (new TimeoutTask (idx ), timeout , unit );
234
-
235
- DisposableHelper .replace (timer , d );
236
- }
225
+ timer = worker .schedule (new TimeoutTask (idx ), timeout , unit );
237
226
}
238
227
239
228
@ Override
0 commit comments