Skip to content

Commit aeee037

Browse files
Merge pull request #2901 from akarnokd/ToSortedListBackpressure
Operators toList and toSortedList now support backpressure
2 parents d3d15b9 + 615db6a commit aeee037

File tree

6 files changed

+390
-77
lines changed

6 files changed

+390
-77
lines changed

src/main/java/rx/Observable.java

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8596,7 +8596,7 @@ public final BlockingObservable<T> toBlocking() {
85968596
* you do not have the option to unsubscribe.
85978597
* <dl>
85988598
* <dt><b>Backpressure Support:</b></dt>
8599-
* <dd>This operator does not support backpressure as by intent it is requesting and buffering everything.</dd>
8599+
* <dd>The operator buffers everything from its upstream but it only emits the aggregated list when the downstream requests at least one item.</dd>
86008600
* <dt><b>Scheduler:</b></dt>
86018601
* <dd>{@code toList} does not operate by default on a particular {@link Scheduler}.</dd>
86028602
* </dl>
@@ -8797,7 +8797,7 @@ public final <K, V> Observable<Map<K, Collection<V>>> toMultimap(Func1<? super T
87978797
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toSortedList.png" alt="">
87988798
* <dl>
87998799
* <dt><b>Backpressure Support:</b></dt>
8800-
* <dd>This operator does not support backpressure as by intent it is requesting and buffering everything.</dd>
8800+
* <dd>The operator buffers everything from its upstream but it only emits the sorted list when the downstream requests at least one item.</dd>
88018801
* <dt><b>Scheduler:</b></dt>
88028802
* <dd>{@code toSortedList} does not operate by default on a particular {@link Scheduler}.</dd>
88038803
* </dl>
@@ -8810,7 +8810,7 @@ public final <K, V> Observable<Map<K, Collection<V>>> toMultimap(Func1<? super T
88108810
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
88118811
*/
88128812
public final Observable<List<T>> toSortedList() {
8813-
return lift(new OperatorToObservableSortedList<T>());
8813+
return lift(new OperatorToObservableSortedList<T>(10));
88148814
}
88158815

88168816
/**
@@ -8820,7 +8820,7 @@ public final Observable<List<T>> toSortedList() {
88208820
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toSortedList.f.png" alt="">
88218821
* <dl>
88228822
* <dt><b>Backpressure Support:</b></dt>
8823-
* <dd>This operator does not support backpressure as by intent it is requesting and buffering everything.</dd>
8823+
* <dd>The operator buffers everything from its upstream but it only emits the sorted list when the downstream requests at least one item.</dd>
88248824
* <dt><b>Scheduler:</b></dt>
88258825
* <dd>{@code toSortedList} does not operate by default on a particular {@link Scheduler}.</dd>
88268826
* </dl>
@@ -8833,7 +8833,60 @@ public final Observable<List<T>> toSortedList() {
88338833
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
88348834
*/
88358835
public final Observable<List<T>> toSortedList(Func2<? super T, ? super T, Integer> sortFunction) {
8836-
return lift(new OperatorToObservableSortedList<T>(sortFunction));
8836+
return lift(new OperatorToObservableSortedList<T>(sortFunction, 10));
8837+
}
8838+
8839+
/**
8840+
* Returns an Observable that emits a list that contains the items emitted by the source Observable, in a
8841+
* sorted order. Each item emitted by the Observable must implement {@link Comparable} with respect to all
8842+
* other items in the sequence.
8843+
* <p>
8844+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toSortedList.png" alt="">
8845+
* <dl>
8846+
* <dt><b>Backpressure Support:</b></dt>
8847+
* <dd>The operator buffers everything from its upstream but it only emits the sorted list when the downstream requests at least one item.</dd>
8848+
* <dt><b>Scheduler:</b></dt>
8849+
* <dd>{@code toSortedList} does not operate by default on a particular {@link Scheduler}.</dd>
8850+
* </dl>
8851+
*
8852+
* @throws ClassCastException
8853+
* if any item emitted by the Observable does not implement {@link Comparable} with respect to
8854+
* all other items emitted by the Observable
8855+
* @param initialCapacity
8856+
* the initial capacity of the ArrayList used to accumulate items before sorting
8857+
* @return an Observable that emits a list that contains the items emitted by the source Observable in
8858+
* sorted order
8859+
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
8860+
*/
8861+
@Experimental
8862+
public final Observable<List<T>> toSortedList(int initialCapacity) {
8863+
return lift(new OperatorToObservableSortedList<T>(initialCapacity));
8864+
}
8865+
8866+
/**
8867+
* Returns an Observable that emits a list that contains the items emitted by the source Observable, in a
8868+
* sorted order based on a specified comparison function.
8869+
* <p>
8870+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toSortedList.f.png" alt="">
8871+
* <dl>
8872+
* <dt><b>Backpressure Support:</b></dt>
8873+
* <dd>The operator buffers everything from its upstream but it only emits the sorted list when the downstream requests at least one item.</dd>
8874+
* <dt><b>Scheduler:</b></dt>
8875+
* <dd>{@code toSortedList} does not operate by default on a particular {@link Scheduler}.</dd>
8876+
* </dl>
8877+
*
8878+
* @param sortFunction
8879+
* a function that compares two items emitted by the source Observable and returns an Integer
8880+
* that indicates their sort order
8881+
* @param initialCapacity
8882+
* the initial capacity of the ArrayList used to accumulate items before sorting
8883+
* @return an Observable that emits a list that contains the items emitted by the source Observable in
8884+
* sorted order
8885+
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
8886+
*/
8887+
@Experimental
8888+
public final Observable<List<T>> toSortedList(Func2<? super T, ? super T, Integer> sortFunction, int initialCapacity) {
8889+
return lift(new OperatorToObservableSortedList<T>(sortFunction, initialCapacity));
88378890
}
88388891

88398892
/**

src/main/java/rx/internal/operators/OperatorToObservableList.java

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,11 @@ public static <T> OperatorToObservableList<T> instance() {
5252
private OperatorToObservableList() { }
5353
@Override
5454
public Subscriber<? super T> call(final Subscriber<? super List<T>> o) {
55-
return new Subscriber<T>(o) {
55+
final SingleDelayedProducer<List<T>> producer = new SingleDelayedProducer<List<T>>(o);
56+
Subscriber<T> result = new Subscriber<T>() {
5657

57-
private boolean completed = false;
58-
final List<T> list = new LinkedList<T>();
58+
boolean completed = false;
59+
List<T> list = new LinkedList<T>();
5960

6061
@Override
6162
public void onStart() {
@@ -64,27 +65,32 @@ public void onStart() {
6465

6566
@Override
6667
public void onCompleted() {
67-
try {
68+
if (!completed) {
6869
completed = true;
69-
/*
70-
* Ideally this should just return Collections.unmodifiableList(list) and not copy it,
71-
* but, it ends up being a breaking change if we make that modification.
72-
*
73-
* Here is an example of is being done with these lists that breaks if we make it immutable:
74-
*
75-
* Caused by: java.lang.UnsupportedOperationException
76-
* at java.util.Collections$UnmodifiableList$1.set(Collections.java:1244)
77-
* at java.util.Collections.sort(Collections.java:221)
78-
* ...
79-
* Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: UnmodifiableList.class
80-
* at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:98)
81-
* at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:56)
82-
* ... 419 more
83-
*/
84-
o.onNext(new ArrayList<T>(list));
85-
o.onCompleted();
86-
} catch (Throwable e) {
87-
onError(e);
70+
List<T> result;
71+
try {
72+
/*
73+
* Ideally this should just return Collections.unmodifiableList(list) and not copy it,
74+
* but, it ends up being a breaking change if we make that modification.
75+
*
76+
* Here is an example of is being done with these lists that breaks if we make it immutable:
77+
*
78+
* Caused by: java.lang.UnsupportedOperationException
79+
* at java.util.Collections$UnmodifiableList$1.set(Collections.java:1244)
80+
* at java.util.Collections.sort(Collections.java:221)
81+
* ...
82+
* Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: UnmodifiableList.class
83+
* at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:98)
84+
* at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:56)
85+
* ... 419 more
86+
*/
87+
result = new ArrayList<T>(list);
88+
} catch (Throwable t) {
89+
onError(t);
90+
return;
91+
}
92+
list = null;
93+
producer.set(result);
8894
}
8995
}
9096

@@ -101,6 +107,9 @@ public void onNext(T value) {
101107
}
102108

103109
};
110+
o.add(result);
111+
o.setProducer(producer);
112+
return result;
104113
}
105114

