Skip to content

Commit 16af514

Browse files
committed
Standard producers and some additional platform-safe queues.
1 parent 1a85656 commit 16af514

18 files changed

+2128
-94
lines changed

src/main/java/rx/internal/operators/OperatorToObservableList.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import rx.Observable.Operator;
19-
import rx.Subscriber;
18+
import java.util.*;
2019

21-
import java.util.ArrayList;
22-
import java.util.LinkedList;
23-
import java.util.List;
20+
import rx.Observable.Operator;
21+
import rx.*;
22+
import rx.internal.producers.SingleDelayedProducer;
2423

2524
/**
2625
* Returns an {@code Observable} that emits a single item, a list composed of all the items emitted by the
@@ -90,7 +89,7 @@ public void onCompleted() {
9089
return;
9190
}
9291
list = null;
93-
producer.set(result);
92+
producer.setValue(result);
9493
}
9594
}
9695

src/main/java/rx/internal/operators/OperatorToObservableSortedList.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import rx.Observable.Operator;
2121
import rx.*;
2222
import rx.functions.Func2;
23+
import rx.internal.producers.SingleDelayedProducer;
2324

2425
/**
2526
* Return an {@code Observable} that emits the items emitted by the source {@code Observable}, in a sorted order
@@ -77,7 +78,7 @@ public void onCompleted() {
7778
onError(e);
7879
return;
7980
}
80-
producer.set(a);
81+
producer.setValue(a);
8182
}
8283
}
8384

src/main/java/rx/internal/operators/SingleDelayedProducer.java

Lines changed: 0 additions & 87 deletions
This file was deleted.
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.producers;
17+
18+
import rx.*;
19+
20+
/**
21+
* Producer that allows changing an underlying producer atomically and correctly resume with the accumulated
22+
* requests.
23+
*/
24+
public final class ProducerArbiter implements Producer {
25+
long requested;
26+
Producer currentProducer;
27+
28+
boolean emitting;
29+
long missedRequested;
30+
long missedProduced;
31+
Producer missedProducer;
32+
33+
static final Producer NULL_PRODUCER = new Producer() {
34+
@Override
35+
public void request(long n) {
36+
37+
}
38+
};
39+
40+
@Override
41+
public void request(long n) {
42+
if (n < 0) {
43+
throw new IllegalArgumentException("n >= 0 required");
44+
}
45+
if (n == 0) {
46+
return;
47+
}
48+
synchronized (this) {
49+
if (emitting) {
50+
missedRequested += n;
51+
return;
52+
}
53+
emitting = true;
54+
}
55+
boolean skipFinal = false;
56+
try {
57+
long r = requested;
58+
long u = r + n;
59+
if (u < 0) {
60+
u = Long.MAX_VALUE;
61+
}
62+
requested = u;
63+
64+
Producer p = currentProducer;
65+
if (p != null) {
66+
p.request(n);
67+
}
68+
69+
emitLoop();
70+
skipFinal = true;
71+
} finally {
72+
if (!skipFinal) {
73+
synchronized (this) {
74+
emitting = false;
75+
}
76+
}
77+
}
78+
}
79+
80+
public void produced(long n) {
81+
if (n <= 0) {
82+
throw new IllegalArgumentException("n > 0 required");
83+
}
84+
synchronized (this) {
85+
if (emitting) {
86+
missedProduced += n;
87+
return;
88+
}
89+
emitting = true;
90+
}
91+
92+
boolean skipFinal = false;
93+
try {
94+
long r = requested;
95+
long u = r - n;
96+
if (u < 0) {
97+
throw new IllegalStateException();
98+
}
99+
requested = u;
100+
101+
emitLoop();
102+
skipFinal = true;
103+
} finally {
104+
if (!skipFinal) {
105+
synchronized (this) {
106+
emitting = false;
107+
}
108+
}
109+
}
110+
}
111+
112+
public void setProducer(Producer newProducer) {
113+
synchronized (this) {
114+
if (emitting) {
115+
missedProducer = newProducer == null ? NULL_PRODUCER : newProducer;
116+
return;
117+
}
118+
emitting = true;
119+
}
120+
boolean skipFinal = false;
121+
try {
122+
currentProducer = newProducer;
123+
if (newProducer != null) {
124+
newProducer.request(requested);
125+
}
126+
127+
emitLoop();
128+
skipFinal = true;
129+
} finally {
130+
if (!skipFinal) {
131+
synchronized (this) {
132+
emitting = false;
133+
}
134+
}
135+
}
136+
}
137+
138+
public void emitLoop() {
139+
for (;;) {
140+
long localRequested;
141+
long localProduced;
142+
Producer localProducer;
143+
synchronized (this) {
144+
localRequested = missedRequested;
145+
localProduced = missedProduced;
146+
localProducer = missedProducer;
147+
if (localRequested == 0L
148+
&& localProduced == 0L
149+
&& localProducer == null) {
150+
emitting = false;
151+
return;
152+
}
153+
missedRequested = 0L;
154+
missedProduced = 0L;
155+
missedProducer = null;
156+
}
157+
158+
long r = requested;
159+
160+
if (r != Long.MAX_VALUE) {
161+
long u = r + localRequested;
162+
if (u < 0 || u == Long.MAX_VALUE) {
163+
r = Long.MAX_VALUE;
164+
requested = r;
165+
} else {
166+
long v = u - localProduced;
167+
if (v < 0) {
168+
throw new IllegalStateException("more produced than requested");
169+
}
170+
r = v;
171+
requested = v;
172+
}
173+
}
174+
if (localProducer != null) {
175+
if (localProducer == NULL_PRODUCER) {
176+
currentProducer = null;
177+
} else {
178+
currentProducer = localProducer;
179+
localProducer.request(r);
180+
}
181+
} else {
182+
Producer p = currentProducer;
183+
if (p != null && localRequested != 0L) {
184+
p.request(localRequested);
185+
}
186+
}
187+
}
188+
}
189+
}

0 commit comments

Comments
 (0)