Skip to content

Commit f6cc95b

Browse files
committed
ReactiveX#245 Fix CircuitBreakerSubscriber for Reactor doesn't count successes when using Mono/Flux.toFuture()
Circuit breaker on Mono will count success onNext or onComplete.
1 parent a9abce7 commit f6cc95b

File tree

5 files changed

+51
-5
lines changed

5 files changed

+51
-5
lines changed

resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.reactivestreams.Subscriber;
2323
import reactor.core.CoreSubscriber;
2424

25+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
26+
2527
import static java.util.Objects.requireNonNull;
2628

2729
/**
@@ -33,23 +35,39 @@ class CircuitBreakerSubscriber<T> extends ResilienceBaseSubscriber<T> {
3335

3436
private final CircuitBreaker circuitBreaker;
3537
private StopWatch stopWatch;
38+
private final boolean singleProducer;
39+
40+
private volatile int successSignaled = 0;
41+
private static final AtomicIntegerFieldUpdater<CircuitBreakerSubscriber> SUCCESS_SIGNALED =
42+
AtomicIntegerFieldUpdater.newUpdater(CircuitBreakerSubscriber.class, "successSignaled");
3643

3744
public CircuitBreakerSubscriber(CircuitBreaker circuitBreaker,
38-
CoreSubscriber<? super T> actual) {
45+
CoreSubscriber<? super T> actual,
46+
boolean singleProducer) {
3947
super(actual);
4048
this.circuitBreaker = requireNonNull(circuitBreaker);
49+
this.singleProducer = singleProducer;
4150
}
4251

4352
@Override
4453
protected void hookOnNext(T value) {
54+
if (singleProducer) {
55+
if (SUCCESS_SIGNALED.compareAndSet(this, 0, 1)) {
56+
markSuccess();
57+
}
58+
}
59+
4560
if (notCancelled() && wasCallPermitted()) {
4661
actual.onNext(value);
4762
}
4863
}
4964

5065
@Override
5166
protected void hookOnComplete() {
52-
markSuccess();
67+
if (SUCCESS_SIGNALED.compareAndSet(this, 0, 1)) {
68+
markSuccess();
69+
}
70+
5371
if (wasCallPermitted()) {
5472
actual.onComplete();
5573
}

resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/FluxCircuitBreaker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public FluxCircuitBreaker(Flux<? extends T> source, CircuitBreaker circuitBreake
3131

3232
@Override
3333
public void subscribe(CoreSubscriber<? super T> actual) {
34-
source.subscribe(new CircuitBreakerSubscriber<>(circuitBreaker, actual));
34+
source.subscribe(new CircuitBreakerSubscriber<>(circuitBreaker, actual, false));
3535
}
3636

3737
}

resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/MonoCircuitBreaker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,6 @@ public MonoCircuitBreaker(Mono<? extends T> source, CircuitBreaker circuitBreake
3030

3131
@Override
3232
public void subscribe(CoreSubscriber<? super T> actual) {
33-
source.subscribe(new CircuitBreakerSubscriber<>(circuitBreaker, actual));
33+
source.subscribe(new CircuitBreakerSubscriber<>(circuitBreaker, actual, true));
3434
}
3535
}

resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriberWhiteboxVerification.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public CircuitBreakerSubscriberWhiteboxVerification() {
3131

3232
@Override
3333
public Subscriber<Integer> createSubscriber(WhiteboxSubscriberProbe<Integer> probe) {
34-
return new CircuitBreakerSubscriber<Integer>(CircuitBreaker.ofDefaults("verification"), MonoProcessor.create()) {
34+
return new CircuitBreakerSubscriber<Integer>(CircuitBreaker.ofDefaults("verification"), MonoProcessor.create(), false) {
3535

3636
@Override
3737
protected void hookOnSubscribe(Subscription subscription) {

resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/MonoCircuitBreakerTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222

2323
import java.io.IOException;
2424
import java.time.Duration;
25+
import java.util.concurrent.ExecutionException;
26+
27+
import static org.junit.Assert.fail;
2528

2629
public class MonoCircuitBreakerTest extends CircuitBreakerAssertions {
2730

@@ -36,6 +39,16 @@ public void shouldEmitEvent() {
3639
assertSingleSuccessfulCall();
3740
}
3841

42+
@Test
43+
public void shouldEmptyMonoShouldBeSuccessful() {
44+
StepVerifier.create(
45+
Mono.empty()
46+
.transform(CircuitBreakerOperator.of(circuitBreaker)))
47+
.verifyComplete();
48+
49+
assertSingleSuccessfulCall();
50+
}
51+
3952
@Test
4053
public void shouldPropagateError() {
4154
StepVerifier.create(
@@ -82,4 +95,19 @@ public void shouldEmitErrorWithCircuitBreakerOpenException() {
8295

8396
assertNoRegisteredCall();
8497
}
98+
99+
@Test
100+
public void shouldRecordSuccessWhenUsingToFuture() {
101+
try {
102+
Mono.just("Event")
103+
.transform(CircuitBreakerOperator.of(circuitBreaker))
104+
.toFuture()
105+
.get();
106+
107+
assertSingleSuccessfulCall();
108+
} catch (InterruptedException | ExecutionException e) {
109+
fail();
110+
}
111+
112+
}
85113
}

0 commit comments

Comments
 (0)