Skip to content

Commit 162c506

Browse files
committed
Merge pull request #51 from ReactiveSocket/fromIterable
PublisherUtils.fromIterable
2 parents 6477513 + 0a74aa5 commit 162c506

File tree

6 files changed

+349
-24
lines changed

6 files changed

+349
-24
lines changed
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivesocket.internal;
14+
15+
import java.util.concurrent.atomic.*;
16+
17+
/**
18+
* Utility class to help with backpressure-related operations such as request aggregation.
19+
*/
20+
public enum BackpressureHelper {
21+
;
22+
/**
23+
* Adds two long values and caps the sum at Long.MAX_VALUE.
24+
* @param a the first value
25+
* @param b the second value
26+
* @return the sum capped at Long.MAX_VALUE
27+
*/
28+
public static long addCap(long a, long b) {
29+
long u = a + b;
30+
if (u < 0L) {
31+
return Long.MAX_VALUE;
32+
}
33+
return u;
34+
}
35+
36+
/**
37+
* Multiplies two long values and caps the product at Long.MAX_VALUE.
38+
* @param a the first value
39+
* @param b the second value
40+
* @return the product capped at Long.MAX_VALUE
41+
*/
42+
public static long multiplyCap(long a, long b) {
43+
long u = a * b;
44+
if (((a | b) >>> 31) != 0) {
45+
if (u / a != b) {
46+
return Long.MAX_VALUE;
47+
}
48+
}
49+
return u;
50+
}
51+
52+
/**
53+
* Atomically adds the positive value n to the requested value in the AtomicLong and
54+
* caps the result at Long.MAX_VALUE and returns the previous value.
55+
* @param requested the AtomicLong holding the current requested value
56+
* @param n the value to add, must be positive (not verified)
57+
* @return the original value before the add
58+
*/
59+
public static long add(AtomicLong requested, long n) {
60+
for (;;) {
61+
long r = requested.get();
62+
if (r == Long.MAX_VALUE) {
63+
return Long.MAX_VALUE;
64+
}
65+
long u = addCap(r, n);
66+
if (requested.compareAndSet(r, u)) {
67+
return r;
68+
}
69+
}
70+
}
71+
72+
/**
73+
* Atomically adds the positive value n to the value in the instance through the field updater and
74+
* caps the result at Long.MAX_VALUE and returns the previous value.
75+
* @param updater the field updater for the requested value
76+
* @param instance the instance holding the requested value
77+
* @param n the value to add, must be positive (not verified)
78+
* @return the original value before the add
79+
*/
80+
public static <T> long add(AtomicLongFieldUpdater<T> updater, T instance, long n) {
81+
for (;;) {
82+
long r = updater.get(instance);
83+
if (r == Long.MAX_VALUE) {
84+
return Long.MAX_VALUE;
85+
}
86+
long u = addCap(r, n);
87+
if (updater.compareAndSet(instance, r, u)) {
88+
return r;
89+
}
90+
}
91+
}
92+
}
Lines changed: 56 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,67 @@
11
/**
22
* Copyright 2015 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
87
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
1512
*/
13+
1614
package io.reactivesocket.internal;
1715

18-
import org.reactivestreams.Subscription;
16+
import org.reactivestreams.*;
1917

