Skip to content

Commit 1175b07

Browse files
committed
Reworked tracking structures.
1 parent 8614754 commit 1175b07

File tree

5 files changed

+176
-70
lines changed

5 files changed

+176
-70
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 50 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,15 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
18+
import java.util.concurrent.atomic.*;
2019

2120
import rx.Observable.Operator;
22-
import rx.Producer;
23-
import rx.Scheduler;
24-
import rx.Subscriber;
25-
import rx.Subscription;
21+
import rx.*;
2622
import rx.exceptions.MissingBackpressureException;
2723
import rx.functions.Action0;
2824
import rx.internal.util.RxRingBuffer;
29-
import rx.schedulers.ImmediateScheduler;
30-
import rx.schedulers.TrampolineScheduler;
25+
import rx.internal.util.unsafe.SpscArrayQueue;
26+
import rx.schedulers.*;
3127

3228
/**
3329
* Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer.
@@ -64,16 +60,15 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
6460
/** Observe through individual queue per observer. */
6561
private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
6662
final Subscriber<? super T> child;
67-
private final Scheduler.Worker recursiveScheduler;
68-
private final ScheduledUnsubscribe scheduledUnsubscribe;
63+
final Scheduler.Worker recursiveScheduler;
64+
final ScheduledUnsubscribe scheduledUnsubscribe;
6965
final NotificationLite<T> on = NotificationLite.instance();
7066

71-
private final RxRingBuffer queue = RxRingBuffer.getSpscInstance();
72-
private boolean completed = false;
73-
private boolean failure = false;
67+
final SpscArrayQueue<Object> queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
68+
volatile boolean completed = false;
69+
volatile boolean failure = false;
7470

75-
@SuppressWarnings("unused")
76-
private volatile long requested = 0;
71+
volatile long requested = 0;
7772
@SuppressWarnings("rawtypes")
7873
static final AtomicLongFieldUpdater<ObserveOnSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "requested");
7974

@@ -82,12 +77,14 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
8277
@SuppressWarnings("rawtypes")
8378
static final AtomicLongFieldUpdater<ObserveOnSubscriber> COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter");
8479

