Skip to content

Commit 539a88e

Browse files
committed
wrapping each of the onNext/onCompleted to ensure no overlapping calls
1 parent 067afbe commit 539a88e

File tree

1 file changed

+12
-4
lines changed

1 file changed

+12
-4
lines changed

src/main/java/rx/schedulers/ScheduleWhen.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,9 @@ public void unsubscribe() {
117117
// room for the next worker in the workerQueue.
118118
if (unsubscribed.compareAndSet(false, true)) {
119119
actualWorker.unsubscribe();
120-
actionQueue.onCompleted();
120+
synchronized (actionQueue) {
121+
actionQueue.onCompleted();
122+
}
121123
}
122124
}
123125

@@ -130,21 +132,27 @@ public boolean isUnsubscribed() {
130132
public Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit) {
131133
// send a scheduled action to the actionQueue
132134
DelayedAction delayedAction = new DelayedAction(action, delayTime, unit);
133-
actionQueue.onNext(delayedAction);
135+
synchronized (actionQueue) {
136+
actionQueue.onNext(delayedAction);
137+
}
134138
return delayedAction;
135139
}
136140

137141
@Override
138142
public Subscription schedule(final Action0 action) {
139143
// send a scheduled action to the actionQueue
140144
ImmediateAction immediateAction = new ImmediateAction(action);
141-
actionQueue.onNext(immediateAction);
145+
synchronized (actionQueue) {
146+
actionQueue.onNext(immediateAction);
147+
}
142148
return immediateAction;
143149
}
144150
};
145151

146152
// enqueue the completable that process actions put in reply subject
147-
workerQueue.onNext(actions);
153+
synchronized (workerQueue) {
154+
workerQueue.onNext(actions);
155+
}
148156

149157
// return the worker that adds actions to the reply subject
150158
return worker;

0 commit comments

Comments
 (0)