Skip to content

Commit b77aa1e

Browse files
authored
3.x: onReduceBackpressure internals cleanup (#7151)
* 3.x: onReduceBackpressure internals cleanup * Update FlowableOnBackpressureReduceTest.java * Update FlowableOnBackpressureReduceWithTest.java
1 parent e3a0302 commit b77aa1e

File tree

4 files changed

+59
-11
lines changed

4 files changed

+59
-11
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureReduce.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,20 @@ static final class BackpressureReduceSubscriber<T> extends AbstractBackpressureT
4949
@Override
5050
public void onNext(T t) {
5151
T v = current.get();
52+
if (v != null) {
53+
v = current.getAndSet(null);
54+
}
5255
if (v == null) {
5356
current.lazySet(t);
54-
} else if ((v = current.getAndSet(null)) != null) {
57+
} else {
5558
try {
5659
current.lazySet(Objects.requireNonNull(reducer.apply(v, t), "The reducer returned a null value"));
5760
} catch (Throwable ex) {
5861
Exceptions.throwIfFatal(ex);
62+
upstream.cancel();
5963
onError(ex);
60-
cancel();
6164
return;
6265
}
63-
} else {
64-
current.lazySet(t);
6566
}
6667
drain();
6768
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureReduceWith.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,24 +58,23 @@ static final class BackpressureReduceWithSubscriber<T, R> extends AbstractBackpr
5858
@Override
5959
public void onNext(T t) {
6060
R v = current.get();
61+
if (v != null) {
62+
v = current.getAndSet(null);
63+
}
6164
try {
6265
if (v == null) {
6366
current.lazySet(Objects.requireNonNull(
6467
reducer.apply(Objects.requireNonNull(supplier.get(), "The supplier returned a null value"), t),
6568
"The reducer returned a null value"
6669
));
67-
} else if ((v = current.getAndSet(null)) != null) {
68-
current.lazySet(Objects.requireNonNull(reducer.apply(v, t), "The reducer returned a null value"));
6970
} else {
70-
current.lazySet(Objects.requireNonNull(
71-
reducer.apply(Objects.requireNonNull(supplier.get(), "The supplier returned a null value"), t),
72-
"The reducer returned a null value"
73-
));
71+
current.lazySet(Objects.requireNonNull(reducer.apply(v, t), "The reducer returned a null value"));
7472
}
7573
} catch (Throwable ex) {
7674
Exceptions.throwIfFatal(ex);
75+
upstream.cancel();
7776
onError(ex);
78-
cancel();
77+
return;
7978
}
8079
drain();
8180
}

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureReduceTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,30 @@ public void synchronousDrop() {
110110
ts.assertTerminated();
111111
}
112112

113+
@Test
114+
public void reduceBackpressuredSync() {
115+
PublishProcessor<Integer> source = PublishProcessor.create();
116+
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>(0L);
117+
118+
source.onBackpressureReduce(Integer::sum).subscribe(ts);
119+
120+
source.onNext(1);
121+
source.onNext(2);
122+
source.onNext(3);
123+
124+
ts.request(1);
125+
126+
ts.assertValuesOnly(6);
127+
128+
source.onNext(4);
129+
source.onComplete();
130+
131+
ts.assertValuesOnly(6);
132+
133+
ts.request(1);
134+
ts.assertResult(6, 4);
135+
}
136+
113137
private <T> TestSubscriberEx<T> createDelayedSubscriber() {
114138
return new TestSubscriberEx<T>(1L) {
115139
final Random rnd = new Random();

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureReduceWithTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,30 @@ public void simpleBackpressure() {
9090
ts.assertNotComplete();
9191
}
9292

93+
@Test
94+
public void reduceBackpressuredSync() {
95+
PublishProcessor<Integer> source = PublishProcessor.create();
96+
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>(0L);
97+
98+
source.onBackpressureReduce(() -> 0, Integer::sum).subscribe(ts);
99+
100+
source.onNext(1);
101+
source.onNext(2);
102+
source.onNext(3);
103+
104+
ts.request(1);
105+
106+
ts.assertValuesOnly(6);
107+
108+
source.onNext(4);
109+
source.onComplete();
110+
111+
ts.assertValuesOnly(6);
112+
113+
ts.request(1);
114+
ts.assertResult(6, 4);
115+
}
116+
93117
@Test
94118
public void synchronousDrop() {
95119
PublishProcessor<Integer> source = PublishProcessor.create();

0 commit comments

Comments
 (0)