Skip to content

Commit 03332a2

Browse files
Merge pull request #2963 from ReactiveX/StandardProducers
Set of standard producers and updated queue implementations with some
2 parents 2061d4f + ad0d422 commit 03332a2

18 files changed

+2153
-94
lines changed

src/main/java/rx/internal/operators/OperatorToObservableList.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import rx.Observable.Operator;
19-
import rx.Subscriber;
18+
import java.util.*;
2019

21-
import java.util.ArrayList;
22-
import java.util.LinkedList;
23-
import java.util.List;
20+
import rx.Observable.Operator;
21+
import rx.*;
22+
import rx.internal.producers.SingleDelayedProducer;
2423

2524
/**
2625
* Returns an {@code Observable} that emits a single item, a list composed of all the items emitted by the
@@ -90,7 +89,7 @@ public void onCompleted() {
9089
return;
9190
}
9291
list = null;
93-
producer.set(result);
92+
producer.setValue(result);
9493
}
9594
}
9695

src/main/java/rx/internal/operators/OperatorToObservableSortedList.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import rx.Observable.Operator;
2121
import rx.*;
2222
import rx.functions.Func2;
23+
import rx.internal.producers.SingleDelayedProducer;
2324

2425
/**
2526
* Return an {@code Observable} that emits the items emitted by the source {@code Observable}, in a sorted order
@@ -77,7 +78,7 @@ public void onCompleted() {
7778
onError(e);
7879
return;
7980
}
80-
producer.set(a);
81+
producer.setValue(a);
8182
}
8283
}
8384

src/main/java/rx/internal/operators/SingleDelayedProducer.java

Lines changed: 0 additions & 87 deletions
This file was deleted.
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.producers;
17+
18+
import rx.*;
19+
20+
/**
21+
* Producer that allows changing an underlying producer atomically and correctly resume with the accumulated
22+
* requests.
23+
*/
24+
public final class ProducerArbiter implements Producer {
25+
long requested;
26+
Producer currentProducer;
27+
28+
boolean emitting;
29+
long missedRequested;
30+
long missedProduced;
31+
Producer missedProducer;
32+
33+
static final Producer NULL_PRODUCER = new Producer() {
34+
@Override
35+
public void request(long n) {
36+
37+
}
38+
};
39+
40+
@Override
41+
public void request(long n) {
42+
if (n < 0) {
43+
throw new IllegalArgumentException("n >= 0 required");
44+
}
45+
if (n == 0) {
46+
return;
47+
}
48+
synchronized (this) {
49+
if (emitting) {
50+
missedRequested += n;
51+
return;
52+
}
53+
emitting = true;
54+
}
55+
boolean skipFinal = false;
56+
try {
57+
long r = requested;
58+
long u = r + n;
59+
if (u < 0) {
60+
u = Long.MAX_VALUE;
61+
}
62+
requested = u;
63+
64+
Producer p = currentProducer;
65+
if (p != null) {
66+
p.request(n);
67+
}
68+
69+
emitLoop();
70+
skipFinal = true;
71+
} finally {
72+
if (!skipFinal) {
73+
synchronized (this) {
74+
emitting = false;
75+
}
76+
}
77+
}
78+
}
79+
80+
public void produced(long n) {
81+
if (n <= 0) {
82+
throw new IllegalArgumentException("n > 0 required");
83+
}
84+
synchronized (this) {
85+
if (emitting) {
86+
missedProduced += n;
87+
return;
88+
}
89+
emitting = true;
90+
}
91+
92+
boolean skipFinal = false;
93+
try {
94+
long r = requested;
95+
if (r != Long.MAX_VALUE) {
96+
long u = r - n;
97+
if (u < 0) {
98+
throw new IllegalStateException();
99+
}
100+
requested = u;
101+
}
102+
103+
emitLoop();
104+
skipFinal = true;
105+
} finally {
106+
if (!skipFinal) {
107+
synchronized (this) {
108+
emitting = false;
109+
}
110+
}
111+
}
112+
}
113+
114+
public void setProducer(Producer newProducer) {
115+
synchronized (this) {
116+
if (emitting) {
117+
missedProducer = newProducer == null ? NULL_PRODUCER : newProducer;
118+
return;
119+
}
120+
emitting = true;
121+
}
122+
boolean skipFinal = false;
123+
try {
124+
currentProducer = newProducer;
125+
if (newProducer != null) {
126+
newProducer.request(requested);
127+
}
128+
129+
emitLoop();
130+
skipFinal = true;
131+
} finally {
132+
if (!skipFinal) {
133+
synchronized (this) {
134+
emitting = false;
135+
}
136+
}
137+
}
138+
}
139+
140+
public void emitLoop() {
141+
for (;;) {
142+
long localRequested;
143+
long localProduced;
144+
Producer localProducer;
145+
synchronized (this) {
146+
localRequested = missedRequested;
147+
localProduced = missedProduced;
148+
localProducer = missedProducer;
149+
if (localRequested == 0L
150+
&& localProduced == 0L
151+
&& localProducer == null) {
152+
emitting = false;
153+
return;
154+
}
155+
missedRequested = 0L;
156+
missedProduced = 0L;
157+
missedProducer = null;
158+
}
159+
160+
long r = requested;
161+
162+
if (r != Long.MAX_VALUE) {
163+
long u = r + localRequested;
164+
if (u < 0 || u == Long.MAX_VALUE) {
165+
r = Long.MAX_VALUE;
166+
requested = r;
167+
} else {
168+
long v = u - localProduced;
169+
if (v < 0) {
170+
throw new IllegalStateException("more produced than requested");
171+
}
172+
r = v;
173+
requested = v;
174+
}
175+
}
176+
if (localProducer != null) {
177+
if (localProducer == NULL_PRODUCER) {
178+
currentProducer = null;
179+
} else {
180+
currentProducer = localProducer;
181+
localProducer.request(r);
182+
}
183+
} else {
184+
Producer p = currentProducer;
185+
if (p != null && localRequested != 0L) {
186+
p.request(localRequested);
187+
}
188+
}
189+
}
190+
}
191+
}

0 commit comments

Comments
 (0)