RxRingBuffer with Inline Release #2189

Conversation

benjchristensen
Member

Since previous tests have shown that pooling does still make a big difference, I'm trying to find a way to do pooling without the concurrency issue. I've tried work-in-progress (WIP) counters, read/write locks, and other variants, and all of them impact performance far too much.

This one makes different trade-offs:

It will release automatically when:

  • a terminal event is emitted
  • unsubscribe/release occurs and the queue is empty
  • unsubscribe/release has occurred and a poll/peek occurs

This can result in a queue not being put back in the pool if an early unsubscribe from a different thread means the poll/peek never triggers the release. In that case GC reclaims the queue, so everything still functions; it just misses the pooling optimization.
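
As a rough illustration of these release rules, here is a minimal sketch; the class, field, and pool names are placeholders for illustration, not the actual RxRingBuffer code:

import java.util.Queue;

// Minimal sketch of the inline-release idea; names are illustrative placeholders,
// not the actual RxRingBuffer members.
final class InlineReleaseBufferSketch {
    private volatile Queue<Object> queue;      // null once released back to the pool
    private volatile boolean releaseRequested; // set by unsubscribe/release

    InlineReleaseBufferSketch(Queue<Object> q) {
        this.queue = q;
    }

    void onTerminalEvent() {
        release(); // terminal event emitted => release immediately
    }

    void unsubscribe() {
        releaseRequested = true;
        Queue<Object> q = queue;
        if (q != null && q.isEmpty()) {
            release(); // queue is already empty => release now
        }
        // otherwise the next poll/peek on the draining side performs the release
    }

    Object poll() {
        Queue<Object> q = queue;
        Object o = (q != null) ? q.poll() : null;
        if (releaseRequested && (q == null || q.isEmpty())) {
            release(); // deferred release once unsubscribe/release has occurred
        }
        return o;
    }

    private void release() {
        Queue<Object> q = queue;
        queue = null;
        if (q != null) {
            q.clear();
            // POOL.returnObject(q); // hand the queue back to a hypothetical pool here
        }
    }
}

If the unsubscribe races with the final poll on another thread, neither path may run release(), which is exactly the edge case above where the queue is simply left to GC.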

There is a commented out test of using finalize(). It works as a safety net for the edge case, but at the cost of increased GC time.

Here are performance numbers of the various tests, this one being "Inline Volatile":

Benchmark                                          (size)   Mode   Samples          1.x   Inline Volatile    + finalize       RW Lock          WIP
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  4757888.048       5264594.520   4956256.323   5288310.755  5032942.628
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    44877.618         42845.758     39209.439     25742.696    29025.955
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       42.366            40.979        37.036        24.769       27.260
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    99981.127         99910.070     94307.080    103112.286   103176.289
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.675             4.620         4.670         4.374        4.313
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4751265.583       4706441.452   4376983.062   4739418.557  4673633.614
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   458704.984        480075.261    427165.143    483313.588   476318.407
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    42493.290         42178.254     39640.240     42728.480    42542.171
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5406448.872       5375090.752   5066264.570   5628401.294  4974892.417
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       44.528            40.990        41.106        24.974       28.212
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76898.222         72655.377     69748.305     78283.565    78987.646
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3172.653          2955.854      3064.749      1858.361     2204.948
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5157134.576       5163837.644   4846336.744   5290961.536  5139893.848
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    39961.491         39341.526     37312.117     40418.492    39163.267
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       35.925            35.730        33.948        35.526       35.611

The memory allocation amounts look good, though I don't understand the shape of the red graph at the top. That is making me question this.

screen shot 2014-12-22 at 9 05 06 pm

With finalize() (as an experiment; I'm aware of all the reasons not to use finalize() from Effective Java, 2nd Edition), this is what it looks like:

screen shot 2014-12-22 at 9 05 56 pm

The GC times on this one are higher than without finalize():

screen shot 2014-12-22 at 9 06 33 pm

compared with:

screen shot 2014-12-22 at 9 07 10 pm

@daschl
Contributor

daschl commented Dec 23, 2014

@benjchristensen interesting! By the way, if you want to understand the different shapes you can go in there and "mark the range" so you only see allocations in that time window. You can then do the same for the other area, and maybe the difference in allocations gives you a hint as to when and where things are happening.

And maybe you know that anyways and I'm just trying to be helpful ;)

@benjchristensen
Member Author

Thanks @daschl I'll use that to dig in.

@akarnokd Is there any reason to not pursue this path? Are the trade-offs appropriate? And are my changes to publish safe? (I made them while holding my baby so not 100% confident with them yet!)

@akarnokd
Member

I see a few troubling points:

  • read queue into a local variable first because it can turn into null between uses
  • catch (NullPointerException) really?
  • publish.connect is not atomic and lets concurrent callers connect to the source multiple times
  • spmc is unnecessary
  • drainqueue: if isComplete(o), the then and else branches contain the same code
  • drainqueue WIP.set(1): I'm not convinced this doesn't end up quitting the emission loop prematurely and causing hangs due to undelivered but expected events; I have to think about it some more.

@benjchristensen
Member Author

read queue into a local variable first because it can turn into null between uses

Doing this is problematic and exactly what we need to avoid. If we retain a reference to it and continue working with it after it is released then it can be concurrently used by 2 RxRingBuffer instances because the pool will be able to hand it out to someone else.

publish.connect is not atomic and lets concurrent callers connect to the source multiple times

If so, then it's been like that for a while and it can be fixed independently of this PR.

spmc is unnecessary

We need to get the SPSC queue fixed. It has had issues, which is why it is still on SPMC. This has not changed.

drainqueue: if isComplete(o), the then and else branches contain the same code

I don't understand what you're referring to.

I have to think about it some more.

Please do. That code has not been changed in this PR.

catch (NullPointerException) really?

See the first point above. This is a matter of trade-offs. If we ensure we never get a NullPointerException we end up synchronizing everything and killing the performance. We are optimizing for the fact that almost all of the time we will terminate serially, or unsubscribe serially. It is very rare that we will receive a concurrent unsubscribe. In those cases we cannot hold a reference to queue. An if (queue != null) check is insufficient to protect against this, and a mutex, WIP counter, RW lock, etc. all pay too high a cost for an edge case like this. On the rare occasion we do run into it, we catch the NPE and move on.
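
To make that trade-off concrete, the emission path under this approach looks roughly like the following sketch; the field and method names are illustrative, not the actual RxRingBuffer code:

import java.util.Queue;

// Illustrative sketch only; this is not the real RxRingBuffer emission path.
final class EmissionPathSketch {
    private volatile Queue<Object> queue; // nulled by a concurrent release/unsubscribe

    void onNext(Object o) {
        try {
            // Deliberately read the field at the point of use instead of caching it in
            // a local: a cached reference could keep using a queue that the pool has
            // already handed to another RxRingBuffer instance.
            queue.offer(o);
        } catch (NullPointerException e) {
            // Rare race: a concurrent unsubscribe nulled the field, so this read saw
            // null. The consumer has already unsubscribed, so the value is dropped.
        }
    }
}

The if (queue != null) check mentioned above would narrow but not close this window, which is why the catch is accepted instead.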

If there is a functional problem with it then let's look at that. If it's just a dislike for the pattern, then please propose an alternative that achieves the performance of this.

As an aside, please do not use sarcasm in code reviews. The "really?" comment is unhelpful and condescending. I am very much aware of the ugliness of this approach. If elegance had succeeded in the many attempts we've made thus far we wouldn't still be discussing this issue.

Are the trade-offs appropriate

Back to the trade-offs that are driving this:

  • All attempts at removing the object pool have resulted in performance degradation via significant increases in object allocation and GC time.
  • The current approach to returning queues to the object pool occasionally results in concurrency issues, because a concurrent emission/unsubscribe can result in 2 instances of RxRingBuffer holding a single queue.
  • Use of a WIP counter, mutex, RW lock, etc. makes it thread-safe, but at a significant performance penalty on the normal happy path.
  • Attempts at using WeakReference have failed so far (see the sketch after this list). Every attempt I've made suggests that ReferenceQueue has non-trivial overhead, and unfortunately polling it is the only way to get the "callback" that something has been released. It doesn't help much that it was written in Java 1.2 and is heavily synchronized.
  • Most use cases have serial emission/unsubscribe via terminal emission or unsubscribe on the same thread. For example, take will unsubscribe serially on the same thread. However, if take is on another thread after observeOn then it could be concurrent, and that is one of the edge cases we have to deal with.
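
For reference, the WeakReference/ReferenceQueue shape mentioned in the fourth bullet looks roughly like this; the names are illustrative and this is a sketch of the approach that was tried and rejected, not code from this PR:

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of a WeakReference-based pool: borrowers are tracked weakly and the pool
// must poll a ReferenceQueue to learn that a borrower was collected.
final class WeakRefPoolSketch {
    private final ConcurrentLinkedQueue<Queue<Object>> pool = new ConcurrentLinkedQueue<>();
    private final ReferenceQueue<Object> refQueue = new ReferenceQueue<>();
    private final Set<OwnerRef> pending = ConcurrentHashMap.newKeySet(); // keeps refs reachable

    Queue<Object> borrow(Object owner) {
        Queue<Object> q = pool.poll();
        if (q == null) {
            q = new ConcurrentLinkedQueue<>();
        }
        pending.add(new OwnerRef(owner, q, refQueue)); // recycle q once 'owner' is GC'd
        return q;
    }

    void drainReleased() {
        // There is no push-style callback: something has to call this periodically,
        // and ReferenceQueue itself is synchronized internally.
        Reference<?> ref;
        while ((ref = refQueue.poll()) != null) {
            OwnerRef o = (OwnerRef) ref;
            pending.remove(o);
            o.pooled.clear();
            pool.offer(o.pooled);
        }
    }

    static final class OwnerRef extends WeakReference<Object> {
        final Queue<Object> pooled;
        OwnerRef(Object owner, Queue<Object> pooled, ReferenceQueue<Object> rq) {
            super(owner, rq);
            this.pooled = pooled;
        }
    }
}

Even in this simplified form, every recycle requires a polling pass plus the reference bookkeeping above, which is the overhead described in the bullet.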

Considering all of this, what is your recommended approach that is better than the option proposed in this pull request?

@akarnokd
Member

I apologize for the unprofessional comment.

OperatorPublish L360: the if statement has identical sub-blocks, perhaps a break is missing?

The compiler will likely lift the member queue access into a register, so the queue value will remain accessible for the duration of the method.

A possible solution is to use a WriterReaderPhaser to mutually exclude the enqueue and unsubscribe calls:

volatile long ingress;
volatile long egress;
volatile Queue queue;
static final AtomicReferenceFieldUpdater<RxRingBuffer, Queue> QUEUE;

In the onXXX methods:
if (queue == NOP_QUEUE) throw ...;
ingress++;            // single reader and writer, no need for getAndIncrement
queue.offer(value);
egress++;             // might get away with lazySet on the counters

In unsubscribe:
Queue q = QUEUE.getAndSet(this, NOP_QUEUE);  // flip
if (q == NOP_QUEUE) return;                  // idempotence
while (ingress != egress) Thread.yield();    // wait for the in-flight onXXX to finish
q.clear();
pool.putBack(q);
This adds two uncontended volatile writes to each onNext, costing a few dozen cycles. I can't test this right now, but the lazy version might cost only a few cycles. I'd consider both reasonable trade-offs for correctness.

@benjchristensen
Member Author

That's an interesting approach. I look forward to seeing that impl and perf.

Whichever route we take for RxRingBuffer, we will also need to fix the publish issues. Should I open a separate PR to start that, or have you already started on a better publish fix?

@akarnokd
Member

Those issues aren't really related to RxRingBuffer in publish, so they can be fixed in an independent PR.

@benjchristensen force-pushed the experimental-ringbuffer-terminal-release branch 2 times, most recently from d8f4704 to f44311f on December 24, 2014 05:03
@benjchristensen force-pushed the experimental-ringbuffer-terminal-release branch from f44311f to 658ab3a on December 24, 2014 05:31
@benjchristensen
Member Author

I merged in the changes to publish so that it is not a factor in changes to RxRingBuffer.

We can open a separate PR/issue for improving publish.

@benjchristensen
Member Author

This code is somehow getting non-deterministic failures such as these:

rx.internal.operators.OperatorRetryTest > testRetryWithBackpressure FAILED
    java.lang.Exception: test timed out after 10000 milliseconds
rx.internal.operators.OnSubscribeCombineLatestTest > testWithCombineLatestIssue1717 FAILED
    java.lang.AssertionError: expected:<2000> but was:<2002>
        at org.junit.Assert.fail(Assert.java:93)
        at org.junit.Assert.failNotEquals(Assert.java:647)
        at org.junit.Assert.assertEquals(Assert.java:128)
        at org.junit.Assert.assertEquals(Assert.java:472)
        at org.junit.Assert.assertEquals(Assert.java:456)
        at rx.internal.operators.OnSubscribeCombineLatestTest.testWithCombineLatestIssue1717(OnSubscribeCombineLatestTest.java:845)

@akarnokd
Member

Quick update on the algorithm: it doesn't work in this form. The ingress counter has to convey the flip info, not the queue.

volatile long ingress;
static final AtomicLongFieldUpdater INGRESS;
volatile long egress;
static final AtomicLongFieldUpdater EGRESS;
final Queue queue;
// -------- in the onXXX methods:
long start = INGRESS.getAndIncrement(this);
if (start >= 0) queue.offer(value);   // a negative start means unsubscribe has already flipped
EGRESS.lazySet(this, start);
// -------- in unsubscribe:
long old = INGRESS.getAndSet(this, Long.MIN_VALUE);  // flip
if (old < 0) return;                                 // idempotence
while (egress != old) Thread.yield();                // wait for the in-flight onXXX to finish
queue.clear();
pool.putBack(queue);

@akarnokd
Member

I see a couple of problems in 1717:

  • it doesn't seem to clean up by unsubscribing after the latch times out or succeeds, which lets the source keep ticking until the end of the app.
  • there is a race between the incrementAndGet and the get check; if the main thread wakes up too late, the 1 ms source might call doOnNext a couple more times and thus the count check fails (see the sketch below).
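
A hedged reconstruction of the shape of that race, assuming a 1 ms interval source and a strict count assertion; this is not the actual testWithCombineLatestIssue1717 body:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.Subscription;

// Hypothetical reconstruction of the race described in the second bullet; this is
// not the real test code, only the shape of the problem.
public class CombineLatestCountRaceSketch {
    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger count = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(1);

        Subscription s = Observable.interval(1, TimeUnit.MILLISECONDS)
                .doOnNext(t -> {
                    if (count.incrementAndGet() == 2000) {
                        latch.countDown();
                    }
                })
                .subscribe();

        latch.await(10, TimeUnit.SECONDS);
        // If this thread is scheduled a little late, the 1 ms source can fire a few more
        // times after countDown(), so a strict equality check sees 2001 or 2002 here.
        System.out.println("count = " + count.get() + " (a strict assert expects exactly 2000)");
        s.unsubscribe(); // without this, the source keeps ticking until the end of the app
    }
}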

As for the retry test, Mockito's InOrder construct is O(n^2), which makes the verification slow. It is possible that the test, which runs its core 200 times, simply runs out of time.

@benjchristensen
Member Author

A possible solution is to use a WriterReaderPhaser to mutually exclude the enqueue and unsubscribe calls

While thinking about this approach it seemed to be exactly the same as the WIP approach I tried, and after implementing and testing it, that is exactly how it behaves. It is effectively the same thing, just instead of 1 counter it has 2 that get touched in the line of emission (in the try/finally).
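
For reference, the emission path under that scheme ends up looking roughly like this sketch (names are placeholders, not the actual implementation); the two counter writes bracketing each offer are what make it behave like the WIP variant:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Illustrative only: shows the two counters touched on every emission, analogous
// to the single WIP counter in the earlier variant.
final class PhaserEmissionSketch {
    volatile long ingress;
    volatile long egress;
    static final AtomicLongFieldUpdater<PhaserEmissionSketch> INGRESS =
            AtomicLongFieldUpdater.newUpdater(PhaserEmissionSketch.class, "ingress");
    static final AtomicLongFieldUpdater<PhaserEmissionSketch> EGRESS =
            AtomicLongFieldUpdater.newUpdater(PhaserEmissionSketch.class, "egress");

    final Queue<Object> queue = new ConcurrentLinkedQueue<Object>();

    void onNext(Object value) {
        long ticket = INGRESS.getAndIncrement(this);  // counter write #1 on the emission path
        try {
            if (ticket >= 0) {                        // negative means unsubscribe already flipped
                queue.offer(value);
            }
        } finally {
            EGRESS.lazySet(this, ticket);             // counter write #2 on the emission path
        }
    }
}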

To explore this further I studied Gil Tene's WriterReaderPhaser usage at:

Here are the performance numbers, the phaser being the last column:

Benchmark                                          (size)   Mode   Samples          1.x   Inline Volatile    + finalize       RW Lock          WIP     WRPhaser
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  4757888.048       5264594.520   4956256.323   5288310.755  5032942.628  5147447.030
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    44877.618         42845.758     39209.439     25742.696    29025.955    27779.876
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       42.366            40.979        37.036        24.769       27.260       27.694
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    99981.127         99910.070     94307.080    103112.286   103176.289   100516.101
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.675             4.620         4.670         4.374        4.313        4.413
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4751265.583       4706441.452   4376983.062   4739418.557  4673633.614  4510099.724
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   458704.984        480075.261    427165.143    483313.588   476318.407   462373.555
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    42493.290         42178.254     39640.240     42728.480    42542.171    41354.668
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5406448.872       5375090.752   5066264.570   5628401.294  4974892.417  4986054.668
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       44.528            40.990        41.106        24.974       28.212       27.755
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76898.222         72655.377     69748.305     78283.565    78987.646    78550.912
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3172.653          2955.854      3064.749      1858.361     2204.948     2310.804
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5157134.576       5163837.644   4846336.744   5290961.536  5139893.848  4486879.415
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    39961.491         39341.526     37312.117     40418.492    39163.267    37424.146
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       35.925            35.730        33.948        35.526       35.611       32.287
