Skip to content

Fix the performance degradation due to different schedule execution and #2912

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,7 @@ public Subscription schedule(Action0 action) {
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ScheduledAction s = poolWorker.scheduleActual(action, 0, null);

serial.add(s);
s.addParent(serial);
ScheduledAction s = poolWorker.scheduleActual(action, 0, null, serial);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly this is just removing work that gets done for us within scheduleActual. Am I reading this correctly? Referenced code is here:

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem here was that serial.add() happened after the action was scheduled and executed thus if there was some recursive work, those serial.add() calls could happen before this initial call and thus the order in the list was reversed. Since the removal is O(n) and we expected the list to be in the same order as the executions finish, any disorder increases the remove time and a concurrent lock attempt run out of its spin time and ended up parking. The performance degradation was this increased park/unpark activity. The scheduleAction overload just adds the action to the serial before it schedules it thus making sure any recursive schedule happens after it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, that's an interesting issue. Thanks for the explanation!


return s;
}
Expand Down
33 changes: 6 additions & 27 deletions src/main/java/rx/internal/util/SubscriptionList.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,7 @@
*/
package rx.internal.util;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.*;

import rx.Subscription;
import rx.exceptions.Exceptions;
Expand All @@ -34,7 +29,6 @@ public final class SubscriptionList implements Subscription {

private LinkedList<Subscription> subscriptions;
private volatile boolean unsubscribed;
private final ReentrantLock lock = new ReentrantLock();

public SubscriptionList() {
}
Expand Down Expand Up @@ -66,8 +60,7 @@ public void add(final Subscription s) {
return;
}
if (!unsubscribed) {
lock.lock();
try {
synchronized (this) {
if (!unsubscribed) {
LinkedList<Subscription> subs = subscriptions;
if (subs == null) {
Expand All @@ -77,8 +70,6 @@ public void add(final Subscription s) {
subs.add(s);
return;
}
} finally {
lock.unlock();
}
}
// call after leaving the synchronized block so we're not holding a lock while executing this
Expand All @@ -88,15 +79,12 @@ public void add(final Subscription s) {
public void remove(final Subscription s) {
if (!unsubscribed) {
boolean unsubscribe = false;
lock.lock();
try {
synchronized (this) {
LinkedList<Subscription> subs = subscriptions;
if (unsubscribed || subs == null) {
return;
}
unsubscribe = subs.remove(s);
} finally {
lock.unlock();
}
if (unsubscribe) {
// if we removed successfully we then need to call unsubscribe on it (outside of the lock)
Expand All @@ -113,16 +101,13 @@ public void remove(final Subscription s) {
public void unsubscribe() {
if (!unsubscribed) {
List<Subscription> list;
lock.lock();
try {
synchronized (this) {
if (unsubscribed) {
return;
}
unsubscribed = true;
list = subscriptions;
subscriptions = null;
} finally {
lock.unlock();
}
// we will only get here once
unsubscribeFromAll(list);
Expand Down Expand Up @@ -150,12 +135,9 @@ private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
public void clear() {
if (!unsubscribed) {
List<Subscription> list;
lock.lock();
try {
synchronized (this) {
list = subscriptions;
subscriptions = null;
} finally {
lock.unlock();
}
unsubscribeFromAll(list);
}
Expand All @@ -166,11 +148,8 @@ public void clear() {
*/
public boolean hasSubscriptions() {
if (!unsubscribed) {
lock.lock();
try {
synchronized (this) {
return !unsubscribed && subscriptions != null && !subscriptions.isEmpty();
} finally {
lock.unlock();
}
}
return false;
Expand Down