80+
volatile Throwable error;
81+
8582
// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
8683
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
8784
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
8885
this.child = child;
8986
this.recursiveScheduler = scheduler.createWorker();
90-
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler, queue);
87+
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
9188
child.add(scheduledUnsubscribe);
9289
child.setProducer(new Producer() {
9390

@@ -113,10 +110,8 @@ public void onNext(final T t) {
113110
if (isUnsubscribed() || completed) {
114111
return;
115112
}
116-
try {
117-
queue.onNext(t);
118-
} catch (MissingBackpressureException e) {
119-
onError(e);
113+
if (!queue.offer(on.next(t))) {
114+
onError(new MissingBackpressureException());
120115
return;
121116
}
122117
schedule();
@@ -127,8 +122,10 @@ public void onCompleted() {
127122
if (isUnsubscribed() || completed) {
128123
return;
129124
}
125+
if (error != null) {
126+
return;
127+
}
130128
completed = true;
131-
queue.onCompleted();
132129
schedule();
133130
}
134131

@@ -137,53 +134,64 @@ public void onError(final Throwable e) {
137134
if (isUnsubscribed() || completed) {
138135
return;
139136
}
137+
if (error != null) {
138+
return;
139+
}
140+
error = e;
140141
// unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
141142
unsubscribe();
142-
completed = true;
143143
// mark failure so the polling thread will skip onNext still in the queue
144+
completed = true;
144145
failure = true;
145-
queue.onError(e);
146146
schedule();
147147
}
148148

149-
protected void schedule() {
150-
if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
151-
recursiveScheduler.schedule(new Action0() {
149+
final Action0 action = new Action0() {
152150

153-
@Override
154-
public void call() {
155-
pollQueue();
156-
}
151+
@Override
152+
public void call() {
153+
pollQueue();
154+
}
157155

158-
});
156+
};
157+
158+
protected void schedule() {
159+
if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
160+
recursiveScheduler.schedule(action);
159161
}
160162
}
161163

162164
// only execute this from schedule()
163-
private void pollQueue() {
165+
void pollQueue() {
164166
int emitted = 0;
165167
do {
166168
/*
167169
* Set to 1 otherwise it could have grown very large while in the last poll loop
168170
* and then we can end up looping all those times again here before exiting even once we've drained
169171
*/
170-
COUNTER_UPDATER.set(this, 1);
172+
counter = 1;
171173

174+
// middle:
172175
while (!scheduledUnsubscribe.isUnsubscribed()) {
173176
if (failure) {
174-
// special handling to short-circuit an error propagation
175-
Object o = queue.poll();
176-
// completed so we will skip onNext if they exist and only emit terminal events
177-
if (on.isError(o)) {
178-
// only emit error
179-
on.accept(child, o);
180-
// we have emitted a terminal event so return (exit the loop we're in)
177+
child.onError(error);
178+
return;
179+
} else {
180+
if (requested == 0 && completed && queue.isEmpty()) {
181+
child.onCompleted();
181182
return;
182183
}
183-
} else {
184184
if (REQUESTED.getAndDecrement(this) != 0) {
185185
Object o = queue.poll();
186186
if (o == null) {
187+
if (completed) {
188+
if (failure) {
189+
child.onError(error);
190+
} else {
191+
child.onCompleted();
192+
}
193+
return;
194+
}
187195
// nothing in queue
188196
REQUESTED.incrementAndGet(this);
189197
break;
@@ -213,12 +221,10 @@ static final class ScheduledUnsubscribe implements Subscription {
213221
final Scheduler.Worker worker;
214222
volatile int once;
215223
static final AtomicIntegerFieldUpdater<ScheduledUnsubscribe> ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ScheduledUnsubscribe.class, "once");
216-
final RxRingBuffer queue;
217224
volatile boolean unsubscribed = false;
218225

219-
public ScheduledUnsubscribe(Scheduler.Worker worker, RxRingBuffer queue) {
226+
public ScheduledUnsubscribe(Scheduler.Worker worker) {
220227
this.worker = worker;
221-
this.queue = queue;
222228
}
223229

224230
@Override

src/main/java/rx/internal/schedulers/EventLoopsScheduler.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,12 @@
1515
*/
1616
package rx.internal.schedulers;
1717

18-
import rx.Scheduler;
19-
import rx.Subscription;
20-
import rx.functions.Action0;
21-
import rx.internal.util.RxThreadFactory;
22-
import rx.subscriptions.CompositeSubscription;
23-
import rx.subscriptions.Subscriptions;
18+
import java.util.concurrent.*;
2419

25-
import java.util.concurrent.ThreadFactory;
26-
import java.util.concurrent.TimeUnit;
20+
import rx.*;
21+
import rx.functions.Action0;
22+
import rx.internal.util.*;
23+
import rx.subscriptions.*;
2724

2825
public class EventLoopsScheduler extends Scheduler {
2926
/** Manages a fixed number of workers. */
@@ -95,7 +92,9 @@ public Subscription scheduleDirect(Action0 action) {
9592
}
9693

9794
private static class EventLoopWorker extends Scheduler.Worker {
98-
private final CompositeSubscription innerSubscription = new CompositeSubscription();
95+
private final SubscriptionList serial = new SubscriptionList();
96+
private final CompositeSubscription timed = new CompositeSubscription();
97+
private final SubscriptionList both = new SubscriptionList(serial, timed);
9998
private final PoolWorker poolWorker;
10099

101100
EventLoopWorker(PoolWorker poolWorker) {
@@ -105,28 +104,33 @@ private static class EventLoopWorker extends Scheduler.Worker {
105104

106105
@Override
107106
public void unsubscribe() {
108-
innerSubscription.unsubscribe();
107+
both.unsubscribe();
109108
}
110109

111110
@Override
112111
public boolean isUnsubscribed() {
113-
return innerSubscription.isUnsubscribed();
112+
return both.isUnsubscribed();
114113
}
115114

116115
@Override
117116
public Subscription schedule(Action0 action) {
118-
return schedule(action, 0, null);
117+
if (isUnsubscribed()) {
118+
return Subscriptions.unsubscribed();
119+
}
120+
ScheduledAction s = poolWorker.scheduleActual(action, 0, null);
121+
122+
serial.add(s);
123+
s.addParent(serial);
124+
125+
return s;
119126
}
120127
@Override
121128
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
122-
if (innerSubscription.isUnsubscribed()) {
123-
// don't schedule, we are unsubscribed
129+
if (isUnsubscribed()) {
124130
return Subscriptions.unsubscribed();
125131
}
132+
ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit, timed);
126133

127-
ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit);
128-
innerSubscription.add(s);
129-
s.addParent(innerSubscription);
130134
return s;
131135
}
132136
}

src/main/java/rx/internal/schedulers/NewThreadWorker.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
import rx.*;
2424
import rx.exceptions.Exceptions;
2525
import rx.functions.Action0;
26-
import rx.internal.util.RxThreadFactory;
26+
import rx.internal.util.*;
2727
import rx.plugins.*;
28-
import rx.subscriptions.Subscriptions;
28+
import rx.subscriptions.*;
2929

3030
/**
3131
* @warn class description missing
@@ -174,6 +174,37 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
174174

175175
return run;
176176
}
177+
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
178+
Action0 decoratedAction = schedulersHook.onSchedule(action);
179+
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
180+
parent.add(run);
181+
182+
Future<?> f;
183+
if (delayTime <= 0) {
184+
f = executor.submit(run);
185+
} else {
186+
f = executor.schedule(run, delayTime, unit);
187+
}
188+
run.add(f);
189+
190+
return run;
191+
}
192+
193+
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
194+
Action0 decoratedAction = schedulersHook.onSchedule(action);
195+
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
196+
parent.add(run);
197+
198+
Future<?> f;
199+
if (delayTime <= 0) {
200+
f = executor.submit(run);
201+
} else {
202+
f = executor.schedule(run, delayTime, unit);
203+
}
204+
run.add(f);
205+
206+
return run;
207+
}
177208

178209
@Override
179210
public void unsubscribe() {

0 commit comments

Comments
 (0)