Skip to content

2.x: fix GroupBy MissingBackpressureException due to main/group overflow #3426

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 164 additions & 38 deletions src/main/java/io/reactivex/internal/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.reactivex.internal.subscriptions.*;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.observables.GroupedObservable;
import io.reactivex.plugins.RxJavaPlugins;

public final class OperatorGroupBy<T, K, V> implements Operator<GroupedObservable<K, V>, T>{
final Function<? super T, ? extends K> keySelector;
Expand All @@ -44,7 +45,9 @@ public Subscriber<? super T> apply(Subscriber<? super GroupedObservable<K, V>> t
return new GroupBySubscriber<>(t, keySelector, valueSelector, bufferSize, delayError);
}

public static final class GroupBySubscriber<T, K, V> extends AtomicInteger implements Subscriber<T>, Subscription {
public static final class GroupBySubscriber<T, K, V>
extends AtomicInteger
implements Subscriber<T>, Subscription {
/** */
private static final long serialVersionUID = -3688291656102519502L;

Expand All @@ -54,6 +57,7 @@ public static final class GroupBySubscriber<T, K, V> extends AtomicInteger imple
final int bufferSize;
final boolean delayError;
final Map<Object, GroupedUnicast<K, V>> groups;
final Queue<GroupedObservable<K, V>> queue;

static final Object NULL_KEY = new Object();

Expand All @@ -64,14 +68,28 @@ public static final class GroupBySubscriber<T, K, V> extends AtomicInteger imple
static final AtomicIntegerFieldUpdater<GroupBySubscriber> CANCELLED =
AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "cancelled");

volatile long requested;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<GroupBySubscriber> REQUESTED =
AtomicLongFieldUpdater.newUpdater(GroupBySubscriber.class, "requested");

volatile int groupCount;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<GroupBySubscriber> GROUP_COUNT =
AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "groupCount");

Throwable error;
volatile boolean done;

public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError) {
this.actual = actual;
this.keySelector = keySelector;
this.valueSelector = valueSelector;
this.bufferSize = bufferSize;
this.delayError = delayError;
this.groups = new ConcurrentHashMap<>();
this.lazySet(1);
this.queue = new SpscLinkedArrayQueue<>(bufferSize);
GROUP_COUNT.lazySet(this, 1);
}

@Override
Expand All @@ -82,16 +100,24 @@ public void onSubscribe(Subscription s) {

this.s = s;
actual.onSubscribe(this);
s.request(bufferSize);
}