20-
public class EmptySubscription implements Subscription
21-
{
22-
public static EmptySubscription EMPTY = new EmptySubscription();
23-
24-
public void request(long n)
25-
{
18+
/**
19+
* An empty subscription that does nothing other than validates the request amount.
20+
*/
21+
public enum EmptySubscription implements Subscription {
22+
/** A singleton, stateless instance. */
23+
INSTANCE;
24+
25+
@Override
26+
public void request(long n) {
27+
SubscriptionHelper.validateRequest(n);
28+
}
29+
@Override
30+
public void cancel() {
31+
// no-op
32+
}
33+
34+
@Override
35+
public String toString() {
36+
return "EmptySubscription";
37+
}
38+
39+
/**
40+
* Sets the empty subscription instance on the subscriber and then
41+
* calls onError with the supplied error.
42+
*
43+
* <p>Make sure this is only called if the subscriber hasn't received a
44+
* subscription already (there is no way of telling this).
45+
*
46+
* @param e the error to deliver to the subscriber
47+
* @param s the target subscriber
48+
*/
49+
public static void error(Throwable e, Subscriber<?> s) {
50+
s.onSubscribe(INSTANCE);
51+
s.onError(e);
2652
}
2753

28-
public void cancel()
29-
{
54+
/**
55+
* Sets the empty subscription instance on the subscriber and then
56+
* calls onComplete.
57+
*
58+
* <p>Make sure this is only called if the subscriber hasn't received a
59+
* subscription already (there is no way of telling this).
60+
*
61+
* @param s the target subscriber
62+
*/
63+
public static void complete(Subscriber<?> s) {
64+
s.onSubscribe(INSTANCE);
65+
s.onComplete();
3066
}
3167
}

src/main/java/io/reactivesocket/internal/PublisherUtils.java

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.reactivesocket.internal;
1717

1818
import java.nio.ByteBuffer;
19+
import java.util.Iterator;
1920
import java.util.concurrent.*;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122
import java.util.concurrent.atomic.AtomicLong;
@@ -202,5 +203,122 @@ public void cancel()
202203
});
203204
};
204205
}
206+
207+
public static final Publisher<Frame> fromIterable(Iterable<Frame> is) {
208+
return new PublisherIterableSource<Frame>(is);
209+
}
210+
211+
public static final class PublisherIterableSource<T> extends AtomicBoolean implements Publisher<T> {
212+
/** */
213+
private static final long serialVersionUID = 9051303031779816842L;
214+
215+
final Iterable<? extends T> source;
216+
public PublisherIterableSource(Iterable<? extends T> source) {
217+
this.source = source;
218+
}
219+
220+
@Override
221+
public void subscribe(Subscriber<? super T> s) {
222+
Iterator<? extends T> it;
223+
try {
224+
it = source.iterator();
225+
} catch (Throwable e) {
226+
EmptySubscription.error(e, s);
227+
return;
228+
}
229+
boolean hasNext;
230+
try {
231+
hasNext = it.hasNext();
232+
} catch (Throwable e) {
233+
EmptySubscription.error(e, s);
234+
return;
235+
}
236+
if (!hasNext) {
237+
EmptySubscription.complete(s);
238+
return;
239+
}
240+
s.onSubscribe(new IteratorSourceSubscription<>(it, s));
241+
}
242+
243+
static final class IteratorSourceSubscription<T> extends AtomicLong implements Subscription {
244+
/** */
245+
private static final long serialVersionUID = 8931425802102883003L;
246+
final Iterator<? extends T> it;
247+
final Subscriber<? super T> subscriber;
248+
249+
volatile boolean cancelled;
250+
251+
public IteratorSourceSubscription(Iterator<? extends T> it, Subscriber<? super T> subscriber) {
252+
this.it = it;
253+
this.subscriber = subscriber;
254+
}
255+
@Override
256+
public void request(long n) {
257+
if (SubscriptionHelper.validateRequest(n)) {
258+
return;
259+
}
260+
if (BackpressureHelper.add(this, n) != 0L) {
261+
return;
262+
}
263+
long r = n;
264+
long r0 = n;
265+
final Subscriber<? super T> subscriber = this.subscriber;
266+
final Iterator<? extends T> it = this.it;
267+
for (;;) {
268+
if (cancelled) {
269+
return;
270+
}
271+
272+
long e = 0L;
273+
while (r != 0L) {
274+
T v;
275+
try {
276+
v = it.next();
277+
} catch (Throwable ex) {
278+
subscriber.onError(ex);
279+
return;
280+
}
281+
282+
if (v == null) {
283+
subscriber.onError(new NullPointerException("Iterator returned a null element"));
284+
return;
285+
}
286+
287+
subscriber.onNext(v);
288+
289+
if (cancelled) {
290+
return;
291+
}
292+
293+
boolean hasNext;
294+
try {
295+
hasNext = it.hasNext();
296+
} catch (Throwable ex) {
297+
subscriber.onError(ex);
298+
return;
299+
}
300+
if (!hasNext) {
301+
subscriber.onComplete();
302+
return;
303+
}
304+
305+
r--;
306+
e--;
307+
}
308+
if (e != 0L && r0 != Long.MAX_VALUE) {
309+
r = addAndGet(e);
310+
}
311+
if (r == 0L) {
312+
break;
313+
}
314+
}
315+
}
316+
@Override
317+
public void cancel() {
318+
cancelled = true;
319+
}
320+
}
321+
}
322+
205323

206324
}

src/main/java/io/reactivesocket/internal/Requester.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,7 @@ public void cancel() {
542542
}
543543
streamInputSubscriber.parentSubscription.cancel();
544544
if (payloadsSubscription != null) {
545-
if (!payloadsSubscription.compareAndSet(null, EmptySubscription.EMPTY)) {
545+
if (!payloadsSubscription.compareAndSet(null, EmptySubscription.INSTANCE)) {
546546
payloadsSubscription.get().cancel(); // unsubscribe it if it already exists
547547
}
548548
}

src/main/java/io/reactivesocket/internal/Responder.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ public void onComplete() {
378378

379379
@Override
380380
public void cancel() {
381-
if (!parent.compareAndSet(null, EmptySubscription.EMPTY)) {
381+
if (!parent.compareAndSet(null, EmptySubscription.INSTANCE)) {
382382
parent.get().cancel();
383383
cleanup();
384384
}
@@ -498,7 +498,7 @@ public void onComplete() {
498498

499499
@Override
500500
public void cancel() {
501-
if (!parent.compareAndSet(null, EmptySubscription.EMPTY)) {
501+
if (!parent.compareAndSet(null, EmptySubscription.INSTANCE)) {
502502
parent.get().cancel();
503503
cleanup();
504504
}
@@ -643,7 +643,7 @@ public void onComplete() {
643643

644644
@Override
645645
public void cancel() {
646-
if (!parent.compareAndSet(null, EmptySubscription.EMPTY)) {
646+
if (!parent.compareAndSet(null, EmptySubscription.INSTANCE)) {
647647
parent.get().cancel();
648648
cleanup();
649649
}

0 commit comments

Comments
 (0)