Skip to content

Commit 9fddbd9

Browse files
committed
Merge pull request #3150 from akarnokd/BufferUntilSubscriberBackpressureV2
Window operators now support backpressure in the inner observable.
2 parents cdb7453 + e30a333 commit 9fddbd9

11 files changed

+798
-22
lines changed

src/main/java/rx/internal/operators/OperatorWindowWithObservable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ void replaceSubject() {
154154
child.onNext(producer);
155155
}
156156
void createNewWindow() {
157-
BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
157+
UnicastSubject<T> bus = UnicastSubject.create();
158158
consumer = bus;
159159
producer = bus;
160160
}

src/main/java/rx/internal/operators/OperatorWindowWithObservableFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ void replaceSubject() {
160160
child.onNext(producer);
161161
}
162162
void createNewWindow() {
163-
BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
163+
UnicastSubject<T> bus = UnicastSubject.create();
164164
consumer = bus;
165165
producer = bus;
166166
Observable<? extends U> other;

src/main/java/rx/internal/operators/OperatorWindowWithSize.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) {
6060
final class ExactSubscriber extends Subscriber<T> {
6161
final Subscriber<? super Observable<T>> child;
6262
int count;
63-
BufferUntilSubscriber<T> window;
63+
UnicastSubject<T> window;
6464
volatile boolean noWindow = true;
6565
public ExactSubscriber(Subscriber<? super Observable<T>> child) {
6666
/**
@@ -107,7 +107,7 @@ void requestMore(long n) {
107107
public void onNext(T t) {
108108
if (window == null) {
109109
noWindow = false;
110-
window = BufferUntilSubscriber.create();
110+
window = UnicastSubject.create();
111111
child.onNext(window);
112112
}
113113
window.onNext(t);
@@ -241,7 +241,7 @@ public void onCompleted() {
241241
}
242242

243243
CountedSubject<T> createCountedSubject() {
244-
final BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
244+
final UnicastSubject<T> bus = UnicastSubject.create();
245245
return new CountedSubject<T>(bus, bus);
246246
}
247247
}

src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ void endWindow(SerializedSubject<T> window) {
233233
}
234234
}
235235
SerializedSubject<T> createSerializedSubject() {
236-
BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
236+
UnicastSubject<T> bus = UnicastSubject.create();
237237
return new SerializedSubject<T>(bus, bus);
238238
}
239239
}

src/main/java/rx/internal/operators/OperatorWindowWithTime.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ boolean replaceSubject() {
214214
unsubscribe();
215215
return false;
216216
}
217-
BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
217+
UnicastSubject<T> bus = UnicastSubject.create();
218218
state = state.create(bus, bus);
219219
child.onNext(bus);
220220
return true;
@@ -492,7 +492,7 @@ void terminateChunk(CountedSerializedSubject<T> chunk) {
492492
}
493493
}
494494
CountedSerializedSubject<T> createCountedSerializedSubject() {
495-
BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
495+
UnicastSubject<T> bus = UnicastSubject.create();
496496
return new CountedSerializedSubject<T>(bus, bus);
497497
}
498498
}

0 commit comments

Comments
 (0)