Skip to content

Fix FlowableOnBackpressureBufferStrategy #4441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 31, 2016
Merged

Fix FlowableOnBackpressureBufferStrategy #4441

merged 2 commits into from
Aug 31, 2016

Conversation

matgabriel
Copy link
Contributor

Fix buffered objects not propagated downstream in FlowableOnBackpressureBufferStrategy
Fix drop strategy logic in FlowableOnBackpressureBufferStrategy
Add unit test for FlowableOnBackpressureBufferStrategy, copied from FlowableOnBackpressureBufferTest, there is still some work needed to have a better coverage

…ureBufferStrategy

Fix drop strategy logic in FlowableOnBackpressureBufferStrategy
Add unit test for FlowableOnBackpressureBufferStrategy, copied from FlowableOnBackpressureBufferTest, there is still some work needed to have a better coverage

public class FlowableOnBackpressureBufferStrategyTest {

private static Action onOverFlow = new Action() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Functions.EMPTY_ACTION.

@codecov-io
Copy link

codecov-io commented Aug 30, 2016

Current coverage is 75.32% (diff: 66.66%)

Merging #4441 into 2.x will increase coverage by 0.17%

@@                2.x      #4441   diff @@
==========================================
  Files           454        454          
  Lines         32271      32274     +3   
  Methods           0          0          
  Messages          0          0          
  Branches       5208       5209     +1   
==========================================
+ Hits          24249      24309    +60   
+ Misses         6002       5928    -74   
- Partials       2020       2037    +17   

Powered by Codecov. Last update a856572...1617087

@@ -123,6 +123,7 @@ public void onNext(T t) {
}
} else {
dq.offer(t);
drain();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not call drain while in the synchronized block. It should look like this:

            if (done) {
                return;
            }
            boolean callOnOverflow = false;
            boolean callError = false;
            Deque<T> dq = deque;
            synchronized (dq) {
               if (dq.size() == bufferSize) {
                   switch (strategy) {
                   case DROP_LATEST:
                       dq.pollLast();
                       dq.offer(t);
                       callOnOverflow = true;
                       break;
                   case DROP_OLDEST:
                       dq.poll();
                       dq.offer(t);
                       callOnOverflow = true;
                       break;
                   default:
                       // signal error
                       callError = true;
                       break;
                   }
               } else {
                   dq.offer(t);
                   return;
               }
            }

            if (callOnOverflow) {
                if (onOverflow != null) {
                    try {
                        onOverflow.run();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.cancel();
                        onError(ex);
                    }
                }
            } 
            else if (callError) {
                s.cancel();
                onError(new MissingBackpressureException());
            } else {
                drain();
            }

@akarnokd akarnokd added the Bug label Aug 30, 2016
@akarnokd akarnokd added this to the 2.0 RC 2 milestone Aug 30, 2016
Move the call to drain out of the synchronized block
Cleaned up unit tests to follow recommendations from akarnokd
@akarnokd
Copy link
Member

Thanks! 👍

@akarnokd akarnokd merged commit 81badc3 into ReactiveX:2.x Aug 31, 2016
@matgabriel matgabriel deleted the 2.x branch September 15, 2016 16:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants