Skip to content

Commit d2b8f2a

Browse files
committed
fixes onDiscard leak in UnicastMonoProcessor
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent d6618e3 commit d2b8f2a

File tree

1 file changed

+9
-7
lines changed

1 file changed

+9
-7
lines changed

rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ public static <T> UnicastMonoProcessor<T> create(MonoLifecycleHandler<T> lifecyc
9999
UnicastMonoProcessor.class, Subscription.class, "subscription");
100100

101101
CoreSubscriber<? super O> actual;
102+
boolean hasDownstream = false;
102103

103104
Throwable error;
104105
O value;
@@ -185,7 +186,7 @@ private void complete(O v) {
185186
if (state == HAS_REQUEST_NO_RESULT) {
186187
if (STATE.compareAndSet(this, HAS_REQUEST_NO_RESULT, HAS_REQUEST_HAS_RESULT)) {
187188
final Subscriber<? super O> a = actual;
188-
actual = null;
189+
hasDownstream = false;
189190
value = null;
190191
lifecycleHandler.doOnTerminal(SignalType.ON_COMPLETE, v, null);
191192
a.onNext(v);
@@ -222,7 +223,7 @@ private void complete() {
222223
if (state == HAS_REQUEST_NO_RESULT || state == NO_REQUEST_NO_RESULT) {
223224
if (STATE.compareAndSet(this, state, HAS_REQUEST_HAS_RESULT)) {
224225
final Subscriber<? super O> a = actual;
225-
actual = null;
226+
hasDownstream = false;
226227
lifecycleHandler.doOnTerminal(SignalType.ON_COMPLETE, null, null);
227228
a.onComplete();
228229
return;
@@ -256,7 +257,7 @@ private void complete(Throwable e) {
256257
if (state == HAS_REQUEST_NO_RESULT || state == NO_REQUEST_NO_RESULT) {
257258
if (STATE.compareAndSet(this, state, HAS_REQUEST_HAS_RESULT)) {
258259
final Subscriber<? super O> a = actual;
259-
actual = null;
260+
hasDownstream = false;
260261
lifecycleHandler.doOnTerminal(SignalType.ON_ERROR, null, e);
261262
a.onError(e);
262263
return;
@@ -278,6 +279,7 @@ public void subscribe(CoreSubscriber<? super O> actual) {
278279

279280
lh.doOnSubscribe();
280281

282+
this.hasDownstream = true;
281283
this.actual = actual;
282284

283285
int state = this.state;
@@ -303,7 +305,7 @@ public void subscribe(CoreSubscriber<? super O> actual) {
303305
// no value
304306
// e.g. [onError / onComplete / dispose] only
305307
if (state == NO_REQUEST_HAS_RESULT && this.value == null) {
306-
this.actual = null;
308+
this.hasDownstream = false;
307309
Throwable e = this.error;
308310
// barrier to flush changes
309311
STATE.set(this, HAS_REQUEST_HAS_RESULT);
@@ -340,7 +342,7 @@ public final void request(long n) {
340342
if (STATE.compareAndSet(this, NO_REQUEST_HAS_RESULT, HAS_REQUEST_HAS_RESULT)) {
341343
final Subscriber<? super O> a = actual;
342344
final O v = value;
343-
actual = null;
345+
hasDownstream = false;
344346
value = null;
345347
lifecycleHandler.doOnTerminal(SignalType.ON_COMPLETE, v, null);
346348
a.onNext(v);
@@ -360,7 +362,7 @@ public final void cancel() {
360362
if (STATE.getAndSet(this, CANCELLED) <= HAS_REQUEST_NO_RESULT) {
361363
Operators.onDiscard(value, currentContext());
362364
value = null;
363-
actual = null;
365+
hasDownstream = false;
364366
lifecycleHandler.doOnTerminal(SignalType.CANCEL, null, null);
365367
final Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription());
366368
if (s != null && s != Operators.cancelledSubscription()) {
@@ -502,6 +504,6 @@ public Object scanUnsafe(Attr key) {
502504
* @return true if any {@link Subscriber} is actively subscribed
503505
*/
504506
public final boolean hasDownstream() {
505-
return state > NO_SUBSCRIBER_HAS_RESULT && actual != null;
507+
return state > NO_SUBSCRIBER_HAS_RESULT && hasDownstream;
506508
}
507509
}

0 commit comments

Comments
 (0)