-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Fix FlowableOnBackpressureBufferStrategy #4441
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ureBufferStrategy Fix drop strategy logic in FlowableOnBackpressureBufferStrategy Add unit test for FlowableOnBackpressureBufferStrategy, copied from FlowableOnBackpressureBufferTest, there is still some work needed to have a better coverage
|
||
public class FlowableOnBackpressureBufferStrategyTest { | ||
|
||
private static Action onOverFlow = new Action() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use Functions.EMPTY_ACTION
.
Current coverage is 75.32% (diff: 66.66%)@@ 2.x #4441 diff @@
==========================================
Files 454 454
Lines 32271 32274 +3
Methods 0 0
Messages 0 0
Branches 5208 5209 +1
==========================================
+ Hits 24249 24309 +60
+ Misses 6002 5928 -74
- Partials 2020 2037 +17
|
@@ -123,6 +123,7 @@ public void onNext(T t) { | |||
} | |||
} else { | |||
dq.offer(t); | |||
drain(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should not call drain while in the synchronized block. It should look like this:
if (done) {
return;
}
boolean callOnOverflow = false;
boolean callError = false;
Deque<T> dq = deque;
synchronized (dq) {
if (dq.size() == bufferSize) {
switch (strategy) {
case DROP_LATEST:
dq.pollLast();
dq.offer(t);
callOnOverflow = true;
break;
case DROP_OLDEST:
dq.poll();
dq.offer(t);
callOnOverflow = true;
break;
default:
// signal error
callError = true;
break;
}
} else {
dq.offer(t);
return;
}
}
if (callOnOverflow) {
if (onOverflow != null) {
try {
onOverflow.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
onError(ex);
}
}
}
else if (callError) {
s.cancel();
onError(new MissingBackpressureException());
} else {
drain();
}
Move the call to drain out of the synchronized block Cleaned up unit tests to follow recommendations from akarnokd
Thanks! 👍 |
Fix buffered objects not propagated downstream in FlowableOnBackpressureBufferStrategy
Fix drop strategy logic in FlowableOnBackpressureBufferStrategy
Add unit test for FlowableOnBackpressureBufferStrategy, copied from FlowableOnBackpressureBufferTest, there is still some work needed to have a better coverage