Skip to content

Commit 43d4d0f

Browse files
authored
fixes RequestOperator to subscribe to the source at later phase (#963)
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent a7fb551 commit 43d4d0f

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ abstract class RequestOperator
2323
Fuseable.QueueSubscription<Payload>,
2424
Fuseable {
2525

26+
final CorePublisher<Payload> source;
2627
final String errorMessageOnSecondSubscription;
2728

2829
CoreSubscriber<? super Payload> actual;
@@ -38,8 +39,8 @@ abstract class RequestOperator
3839
AtomicIntegerFieldUpdater.newUpdater(RequestOperator.class, "wip");
3940

4041
RequestOperator(CorePublisher<Payload> source, String errorMessageOnSecondSubscription) {
42+
this.source = source;
4143
this.errorMessageOnSecondSubscription = errorMessageOnSecondSubscription;
42-
source.subscribe(this);
4344
WIP.lazySet(this, -1);
4445
}
4546

@@ -52,6 +53,7 @@ public void subscribe(Subscriber<? super Payload> actual) {
5253
public void subscribe(CoreSubscriber<? super Payload> actual) {
5354
if (this.wip == -1 && WIP.compareAndSet(this, -1, 0)) {
5455
this.actual = actual;
56+
source.subscribe(this);
5557
actual.onSubscribe(this);
5658
} else {
5759
Operators.error(actual, new IllegalStateException(this.errorMessageOnSecondSubscription));

0 commit comments

Comments
 (0)