Skip to content

Commit f202a2a

Browse files
committed
fixes LoadBalancedRSocketMono to propagate context
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent e4b3bb9 commit f202a2a

File tree

1 file changed

+48
-30
lines changed

1 file changed

+48
-30
lines changed

rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,15 @@
3434
import java.util.concurrent.atomic.AtomicBoolean;
3535
import java.util.concurrent.atomic.AtomicLong;
3636
import org.reactivestreams.Publisher;
37-
import org.reactivestreams.Subscriber;
3837
import org.reactivestreams.Subscription;
3938
import org.slf4j.Logger;
4039
import org.slf4j.LoggerFactory;
4140
import reactor.core.CoreSubscriber;
4241
import reactor.core.publisher.Flux;
4342
import reactor.core.publisher.Mono;
4443
import reactor.core.publisher.MonoProcessor;
44+
import reactor.core.publisher.Operators;
45+
import reactor.util.context.Context;
4546
import reactor.util.retry.Retry;
4647

4748
/**
@@ -667,26 +668,28 @@ private class WeightedSocket implements LoadBalancerSocketMetrics, RSocket {
667668
@Override
668669
public Mono<Payload> requestResponse(Payload payload) {
669670
return rSocketMono.flatMap(
670-
source -> {
671-
return Mono.from(
672-
subscriber ->
673-
source
674-
.requestResponse(payload)
675-
.subscribe(new LatencySubscriber<>(subscriber, this)));
676-
});
671+
source ->
672+
Mono.from(
673+
subscriber ->
674+
source
675+
.requestResponse(payload)
676+
.subscribe(
677+
new LatencySubscriber<>(
678+
Operators.toCoreSubscriber(subscriber), this))));
677679
}
678680

679681
@Override
680682
public Flux<Payload> requestStream(Payload payload) {
681683

682684
return rSocketMono.flatMapMany(
683-
source -> {
684-
return Flux.from(
685-
subscriber ->
686-
source
687-
.requestStream(payload)
688-
.subscribe(new CountingSubscriber<>(subscriber, this)));
689-
});
685+
source ->
686+
Flux.from(
687+
subscriber ->
688+
source
689+
.requestStream(payload)
690+
.subscribe(
691+
new CountingSubscriber<>(
692+
Operators.toCoreSubscriber(subscriber), this))));
690693
}
691694

692695
@Override
@@ -698,7 +701,9 @@ public Mono<Void> fireAndForget(Payload payload) {
698701
subscriber ->
699702
source
700703
.fireAndForget(payload)
701-
.subscribe(new CountingSubscriber<>(subscriber, this)));
704+
.subscribe(
705+
new CountingSubscriber<>(
706+
Operators.toCoreSubscriber(subscriber), this)));
702707
});
703708
}
704709

@@ -710,21 +715,24 @@ public Mono<Void> metadataPush(Payload payload) {
710715
subscriber ->
711716
source
712717
.metadataPush(payload)
713-
.subscribe(new CountingSubscriber<>(subscriber, this)));
718+
.subscribe(
719+
new CountingSubscriber<>(
720+
Operators.toCoreSubscriber(subscriber), this)));
714721
});
715722
}
716723

717724
@Override
718725
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
719726

720727
return rSocketMono.flatMapMany(
721-
source -> {
722-
return Flux.from(
723-
subscriber ->
724-
source
725-
.requestChannel(payloads)
726-
.subscribe(new CountingSubscriber<>(subscriber, this)));
727-
});
728+
source ->
729+
Flux.from(
730+
subscriber ->
731+
source
732+
.requestChannel(payloads)
733+
.subscribe(
734+
new CountingSubscriber<>(
735+
Operators.toCoreSubscriber(subscriber), this))));
728736
}
729737

730738
synchronized double getPredictedLatency() {
@@ -867,18 +875,23 @@ public long lastTimeUsedMillis() {
867875
* Subscriber wrapper used for request/response interaction model, measure and collect latency
868876
* information.
869877
*/
870-
private class LatencySubscriber<U> implements Subscriber<U> {
871-
private final Subscriber<U> child;
878+
private class LatencySubscriber<U> implements CoreSubscriber<U> {
879+
private final CoreSubscriber<U> child;
872880
private final WeightedSocket socket;
873881
private final AtomicBoolean done;
874882
private long start;
875883

876-
LatencySubscriber(Subscriber<U> child, WeightedSocket socket) {
884+
LatencySubscriber(CoreSubscriber<U> child, WeightedSocket socket) {
877885
this.child = child;
878886
this.socket = socket;
879887
this.done = new AtomicBoolean(false);
880888
}
881889

890+
@Override
891+
public Context currentContext() {
892+
return child.currentContext();
893+
}
894+
882895
@Override
883896
public void onSubscribe(Subscription s) {
884897
start = incr();
@@ -931,15 +944,20 @@ public void onComplete() {
931944
* Subscriber wrapper used for stream like interaction model, it only counts the number of
932945
* active streams
933946
*/
934-
private class CountingSubscriber<U> implements Subscriber<U> {
935-
private final Subscriber<U> child;
947+
private class CountingSubscriber<U> implements CoreSubscriber<U> {
948+
private final CoreSubscriber<U> child;
936949
private final WeightedSocket socket;
937950

938-
CountingSubscriber(Subscriber<U> child, WeightedSocket socket) {
951+
CountingSubscriber(CoreSubscriber<U> child, WeightedSocket socket) {
939952
this.child = child;
940953
this.socket = socket;
941954
}
942955

956+
@Override
957+
public Context currentContext() {
958+
return child.currentContext();
959+
}
960+
943961
@Override
944962
public void onSubscribe(Subscription s) {
945963
socket.pendingStreams.incrementAndGet();

0 commit comments

Comments
 (0)