@Override
public void onNext(T t) {
if (done) {
return;
}

final Queue<GroupedObservable<K, V>> q = this.queue;
final Subscriber<? super GroupedObservable<K, V>> a = this.actual;

K key;
try {
key = keySelector.apply(t);
} catch (Throwable e) {
} catch (Throwable ex) {
s.cancel();
onError(e);
errorAll(a, q, ex);
return;
}

Expand All @@ -101,71 +127,74 @@ public void onNext(T t) {
if (group == null) {
// if the main has been cancelled, stop creating groups
// and skip this value
if (cancelled != 0) {
s.request(1);
if (cancelled == 0) {
group = GroupedUnicast.createWith(key, bufferSize, this, delayError);
groups.put(mapKey, group);

GROUP_COUNT.getAndIncrement(this);

notNew = false;
q.offer(group);
drain();
} else {
return;
}
notNew = true;

group = GroupedUnicast.createWith(key, bufferSize, this, delayError);
groups.put(mapKey, group);

getAndIncrement();

actual.onNext(group);
}

V v;
try {
v = valueSelector.apply(t);
} catch (Throwable e) {
} catch (Throwable ex) {
s.cancel();
onError(e);
errorAll(a, q, ex);
return;
}

group.onNext(v);

if (notNew) {
s.request(1); // we spent this t on an existing group, request one more
s.request(1);
}
}

@Override
public void onError(Throwable t) {
List<GroupedUnicast<K, V>> list = new ArrayList<>(groups.values());
groups.clear();

for (GroupedUnicast<K, V> e : list) {
e.onError(t);
if (done) {
RxJavaPlugins.onError(t);
return;
}

actual.onError(t);
error = t;
done = true;
GROUP_COUNT.decrementAndGet(this);
drain();
}

@Override
public void onComplete() {
List<GroupedUnicast<K, V>> list = new ArrayList<>(groups.values());
groups.clear();

for (GroupedUnicast<K, V> e : list) {
e.onComplete();
if (done) {
return;
}

actual.onComplete();
done = true;
GROUP_COUNT.decrementAndGet(this);
drain();
}

@Override
public void request(long n) {
s.request(n);
if (SubscriptionHelper.validateRequest(n)) {
return;
}

BackpressureHelper.add(REQUESTED, this, n);
drain();
}

@Override
public void cancel() {
// cancelling the main source means we don't want any more groups
// but running groups still require new values
if (CANCELLED.compareAndSet(this, 0, 1)) {
if (decrementAndGet() == 0) {
if (GROUP_COUNT.decrementAndGet(this) == 0) {
s.cancel();
}
}
Expand All @@ -174,10 +203,100 @@ public void cancel() {
public void cancel(K key) {
Object mapKey = key != null ? key : NULL_KEY;
groups.remove(mapKey);
if (decrementAndGet() == 0) {
if (GROUP_COUNT.decrementAndGet(this) == 0) {
s.cancel();
}
}

void drain() {
if (getAndIncrement() != 0) {
return;
}

int missed = 1;

final Queue<GroupedObservable<K, V>> q = this.queue;
final Subscriber<? super GroupedObservable<K, V>> a = this.actual;

for (;;) {

if (checkTerminated(done, q.isEmpty(), a, q)) {
return;
}

long r = requested;
boolean unbounded = r == Long.MAX_VALUE;
long e = 0L;

while (r != 0) {
boolean d = done;

GroupedObservable<K, V> t = q.poll();

boolean empty = t == null;

if (checkTerminated(d, empty, a, q)) {
return;
}

if (empty) {
break;
}

a.onNext(t);

r--;
e--;
}

if (e != 0L) {
if (!unbounded) {
REQUESTED.addAndGet(this, e);
}
s.request(-e);
}

missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

void errorAll(Subscriber<? super GroupedObservable<K, V>> a, Queue<?> q, Throwable ex) {
q.clear();
List<GroupedUnicast<K, V>> list = new ArrayList<>(groups.values());
groups.clear();

for (GroupedUnicast<K, V> e : list) {
e.onError(ex);
}

a.onError(ex);
}

boolean checkTerminated(boolean d, boolean empty,
Subscriber<? super GroupedObservable<K, V>> a, Queue<?> q) {
if (d) {
Throwable err = error;
if (err != null) {
errorAll(a, q, err);
return true;
} else
if (empty) {
List<GroupedUnicast<K, V>> list = new ArrayList<>(groups.values());
groups.clear();

for (GroupedUnicast<K, V> e : list) {
e.onComplete();
}

actual.onComplete();
return true;
}
}
return false;
}
}

static final class GroupedUnicast<K, T> extends GroupedObservable<K, T> {
Expand Down Expand Up @@ -233,7 +352,12 @@ static final class State<T, K> extends AtomicInteger implements Subscription, Pu
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<State, Subscriber> ACTUAL =
AtomicReferenceFieldUpdater.newUpdater(State.class, Subscriber.class, "actual");


volatile int once;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<State> ONCE =
AtomicIntegerFieldUpdater.newUpdater(State.class, "once");

public State(int bufferSize, GroupBySubscriber<?, K, T> parent, K key, boolean delayError) {
this.queue = new SpscLinkedArrayQueue<>(bufferSize);
this.parent = parent;
Expand All @@ -247,7 +371,6 @@ public void request(long n) {
return;
}
BackpressureHelper.add(REQUESTED, this, n);
parent.request(n);
drain();
}

Expand All @@ -262,8 +385,10 @@ public void cancel() {

@Override
public void subscribe(Subscriber<? super T> s) {
if (ACTUAL.compareAndSet(this, null, s)) {
if (ONCE.compareAndSet(this, 0, 1)) {
s.onSubscribe(this);
ACTUAL.lazySet(this, s);
drain();
} else {
EmptySubscription.error(new IllegalStateException("Only one Subscriber allowed!"), s);
}
Expand Down Expand Up @@ -332,6 +457,7 @@ void drain() {
if (!unbounded) {
REQUESTED.addAndGet(this, e);
}
parent.s.request(-e);
}
}

Expand Down
Loading