106115
}

src/main/java/rx/internal/operators/OperatorToObservableSortedList.java

Lines changed: 43 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,10 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.ArrayList;
19-
import java.util.Collections;
20-
import java.util.Comparator;
21-
import java.util.List;
18+
import java.util.*;
2219

2320
import rx.Observable.Operator;
24-
import rx.Subscriber;
21+
import rx.*;
2522
import rx.functions.Func2;
2623

2724
/**
@@ -35,72 +32,82 @@
3532
* the type of the items emitted by the source and the resulting {@code Observable}s
3633
*/
3734
public final class OperatorToObservableSortedList<T> implements Operator<List<T>, T> {
38-
private final Func2<? super T, ? super T, Integer> sortFunction;
35+
private final Comparator<? super T> sortFunction;
36+
private final int initialCapacity;
3937

4038
@SuppressWarnings("unchecked")
41-
public OperatorToObservableSortedList() {
42-
this.sortFunction = defaultSortFunction;
39+
public OperatorToObservableSortedList(int initialCapacity) {
40+
this.sortFunction = DEFAULT_SORT_FUNCTION;
41+
this.initialCapacity = initialCapacity;
4342
}
4443

45-
public OperatorToObservableSortedList(Func2<? super T, ? super T, Integer> sortFunction) {
46-
this.sortFunction = sortFunction;
44+
public OperatorToObservableSortedList(final Func2<? super T, ? super T, Integer> sortFunction, int initialCapacity) {
45+
this.initialCapacity = initialCapacity;
46+
this.sortFunction = new Comparator<T>() {
47+
@Override
48+
public int compare(T o1, T o2) {
49+
return sortFunction.call(o1, o2);
50+
}
51+
};
4752
}
4853

4954
@Override
50-
public Subscriber<? super T> call(final Subscriber<? super List<T>> o) {
51-
return new Subscriber<T>(o) {
52-
53-
final List<T> list = new ArrayList<T>();
55+
public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
56+
final SingleDelayedProducer<List<T>> producer = new SingleDelayedProducer<List<T>>(child);
57+
Subscriber<T> result = new Subscriber<T>() {
5458

59+
List<T> list = new ArrayList<T>(initialCapacity);
60+
boolean completed;
61+
5562
@Override
5663
public void onStart() {
5764
request(Long.MAX_VALUE);
5865
}
5966

6067
@Override
6168
public void onCompleted() {
62-
try {
63-
64-
// sort the list before delivery
65-
Collections.sort(list, new Comparator<T>() {
66-
67-
@Override
68-
public int compare(T o1, T o2) {
69-
return sortFunction.call(o1, o2);
70-
}
71-
72-
});
73-
74-
o.onNext(Collections.unmodifiableList(list));
75-
o.onCompleted();
76-
} catch (Throwable e) {
77-
onError(e);
69+
if (!completed) {
70+
completed = true;
71+
List<T> a = list;
72+
list = null;
73+
try {
74+
// sort the list before delivery
75+
Collections.sort(a, sortFunction);
76+
} catch (Throwable e) {
77+
onError(e);
78+
return;
79+
}
80+
producer.set(a);
7881
}
7982
}
8083

8184
@Override
8285
public void onError(Throwable e) {
83-
o.onError(e);
86+
child.onError(e);
8487
}
8588

8689
@Override
8790
public void onNext(T value) {
88-
list.add(value);
91+
if (!completed) {
92+
list.add(value);
93+
}
8994
}
9095

9196
};
97+
child.add(result);
98+
child.setProducer(producer);
99+
return result;
92100
}
93-
94101
// raw because we want to support Object for this default
95102
@SuppressWarnings("rawtypes")
96-
private static Func2 defaultSortFunction = new DefaultComparableFunction();
103+
private static Comparator DEFAULT_SORT_FUNCTION = new DefaultComparableFunction();
97104

98-
private static class DefaultComparableFunction implements Func2<Object, Object, Integer> {
105+
private static class DefaultComparableFunction implements Comparator<Object> {
99106

100107
// unchecked because we want to support Object for this default
101108
@SuppressWarnings("unchecked")
102109
@Override
103-
public Integer call(Object t1, Object t2) {
110+
public int compare(Object t1, Object t2) {
104111
Comparable<Object> c1 = (Comparable<Object>) t1;
105112
Comparable<Object> c2 = (Comparable<Object>) t2;
106113
return c1.compareTo(c2);
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package rx.internal.operators;
2+
3+
import java.util.concurrent.atomic.AtomicInteger;
4+
5+
import rx.*;
6+
7+
/**
8+
* A producer that holds a single value until it is requested and emits it followed by an onCompleted.
9+
*/
10+
public final class SingleDelayedProducer<T> extends AtomicInteger implements Producer {
11+
/** */
12+
private static final long serialVersionUID = 4721551710164477552L;
13+
/** The actual child. */
14+
final Subscriber<? super T> child;
15+
/** The value to emit, acquired and released by compareAndSet. */
16+
T value;
17+
/** State flag: request() called with positive value. */
18+
static final int REQUESTED = 1;
19+
/** State flag: set() called. */
20+
static final int SET = 2;
21+
/**
22+
* Constructs a SingleDelayedProducer with the given child as output.
23+
* @param child the subscriber to emit the value and completion events
24+
*/
25+
public SingleDelayedProducer(Subscriber<? super T> child) {
26+
this.child = child;
27+
}
28+
@Override
29+
public void request(long n) {
30+
if (n > 0) {
31+
for (;;) {
32+
int s = get();
33+
// if already requested
34+
if ((s & REQUESTED) != 0) {
35+
break;
36+
}
37+
int u = s | REQUESTED;
38+
if (compareAndSet(s, u)) {
39+
if ((s & SET) != 0) {
40+
emit();
41+
}
42+
break;
43+
}
44+
}
45+
}
46+
}
47+
/**
48+
* Sets the value to be emitted and emits it if there was a request.
49+
* Should be called only once and from a single thread
50+
* @param value the value to set and possibly emit
51+
*/
52+
public void set(T value) {
53+
for (;;) {
54+
int s = get();
55+
// if already set
56+
if ((s & SET) != 0) {
57+
break;
58+
}
59+
int u = s | SET;
60+
this.value = value;
61+
if (compareAndSet(s, u)) {
62+
if ((s & REQUESTED) != 0) {
63+
emit();
64+
}
65+
break;
66+
}
67+
}
68+
}
69+
/**
70+
* Emits the set value if the child is not unsubscribed and bounces back
71+
* exceptions caught from child.onNext.
72+
*/
73+
void emit() {
74+
try {
75+
T v = value;
76+
value = null; // do not hold onto the value
77+
if (child.isUnsubscribed()) {
78+
return;
79+
}
80+
child.onNext(v);
81+
} catch (Throwable t) {
82+
child.onError(t);
83+
return;
84+
}
85+
child.onCompleted();
86+
}
87+
}

0 commit comments

Comments
 (0)