15
15
*/
16
16
package rx .internal .operators ;
17
17
18
+ import java .util .Queue ;
18
19
import java .util .concurrent .atomic .*;
19
20
20
21
import rx .Observable .Operator ;
21
22
import rx .*;
22
23
import rx .exceptions .MissingBackpressureException ;
23
24
import rx .functions .Action0 ;
24
- import rx .internal .util .RxRingBuffer ;
25
- import rx .internal .util .unsafe .SpscArrayQueue ;
25
+ import rx .internal .util .* ;
26
+ import rx .internal .util .unsafe .* ;
26
27
import rx .schedulers .*;
27
28
28
29
/**
@@ -64,7 +65,7 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
64
65
final ScheduledUnsubscribe scheduledUnsubscribe ;
65
66
final NotificationLite <T > on = NotificationLite .instance ();
66
67
67
- final SpscArrayQueue <Object > queue = new SpscArrayQueue < Object >( RxRingBuffer . SIZE ) ;
68
+ final Queue <Object > queue ;
68
69
volatile boolean completed = false ;
69
70
volatile boolean failure = false ;
70
71
@@ -84,6 +85,11 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
84
85
public ObserveOnSubscriber (Scheduler scheduler , Subscriber <? super T > child ) {
85
86
this .child = child ;
86
87
this .recursiveScheduler = scheduler .createWorker ();
88
+ if (UnsafeAccess .isUnsafeAvailable ()) {
89
+ queue = new SpscArrayQueue <Object >(RxRingBuffer .SIZE );
90
+ } else {
91
+ queue = new SynchronizedQueue <Object >(RxRingBuffer .SIZE );
92
+ }
87
93
this .scheduledUnsubscribe = new ScheduledUnsubscribe (recursiveScheduler );
88
94
child .add (scheduledUnsubscribe );
89
95
child .setProducer (new Producer () {
0 commit comments